Thursday, February 25, 2010

02/25:Convert Excel To DataTable And Vice Versa

 

  • Will Use NOIP (http://npoi.codeplex.com/)
    • Add a class with the following code
      • using System;

        using System.Collections.Generic;

        using System.Data;

        using System.IO;

        using System.Web;

        using NPOI;
        using NPOI.HPSF;

        using NPOI.HSSF;
        using NPOI.HSSF.UserModel;

        using NPOI.POIFS;

        using NPOI.Util;

        public class DataTableRenderToExcel
        {

            public static Stream RenderDataTableToExcel(DataTable SourceTable)
            {
                HSSFWorkbook workbook = new HSSFWorkbook();

                MemoryStream ms = new MemoryStream();

                HSSFSheet sheet = workbook.CreateSheet();

                HSSFRow headerRow = sheet.CreateRow(0);

                // handling header.      

                foreach (DataColumn column in SourceTable.Columns)

                    headerRow.CreateCell(column.Ordinal).SetCellValue(column.ColumnName);

                // handling value.       
                int rowIndex = 1;     

                foreach (DataRow row in SourceTable.Rows)
                {

                    HSSFRow dataRow = sheet.CreateRow(rowIndex);

                    foreach (DataColumn column in SourceTable.Columns)
                    {

                        dataRow.CreateCell(column.Ordinal).SetCellValue(row[column].ToString());

                    }

                    rowIndex++;

                }

                workbook.Write(ms);

                ms.Flush();

                ms.Position = 0;

                sheet = null;

                headerRow = null;

                workbook = null;

                return ms;
            }

            public static void RenderDataTableToExcel(DataTable SourceTable, string FileName)
            {
                MemoryStream ms = RenderDataTableToExcel(SourceTable) as MemoryStream;

                FileStream fs = new FileStream(FileName, FileMode.Create, FileAccess.Write);

                byte[] data = ms.ToArray();

                fs.Write(data, 0, data.Length);

                fs.Flush(); fs.Close();

                data = null; ms = null; fs = null;

            }

            public static DataTable RenderDataTableFromExcel(Stream ExcelFileStream, string SheetName, int HeaderRowIndex)
            {

                HSSFWorkbook workbook = new HSSFWorkbook(ExcelFileStream);

                HSSFSheet sheet = workbook.GetSheet(SheetName);

                DataTable table = new DataTable();

                HSSFRow headerRow = sheet.GetRow(HeaderRowIndex);

                int cellCount = headerRow.LastCellNum;

                for (int i = headerRow.FirstCellNum; i < cellCount; i++)
                {

                    DataColumn column = new DataColumn(headerRow.GetCell(i).StringCellValue);

                    table.Columns.Add(column);

                }

                int rowCount = sheet.LastRowNum;

                for (int i = (sheet.FirstRowNum + 1); i < sheet.LastRowNum; i++)
                {
                    HSSFRow row = sheet.GetRow(i);

                    DataRow dataRow = table.NewRow();

                    for (int j = row.FirstCellNum; j < cellCount; j++)

                        dataRow[j] = row.GetCell(j).ToString();
                }

                ExcelFileStream.Close(); workbook = null; sheet = null; return table;
            }

            public static DataTable RenderDataTableFromExcel(Stream ExcelFileStream, int SheetIndex, int HeaderRowIndex)
            {
                HSSFWorkbook workbook = new HSSFWorkbook(ExcelFileStream);

                HSSFSheet sheet = workbook.GetSheetAt(SheetIndex);

                DataTable table = new DataTable();

                HSSFRow headerRow = sheet.GetRow(HeaderRowIndex);

                int cellCount = headerRow.LastCellNum;

                for (int i = headerRow.FirstCellNum; i < cellCount; i++)
                {
                    DataColumn column = new DataColumn(headerRow.GetCell(i).StringCellValue);

                    table.Columns.Add(column);

                }

                int rowCount = sheet.LastRowNum;

                for (int i = (sheet.FirstRowNum ); i < sheet.LastRowNum; i++)
                {
                    HSSFRow row = sheet.GetRow(i);

                    DataRow dataRow = table.NewRow();

                    for (int j = row.FirstCellNum; j < cellCount; j++)
                    {
                        if (row.GetCell(j) != null)

                            dataRow[j] = row.GetCell(j).ToString();
                    }

                    table.Rows.Add(dataRow);
                }

                ExcelFileStream.Close();

                workbook = null;

                sheet = null;

                return table;

            }
        }

    • Test the code for RenderDataTableFromEXcel
      • FileStream fileStream = new FileStream("test.xls", FileMode.Open);
                   try
                   {
                       // read from file or write to file
                       DataTable table = DataTableRenderToExcel.RenderDataTableFromExcel(fileStream, 0, 0);

                   }
                   finally
                   {
                       fileStream.Close();
                   }

Wednesday, February 24, 2010

02/24: (ZT)浅谈.NET下的多线程和并行计算

http://www.cnblogs.com/lovecindywang/archive/2010/01/05/1639850.html

作者:lovecindywang

浅谈.NET下的多线程和并行计算(一)前言

作为一个ASP.NET开发人员,在之前的开发经历中接触多线程编程的机会并不是很多,但是随着.NET 4.0的发布临近,我越来越感受到未来的1-2年中并行计算将会有很大的应用。于是决定通过写日志的方式来总结一下.NET 3.5下的多线程编程进而引入.NET 4.0提供的新的并行库以及新的并行编程模式和编程的思维方式。

个人觉得在日常的编程中对于ASP.NET程序员来说使用多线程编程不是很多,其实我们无时无刻不在享受多线程的优势。首先,WEB服务器环境就是 一个多线程环境,每一个请求都是独立的线程,如果没有多线程很难想象只能同步处理一个请求的WEB服务器有什么用,类似,我们的数据库也应该是一个多线程 环境。对于Windows应用程序的程序员来说恐怕就很难不接触多线程了,最简单的就是我们会新开线程去做一些耗时的操作,这样就可以避免UI停止响应, 在操作结束后再把操作结果应用在主线程的控件上。虽然说这样的应用是多线程,甚至很多程序员习惯什么操作都新开一个线程去做,但是我觉得这样的多线程应用 的思维还停留在单核时代,在多核时代,我们确实可以让任务实际的并行执行而不是看上去并行执行。

首先来说说概念,进程和线程的基本的概念不用多说,自然我们也能理解一个进程至少包含一个线程。通过在一个进程中开启多个线程,我们就可以让一个程 序在同一时间看上去能做多个事情,比如可以在接受用户响应的时候进行一些计算。在以前处理器往往只有一个核心,也就是说在同一时间,处理器只能做一件事 情。那么怎么实现之前说的多个线程同时执行呢。其实这个同时只是表面上看上去同时,本质上多个线程依次占用处理器的若干时间片,大家轮流使用其资源,由于 这个时间片非常短,所以在一个长的时间看来似乎是几个线程同时得到了执行。

举一个生动的例子,我们经常看到有一些画家能同时在一个画布上画两个不同的图片,一个画人一个画房子,最后一起完成这个画。但仔细看的话发现,他是 两手拿了两只画笔,在这里画一笔那里画一笔,在同一时间其实也只有一只笔在画。这个画家应该也像普通人一样是单核的,只是线程切换比较快罢了。我经常在打 电话的时候和网友进行聊天,在同一时间做两件事情,但是这样很费脑子,在打字前我要回忆一下刚才聊天的内容,然后输入聊天的文字,然后再去回想一下刚才那 哥们说了啥,在电话里面回他一句,这种回忆的工作就是准备线程的上下文,交给脑子去处理。虽然同一时间是做了两件事情,但是这个上下文的准备工作也浪费了 点时间,如果我在打电话网络聊天的同事在去做第三件事情比如看电影,那我估计就不行了。所以,线程也不能开的很多,特别对于人脑来说。但是对于电脑处理器 来说就不一样了,你只要准备好数据和指令他执行就是了,至于这些事情来自几件事情它不关心,24小时一秒都不浪费在执行指令完全没问题,当然你也可以让它 闲着。

您可能会想了,既然线程切换需要时间,那么我们开两个线程执行两个任务不是还没有一个一个执行来的快吗?其实即使对于单核的处理器都不一定,因为在 实际的应用中我们的任务往往不可能从头计算到尾一直占用处理器资源,在很多时候我们要等待IO响应或用户的响应,如果只是一个线程做事情的话处理器太闲 了。对于现在多核的处理器来说,在同一时刻理论上可以在每一个处理器上都并行执行指令,我们就更需要利用多线程来提高运算速度了。当然也不是说一个任务要 执行10秒,我们在双核的机器上并行执行这个任务只需要5秒了,那是因为很多时候这个任务很难划分成两个分支来并行执行,如果每个指令都要依靠上个指令的 执行结果,那么这样的操作很难在多个处理器上并行执行。但是,我们可以这样想,至少如果有两个这样任务的话,我们就可以完全利用多个处理器的优势来并行执 行了。

但是也不是多可以随便的开线程,每一个线程默认情况下都会占用1M的栈空间(对于普通应用程序来说),在32位Windows平台下可以给一个用户 进程使用的程序最大在2G,那么也就是说在程序中使用的线程不能超过2000个,在实际测试中可以发现一般来说开1930左右个线程就会收到内存不足的异 常,其实这个数量是绝对够用的,即使复杂的Outlook2007程序一般也只用了50个不到的线程(可以在任务管理器中观察到)。

在不得已的情况下很多人都不太会去使用多线程也是有原因的,一是因为多线程编程复杂,我们也习惯了一行一行代码执行的编程模式,对于一个好的多线程 程序来说,要尽量分割任务让它在多个线程中使用以利用到多个处理器核。还有就是多个线程使用相同资源的话还要考虑资源的锁定以免产生数据的不一致。锁定/ 事务/并发的概念在数据库中也是非常常见的。二是因为调试困难,特别是一个线程的执行依赖其它线程的执行。三是因为多线程的程序随着环境的变化(处理器 /操作系统)可能执行的性能还不一定相同,如果只针对某个环境进行编程可能还不能充分利用多处理器的优势。比如我们对一个任务划分成2个线程并行执行,那 么对于四核的处理器来说,划分成4个线程并行执行会不会更合理呢,说实话我也举的这事挺难说的?还有,我们的编程基于.NET框架,而其本质还是使用的是 操作系统的线程,操作系统中本来就有很多进程运行着,处理器是大家的处理器,不是专供我们程序使用的,在这么一个鱼龙混杂的环境,我们的程序究竟是不是会 表现的如我们预期那样,也很难说。

多线程好,多线程难,本系列文章也只能在一个比较浅显的层次来谈谈如何在.NET框架中进行多线程编程,以及一些常见应用(比如Windows应用)中多线程的典型应用。本系列文章预计会有30篇这样的规模,希望对大家有帮助。

浅谈.NET下的多线程和并行计算(二)线程基本知识

首先来看看如何创建线程:

Console.WriteLine(Process.GetCurrentProcess().Threads.Count);

Thread t1 = new Thread(() =>

{

Thread.Sleep(1000);

Thread t = Thread.CurrentThread;

Console.WriteLine("Name: " + t.Name);

Console.WriteLine("ManagedThreadId: " + t.ManagedThreadId);

Console.WriteLine("State: " + t.ThreadState);

Console.WriteLine("Priority: " + t.Priority);

Console.WriteLine("IsBackground: " + t.IsBackground);

Console.WriteLine("IsThreadPoolThread: " + t.IsThreadPoolThread);

})

{

Name = "Thread1",

Priority = ThreadPriority.Highest

};

t1.Start();

Console.WriteLine(Process.GetCurrentProcess().Threads.Count);

我们在Thread的构造方法中传入一个Lambda表达式,对应ThreadStart委托(无参void返回值的方法)来构造一个线程任务。这段程序中有几个注意点:

1)从输出结果中可以看到,当前程序启动后就3三个线程,新开线程后显示为4个线程,在线程方法中休眠了一秒,防止主线程执行完次线程就过早结束了。

2)我们可以为线程设置一个名字,方便调试。我们也可以设置线程的优先级,这个在之后会有进一步介绍。

3)第7行,托管线程的唯一标识符,微软建议使用托管线程的Id而不是操作系统中线程的Id来跟踪线程。

4)第10行代码输出了当前线程不是后台线程,也就是是前台线程,这是默认值。进程会等待前台线程结束结束,而如果是后台线程的话,所有前台线程结 束后后台线程自动终止。对于Windows GUI应用程序来说,使用后台线程很可能发生诡异现象,也就是在程序从任务管理器的应用程序一栏中消失后其进程还在,只能通过手动终止进程来释放内存。

5)第11行代码表明这个线程不是由线程池创建的,有关线程池见后文的介绍。

clip_image002

那么我们再来看看如何为线程传入参数,一种方式是使用匹配ParameterizedThreadStart委托(object参数void返回值)的方法:

new Thread((date) => Console.WriteLine(((DateTime)date).ToString())).Start(DateTime.Now);

由于参数是object类型的,我们在使用的时候不得不进行转换,而且还有一个问题就是不支持多个参数,如果要多个参数的话只能使用自定义的对象进行包装,我们还可以使用另外一种方法,那就是使用一个无参方法来包装线程方法主体:

new Thread(() => Add(1, 2)).Start();

static void Add(int i, int j)

{

Console.WriteLine(i + j);

}

上述几行代码的运行结果如下:

clip_image004

再来看一下后台线程前台线程:

new Thread(() => Console.ReadLine()) { IsBackground = false }.Start();

这是默认情况,可以看到控制台一直在等待用户的输入,按回车后程序结束,如果把IsBackground属性设置为true的话,可以看到程序在运行后马上接结束了,并没有等待线程方法的结束。

之前说过线程的优先级属性,我们做一个实验:

bool b = true;

new Thread(() =>

{

while (b)

{

i++;

}

}) { Priority = ThreadPriority.Highest }.Start();

new Thread(() =>

{

while (b)

{

j++;

}

}) { Priority = ThreadPriority.Lowest }.Start();

Thread.Sleep(1000);

b = false;

Console.WriteLine("i: {0}, j: {1}", i, j);

开启两个线程做的事情很简单,累加一个静态变量的值,一个优先级最高,一个优先级最低,然后让主线程等待1秒输出结果:

clip_image006

从结果中可以看到,优先级高的线程得到运行的次数比优先级低的线程多那么一点,但即使是最低优先级的线程都有很大的机会来执行。

现在再来看看线程的中断:

Thread t2 = new Thread(() =>

{

try

{

while (true)

{

Console.WriteLine(Thread.CurrentThread.ThreadState);

Thread.Sleep(1000);

}

}

catch (ThreadAbortException abortException)

{

Console.WriteLine("catch");

Console.WriteLine(Thread.CurrentThread.ThreadState);

Console.WriteLine((string)abortException.ExceptionState);

}

});

t2.Start();

Thread.Sleep(2000);

t2.Abort("haha");

Thread.Sleep(100);

Console.WriteLine(t2.ThreadState);

在线程方法中,我们1秒输出一次线程的状态,然后主线程休眠2秒后中断线程,略微等待一点时间,等线程中断结束后再获取一次线程的状态。可以看到:

clip_image008

每一秒出现一次Running,2秒后由于线程中断处罚ThreadAbortException进入catch块,此时线程的状态是AbortRequested,也能接受到我们中断线程时传入的状态信息,最后线程的状态为Stopped。

现在再来看看线程的Join,用于阻塞调用线程等Join的线程完成,或传入一个时间,阻塞一定的时间:

Thread t3 = new Thread(() =>

{

for (int k = 0; k < 10; k++)

{

Thread.Sleep(100);

Console.Write("X");

}

Console.WriteLine();

});

Thread t4 = new Thread(() =>

{

for (int k = 0; k < 10; k++)

{

Thread.Sleep(100);

Console.Write("Y");

}

Console.WriteLine();

});

t3.Start();

t3.Join(TimeSpan.FromMilliseconds(500));

t4.Start();

Console.WriteLine();

这里可以看到,启动t3之后,我们让主线程阻塞500毫秒,这样的话t3应该已经输出若干X了,然后我们启动t4,随后的500毫秒,t3和t4交替输出X和Y,最后500毫秒由于t3已经结束,所以只会输出Y:

clip_image010

最后,再来看一个有趣的问题:

我们设置一个静态字段:

static int threadstaticvalue;

然后创建两个线程来循环累加这个值:

new Thread(() =>

{

for (int l = 0; l < 100000; l++)

{

threadstaticvalue++;

}

Console.WriteLine("from {0}: {1}", Thread.CurrentThread.Name, threadstaticvalue);

}) { Name = "1" }.Start();

new Thread(() =>

{

for (int m = 0; m < 200000; m++)

{

threadstaticvalue++;

}

Console.WriteLine("from {0}: {1}", Thread.CurrentThread.Name, threadstaticvalue);

}) { Name = "2" }.Start();

运行几次输出结果如下:

clip_image012clip_image014clip_image016clip_image018

虽然我们在代码中指定了两个线程分别累加值10万次和20万次,但是可以看到输出结果五花八门!这是因为两个线程都访问了共享的静态字段,可能错开访问可能正巧同步。其实,在静态字段上加上一个ThreadStatic特性就可以解决:

[ThreadStatic]

static int threadstaticvalue;

线程同步这个话题很大,我们下次接着讨论。

浅谈.NET下的多线程和并行计算(三)线程同步基础上

其 实,如果线程都是很独立的,不涉及到任何资源访问的,那么这些毫无干扰的线程不会产生什么问题。但是在实际应用中我们的线程总是涉及到资源访问的,而且往 往涉及到共享资源的访问,那么就产生了线程同步的问题。一直觉得线程同步这个名词很奇怪,字面上看同步就是使得步调一致,线程同步是不是就是让线程步调一 致的访问资源呢?事实上反了,线程同步恰巧是让线程不同时去访问资源而是去按照我们期望的顺序依次访问资源(是同步资源访问的行为而不是同步同时访问资 源)。一句话,多个线程(不仅仅局限于相同进程)如果需要访问相同的可变资源的话就可能需要考虑到线程同步的手段。还有两个常见的名词是线程安全和线程冲 突,所谓线程冲突就是由于多线程访问共享资源带来的问题,某个操作是线程安全就是表明这个操作没有线程冲突问题,要达到线程安全就要用线程同步的手段来解 决。在MSDN类库中可以看到方法都注明了是不是线程安全的,如果不是那么我们在多线程程序总使用这个方法的话就要考虑是否要线程同步了。

既然要让线程的步调一致,那么我们首先可以想到的是,如果一个线程没有完成我们就等,一直等到它完成:

Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


Thread.Sleep(1000);


result = 100;


});


t1.Start();


Thread.Sleep(500);


while (t1.IsAlive) ;


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


假设线程在完成后会把结果写入result这么一个静态的变量,主线程在启动了新线程之后只花了500毫秒就做好了自己的事情,接下去一定要等待线 程计算完成之后才能进行后续的操作,这个时候我们通过不断询问线程是不是还存在来得知线程是不是完成了计算,500毫秒后返回结果:



clip_image020



由于我们主线程是采用循环来等待的,对于处理器来说可不是等待,它足足运算了500毫秒,也就是浪费了500毫秒处理器资源。其实,我们可以在循环 中让线程休眠一小段时间,让出处理器时间片,然后再去轮询检查,这样虽然不能在线程一完成后就得到结果,但是却节省了处理器资源:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


Thread.Sleep(1000);


result = 100;


});


t1.Start();


Thread.Sleep(500);


while (t1.IsAlive)


Thread.Sleep(500);


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


结果如下:



clip_image022



除了轮询傻等还有一种方式就是使用Join来阻塞主线程一直到次线程的计算任务完成:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


Thread.Sleep(1000);


result = 100;


});


t1.Start();


Thread.Sleep(500);


t1.Join();


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


结果如下:



clip_image024



通过刚才两个例子我们再次回顾了Sleep和Join的用法,现在我们来看看采用锁机制进行同步的方法:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


lock (locker)


{


Thread.Sleep(1000);


result = 100;


}


});


t1.Start();


Thread.Sleep(100);


lock (locker)


{


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


}


我想即使是ASP.NET应用,我们都经常会需要用到lock来锁定资源。IIS本身是一个多线程环境,如果在我们的应用程序中使用了全局的缓存, 对于一个网站程序来说非常有可能在同一时间会有多个请求去修改这个缓存,在这个时候我们通常就会采用lock来锁定一个引用类型的对象。这个对象代表了锁 的范围,同一时间只有一个进程能获取到这个锁对象,也就是其中包含的代码只能由一个线程同时执行。上面的例子中我们在开启线程后让主线程等待了一小段时 间,然后对象就被锁定了,这个过程持续一秒,然后主线程在一秒后才能再次获得这个locker并输出结果:



clip_image026



在这个例子中我们的locker定义如下:



static object locker = new object();


有关lock之后会有更详细的讨论。之前我们看了等待和锁定的同步方法,不管怎么说这是一种被动的方法,是不是能让线程在完成之后主动通知其它等待的线程呢?答案是肯定的:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


Thread.Sleep(1000);


result = 100;


are.Set();


});


t1.Start();


are.WaitOne();


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


来看看这样一段代码,暂缺不管are是什么对象,但是看这个逻辑可以理解到,主线程在启动了新线程后就开始等待了,然后线程在一秒后完成了结果之后,调用了Set(),这句话的意思就是说我完成了,所有等我的人可以继续了,are对象定义如下:



static EventWaitHandle are = new AutoResetEvent(false);


AutoResetEvent从名字上可以看出它是一个自动Reset的事件通知方式。我举一个例子,您可能一下子就明白了,我们可以把它比作地铁 的闸机,插入了票,闸机放行一个人。AutoResetEvent的构造方法中传入的false代表,这个闸机默认是关闭的,只有调用了一次Set之后才 能通过,如果您把它改为true的话可以看到主线程根本没有等待,直接输出了结果。所谓的Reset就是闸机关闭的操作,也就是说对于 AutoResetEvent,一个人用一个票过去之后闸机就关闭了,除非还有后面的人插入了票(Set)。这个程序的运行结果如下:



clip_image028



和AutoResetEvent对应的还有一个ManualResetEvent



static EventWaitHandle mre = new ManualResetEvent(false);


它们唯一的区别就是这个闸机在插入一张票之后还是可以通行,除非有人手动去设置了一下Reset来关闭闸机,我们来看一个简单的例子:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


Thread.Sleep(1000);


result = 100;


mre.Set();


});


Thread t2 = new Thread(() =>


{


mre.WaitOne();


Console.WriteLine("get result" + result + " and write to file");


});


t1.Start();


t2.Start();


mre.WaitOne();


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


比如说我们开一个线程来计算,不但主线程等待这个结果来显示给用户,另外一个线程也等待这个结果来写入日志文件,这样我们就可以利用ManualResetEvent的特性,由于t2或主线程在通过闸机之后并不会导致它自动关闭,所以两者都可以通过,输出结果如下:



clip_image030



前面说过,AutoResetEvent像一个地铁闸机,一票一人,那么我们来看一个更形象的例子:



are.Set();


for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


are.WaitOne();


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


are.Set();


}).Start();


}


我们先调用了AutoResetEvent的Set方法来让闸机默认是可以通行的。然后开10个线程,排队等待进入,如果能进入了,花一秒的时间来通行然后关闭闸机,程序输出结果如下:



clip_image032



一秒过一个人,就像我们预期的那样。ManualResetEvent在闸机过人之后不会自动关闭,这就相当于很多旅游景点的团队检票方式,导游把 100张团队票交给检票人员,检票人员数了人数之后一批放行,然后关闭闸机(散客的话就用AutoResetEvent咯),想一下我们怎么写程序来模拟 这种批量进入闸机的方式:



clip_image034



首先要使用一个变量来统计进入的人数,达到一定人数之后放行:



static int group = 0;


然后我们来分析一下这段程序:



for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


lock (locker)


{


Thread.Sleep(1000); // 一秒来一个人


group++;


if (group == 5) // 好了,旅游团5人到齐


{


mre.Set(); // 检票员说你们进去吧


are.Set(); // 检票员打开闸机


group = 0;


}


}


mre.WaitOne(); // 大家等旅游团到齐(等检票员数人数)


Console.WriteLine(DateTime.Now.ToString("mm:ss")); // 进去了,团体进入时间


Thread.Sleep(100); // 让大家都走完


mre.Reset(); // 进去之后检票员停止放行


are.WaitOne(); // 第二波人等闸机再次打开


}).Start();


}


这个例子使用了ManualResetEvent和AutoResetEvent模拟这种团体检票的情景,不过这里有一个很有趣的问题,不知您有没 有想过,在mre.Reset()之前我们让线程休眠了100毫秒,目的是为了让阻塞的线程都得到恢复然后输出当前的时间。如果我们注释这句话的话就发现 很有可能在5个线程没有全部恢复的情况下就有某一个线程先调用了mre.Reset()导致闸机关闭,那么这个情况就很难预测了。这相当于在同步线程时出 现的线程同步问题。其实.NET内置了另外一个结构来实现这种操作:



static Semaphore s = new Semaphore(2, 2);


for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


s.WaitOne();


Thread.Sleep(1000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


s.Release();


}).Start();


}


结果如下:



clip_image036



本节我们走马观花看了很多同步结构,下节继续来讨论同步的一些基础结构。



浅谈.NET下的多线程和并行计算(四)线程同步基础下



回 顾一下上次,我们讨论了lock/AutoResetEvent/ManualResetEvent以及Semaphore。这些用于线程同步的结构叫做 同步基元。同步基元从类型上可以分为锁定/通知/联锁三种。lock显然锁定方式,而且是独占锁定,也就是在锁释放之前不能由其它线程获得。 Semaphore也是一种锁定,只不过不是独占锁,可以指定多少个线程访问代码块。AutoResetEvent和ManualResetEvent当 然就是通知方式了,前者在通行之后自动重置,后者需要手动重置。我们还看到了即使使用同步机制不一定能确保线程按照我们规划的去执行,因为从根本上来说, 操作系统的线程调度我们是没有办法预测的,除非使用阻塞或锁等待等方式,否则我们很难去预测两个无关的线程究竟哪个先得到执行(即使设置了优先级),而且 在使用这些同步机制的时候我们也要考虑到性能问题,如果多线程程序做的不好的话很可能会比单线程执行效率还低,比如我们开启了多个线程相互阻塞等待并没有 任何的并行运算,比如在一个多线程环境汇总我们锁的范围很大,导致多线程环境变为了一个单线程环境,有关性能问题以后再讨论,这次我们来看看其它的一些同 步基元。



本文的例子基于上文定义的一些基本静态对象:



static int result = 0;


static object locker = new object();


static EventWaitHandle are = new AutoResetEvent(false);


static EventWaitHandle mre = new ManualResetEvent(false);


使用lock保护共享资源不被多个线程同时修改是常见的做法,其实lock本质上基于Monitor,而使用Monitor本身可以带来更丰富的特性,比如可以设置超过某个等待时间段就不继续等待:



for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


if (Monitor.TryEnter(locker, 2000))


{


Thread.Sleep(1000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Monitor.Exit(locker);


}


}).Start();


}


在这段代码中我们开启10个线程尝试申请locker独占锁,通过输出结果可以看出,由于我们设置了2秒超时,程序只输出了三次:



clip_image038



在第一个线程获取锁之后,一秒后释放,第二个线程获取,一秒后又释放,第三个线程最后获取到,之后的线程都超过了2秒等待时间,TryEnter返回false,线程结束。



除了TryEnter之外,Monitor还有一组有用的方法,Wait和Pulse(PulseAll)。一般情况下,我们的线程占据了独占锁之 后进行一些线程安全的资源访问,然后退出锁。使用Wait我们可以让当前线程阻塞并且暂时释放锁,一直到有其它线程通知(Pulse)阻塞的(一个或者多 个)线程锁状态已改变为止:



for (int i = 0; i < 2; i++)


{


Thread reader = new Thread(() =>


{


Console.WriteLine(string.Format("reader #{0} started", Thread.CurrentThread.ManagedThreadId));


while (true)


{


lock (locker)


{


if (data.Count == 0)


{


Console.WriteLine(string.Format("#{0} can not get result, wait", Thread.CurrentThread.ManagedThreadId));


Monitor.Wait(locker);


Console.WriteLine(string.Format("#{0} get result: {1}", Thread.CurrentThread.ManagedThreadId, data.Dequeue()));


}


}


}


});


reader.Start();


}


 


Thread writer = new Thread(() =>


{


Console.WriteLine(string.Format("writer #{0} started", Thread.CurrentThread.ManagedThreadId));


while (true)


{


lock (locker)


{


int s = DateTime.Now.Second;


Console.WriteLine(string.Format("#{0} set result: {1}", Thread.CurrentThread.ManagedThreadId, s));


data.Enqueue(s);


Console.WriteLine("notify thread");


Monitor.Pulse(locker);


}


Thread.Sleep(1000);


}


});


writer.Start();


在这里,data定义如下:


static Queue<int> data = new Queue<int>();


输出结果如下:



clip_image040



在这里,我们模拟了两个读取线程和一个写入线程,写入线程每一秒写入当前的秒到队列,读取线程不断从队列读取一个值。读取线程中判断如果队列没值的 话就让出独占锁并且阻塞当前线程,然后写入线程拿到了独占锁写入值,并且发出通知,让排队的第一个读取线程得到恢复,由于使用了Pulse()只能通知一 个线程,所以可以发现两个读取线程依次有一次机会从队列中读取值。



在本文一开始提到了,同步基元还有一种叫做联锁(互锁)的结构,可以以比较高的性能,对线程共享的变量进行原子操作,它比使用lock来的性能高而且简洁:



Stopwatch sw = Stopwatch.StartNew();


Thread t1 = new Thread(() =>


{


for (int j = 0; j < 500; j++)


{


Interlocked.Increment(ref result);


Thread.Sleep(10);


}


});


Thread t2 = new Thread(() =>


{


for (int j = 0; j < 500; j++)


{


Interlocked.Add(ref result, 9);


Thread.Sleep(10);


}


});


t1.Start();


t2.Start();


t1.Join();


t2.Join();


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


运行结果如下:



clip_image042



第一个线程进行了500次累加操作,第二个线程进行了500次加法操作,使得最后result的值为10*500=5000,总共消耗的时间为5秒多一点。



上次我们介绍了AutoResetEvent和ManualResetEvent,WaitHandle提供了两个静态方法WaitAll和WaitAny可以让我们实现等待多个EventWaitHandle都完成或等待其中任意一个完成:



Stopwatch sw = Stopwatch.StartNew();


ManualResetEvent[] wh = new ManualResetEvent[10];


for (int i = 0; i < 10; i++)


{


wh[i] = new ManualResetEvent(false);


new Thread((j) =>


{


int d = ((int)j + 1) * 100;


Thread.Sleep(d);


Interlocked.Exchange(ref result, d);


wh[(int)j].Set();


}).Start(i);


}


WaitHandle.WaitAny(wh);


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(result);


程序输出如下:



clip_image044



在这里我们使用了10个ManualResetEvent关联到10个线程,这些线程的执行时间分别为100毫秒/200毫秒一直到1000毫秒, 由于主线程只等其中的一个信号量发出,所以在100毫秒后就输出了结果(但是注意到程序在1秒后完成,因为这些线程默认都是前台线程)。如果我们把 WaitAny改为WaitAll的话结果如下:



clip_image046



上文中我们用ManualResetEvent实现过发出信号让多人响应,这次我们实现的是多个信号让单人响应。



在实际应用的多线程环境中,我们通常会有很多的线程来读取缓存中的值,但是只会有1/10甚至1/10000的线程去修改缓存,对于这种情况如果我们使用lock不分读写都对缓存对象进行锁定的话,相当于多线程环境在用缓存的时候变为了但线程,做个实验:



Stopwatch sw = Stopwatch.StartNew();


ManualResetEvent[] wh = new ManualResetEvent[30];


for (int i = 1; i <= 20; i++)


{


wh[i - 1] = new ManualResetEvent(false);


new Thread((j) =>


{


lock (locker)


{


var sum = list.Count;


Thread.Sleep(100);


wh[(int)j].Set();


}


 


}).Start(i - 1);


}


for (int i = 21; i <= 30; i++)


{


wh[i - 1] = new ManualResetEvent(false);


new Thread((j) =>


{


lock (locker)


{


list.Add(1);


Thread.Sleep(100);


wh[(int)j].Set();


}


}).Start(i - 1);


}


WaitHandle.WaitAll(wh);


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(list.Count);


输出结果如下:



clip_image048



我们同时开了30个线程,其中20个读10个写,主线程等待它们全部执行完毕后输出时间,可以发现这30个线程用了3秒,写线程用了10秒独占锁可以理解,但是读线程并没有任何并发。.NET提供了现成的读写锁对象使得读操作可以并发:



Stopwatch sw = Stopwatch.StartNew();


ManualResetEvent[] wh = new ManualResetEvent[30];


for (int i = 1; i <= 20; i++)


{


wh[i - 1] = new ManualResetEvent(false);


new Thread((j) =>


{


rw.EnterReadLock();


var sum = list.Count;


Thread.Sleep(100);


wh[(int)j].Set();


rw.ExitReadLock();


 


}).Start(i - 1);


}


for (int i = 21; i <= 30; i++)


{


wh[i - 1] = new ManualResetEvent(false);


new Thread((j) =>


{


rw.EnterWriteLock();


list.Add(1);


Thread.Sleep(100);


wh[(int)j].Set();


rw.ExitWriteLock();


}).Start(i - 1);


}


WaitHandle.WaitAll(wh);


Console.WriteLine(sw.ElapsedMilliseconds);


Console.WriteLine(list.Count);


输出结果就像我们想的一样完美:



clip_image050



读线程并没有过多等待基本都并发访问。



最后,我们来介绍一下一种比较方便的实现线程同步方法的办法:



[MethodImpl(MethodImplOptions.Synchronized)]


static void M()


{


Thread.Sleep(1000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


}


 


[MethodImpl(MethodImplOptions.Synchronized)]


void N()


{


Thread.Sleep(1000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


}


M方法是静态方法,N方法是实例方法,我们都为它们应用了MethodImpl特性并且指示它们是同步方法(只能被一个线程同时访问)。然后我们写如下代码验证:



for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


M();


}).Start();


}


程序输出结果如下:



clip_image052



可以发现,虽然有10个线程同时访问M方法,但是每次只能有一个线程执行。



再来测试一下N方法:



Program p = new Program();


for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


p.N();


}).Start();


}


程序输出结果和上次一样:



clip_image054



但是我们要注意的是本质上,为M静态方法标注同步是对类进行加锁,而为N实例方法标注同步是对类的实例进行加锁,也就是说如果我们每次都用新的实例来调用N的话不能实现同步:



for (int i = 0; i < 10; i++)


{


new Thread(() =>


{


new Program().N();


}).Start();


}


结果如下:



clip_image056



通过这两篇文章我们基本介绍了线程同步的一些方法和结构,可以发现我们的手段很多,但是要随心所欲让各种线程同时执行然后汇总数据然后通知其它线程 继续计算然后再汇总数据等等并不是很简单。弄的不好让多线程变成但线程弄的不好也可能让数据变脏。其实在.NET 4.0中有一些新特性简化这些行为甚至编写多线程程序都不需要知道这些同步基元,但是这些基础了解一下还是有好处的。



浅谈.NET下的多线程和并行计算(五)线程池基础上



池 (Pool)是一个很常见的提高性能的方式。比如线程池连接池等,之所以有这些池是因为线程和数据库连接的创建和关闭是一种比较昂贵的行为。对于这种昂贵 的资源我们往往会考虑在一个池容器中放置一些资源,在用的时候去拿,在不够的时候添点,在用完就归还,这样就可以避免不断的创建资源和销毁资源。



如 果您做过相关实验的话可能会觉得不以为然,似乎开1000个线程也用不了几百毫秒。我们要这么想,对于一个高并发的环境来说,每一秒假设有100个请求, 每个请求需要使用(开和关)10个线程,也就是一秒需要处理1000个线程的开和关,每个线程独立堆栈1M,可以想象在这一秒中内存分配和回收是多么夸 张,这个开销不能说不昂贵。



首先,要理解线程池线程分为两类工作线程和IO线程,可以单独设置最小线程数和最大线程数:



ThreadPool.SetMinThreads(2, 2);



ThreadPool.SetMaxThreads(4, 4);



最大线程数很好理解,就是线程池最多创建这些线程,如果最大4个线程,现在这4个线程都在运行的话,后续进来的线程只能排队等待了。那么为什么有最 小线程一说法呢?其实之所以使用线程池是不希望线程在创建后运行结束后理解回收,这样的话以后要用的时候还需要创建,我们可以让线程池至少保留几个线程, 即使没有线程在工作也保留。上述语句我们设置线程池一开始就保持2个工作线程和2个IO线程,最大不超过4个线程。



至于线程池的使用相当简单先来看一段代码:



for (int i = 0; i < totalThreads; i++)



{



ThreadPool.QueueUserWorkItem(o =>



{



Thread.Sleep(1000);



int a, b;



ThreadPool.GetAvailableThreads(out a, out b);



Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b, Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss")));



});



}



Console.WriteLine("Main thread finished");



Console.ReadLine();



代码里面用到了一个事先定义的静态字段:



static readonly int totalThreads = 10;



代码运行结果如下:



clip_image058



每一个线程都休眠一秒然后输出当前线程池可用的工作线程和IO线程以及当前线程的托管ID和时间。我们通过这段代码可以发现线程池的几个特性:



1) 线程池中的线程都是后台线程,如果没有在主线程使用ReadLine的话,程序马上会退出。



2) 线程池一开始就占用了2个线程,一秒后占用了4个线程,工作线程将会由3-6四个线程来处理。



3) 线程池最多使用了4个工作线程和0个IO线程。



那么,我们如何知道线程池中的线程都运行结束了呢,可以想到上文用过的Monitor结构:



Stopwatch sw = Stopwatch.StartNew();



for (int i = 0; i < totalThreads; i++)



{



ThreadPool.QueueUserWorkItem(o =>



{



Thread.Sleep(1000);



int a, b;



ThreadPool.GetAvailableThreads(out a, out b);



Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b, Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss")));



lock (locker)



{



runningThreads--;



Monitor.Pulse(locker);



}



});



}



lock (locker)



{



while (runningThreads > 0)



Monitor.Wait(locker);



}



Console.WriteLine(sw.ElapsedMilliseconds);



Console.ReadLine();



程序中用到了两个辅助字段:



static object locker = new object();



static int runningThreads = totalThreads;



程序运行结果如下:



clip_image060



我们看到,10个线程使用了3.5秒全部执行完毕。20个线程呢?



clip_image062



需要6秒。细细分析这2个图我们不难发现,新的线程不是在不够用的时候立即创建而是延迟了0.5秒左右的时间,这是因为线程池会等待一下看是不是有 线程在这段时间内可用,如果实在没有的话再创建。其实可以这么理解这6秒,前一秒只有2个线程,后4秒有4个线程执行了16个,最后1秒又只有2个线程 了,所以一共是2+4*4+2=20,6秒处理了20个线程。



ThreadPool还有一个很有用的方法可以注册一个信号量,我们发出信号后所有关联的线程才执行,否则就一直等待,还可以指定等待的时间:



首先定义信号量和存储结果的字段:



static ManualResetEvent mre = new ManualResetEvent(false);



static int result = 0;



程序如下:



Stopwatch sw = Stopwatch.StartNew();



for (int i = 0; i < totalThreads; i++)



{



ThreadPool.RegisterWaitForSingleObject(mre, (state, istimeout) =>



{



Thread.Sleep(1000);



int a, b;



ThreadPool.GetAvailableThreads(out a, out b);



Interlocked.Increment(ref result);



Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b, Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss")));



lock (locker)



{



runningThreads--;



Monitor.Pulse(locker);



}



}, null, 500, true);



}



Thread.Sleep(1000);



result = 10;



mre.Set();



lock (locker)



{



while (runningThreads > 0)



Monitor.Wait(locker);



}



Console.WriteLine(sw.ElapsedMilliseconds);



Console.WriteLine(result);



Console.ReadLine();



程序结果如下:



clip_image064



注意到RegisterWaitForSingleObject的第一个参数就是信号量,第二个参数就是方法主体(接受两个参数分别是传给线程的一 个状态变量以及线程执行的时候是否超时),第三个参数是状态变量,第四个参数超时时间我们设置了500毫秒,由于主线程在1秒后发出信号,超时500毫 秒,所以这些线程并没等到信号的发出500毫秒之后就运行了。之所以程序的运行结果为30是因为即使500毫秒之后线程超时开始执行但是也要等1秒才累加 结果,在这个时候主线程早已把结果更新为10了,所以累加从10开始而不是0开始。最后布尔参数为true表明接受到信号后只线程执行一次。



观察到,所有线程执行完毕花了7秒的时间,除去开始的等待时间0.5秒,相比之前的例子还多了0.5秒的时间。这是为什么呢?请大家帮忙分析分析。 还有一个更奇怪的问题是,RegisterWaitForSingleObject消耗的是IO线程而不是工作线程,难道微软觉得 RegisterWaitForSingleObject常见于IO操作的应用还是不希望不浪费工作线程?



浅谈.NET下的多线程和并行计算(六)线程池基础下



这节我们按照线程池的核心思想来自定义一个简单的线程池:



1) 池中使用的线程不少于一定数量,不多于一定数量



2) 池中线程不够的时候创建,富裕的时候收回



3) 任务排队,没有可用线程时,任务等待



我们的目的只是实现这些“需求”,不去考虑性能(比如等待一段时间再去创建新的线程等策略)以及特殊的处理(异常),在实现这个需求的过程中我们也回顾了线程以及线程同步的基本概念。



首先,把任务委托和任务需要的状态数据封装一个对象:



public class WorkItem


{


public WaitCallback Action { get; set; }


public object State { get; set; }


 


public WorkItem(WaitCallback action, object state)


{


this.Action = action;


this.State = state;


}


}


然后来创建一个对象作为线程池中的一个线程:



public class SimpleThreadPoolThread


{


private object locker = new object();


private AutoResetEvent are = new AutoResetEvent(false);


private WorkItem wi;


private Thread t;


private bool b = true;


private bool isWorking;


 


public bool IsWorking


{


get


{


lock (locker)


{


return isWorking;


}


}


}


public event Action<SimpleThreadPoolThread> WorkComplete;


 


public SimpleThreadPoolThread()


{


lock (locker)


{


// 当前没有实际任务


isWorking = false;


}


t = new Thread(Work) { IsBackground = true };


t.Start();


}


 


public void SetWork(WorkItem wi)


{


this.wi = wi;


}


 


public void StartWork()


{


// 发出信号


are.Set();


}


 


public void StopWork()


{


// 空任务


wi = null;


// 停止线程循环


b = false;


// 发出信号结束线程


are.Set();


}


 


private void Work()


{


while (b)


{


// 没任务,等待信号


are.WaitOne();


if (wi != null)


{


lock (locker)


{


// 开始


isWorking = true;


}


// 执行任务


wi.Action(wi.State);


lock (locker)


{


// 结束


isWorking = false;


}


// 结束事件


WorkComplete(this);


}


}


}


代码的细节可以看注释,对这段代码的整体结构作一个说明:



1) 由于这个线程是被线程池中任务复用的,所以线程的任务处于循环中,除非线程池打算回收这个线程,否则不会退出循环结束任务



2) 使用自动信号量让线程没任务的时候等待,由线程池在外部设置任务后发出信号来执行实际的任务,执行完毕后继续等待



3) 线程公开一个完成的事件,线程池可以挂接处理方法,在任务完成后更新线程池状态



4) 线程池中的所有线程都是后台线程



下面再来实现线程池:



public class SimpleThreadPool : IDisposable


{


private object locker = new object();


private bool b = true;


private int minThreads;


private int maxThreads;


private int currentActiveThreadCount;


private List<SimpleThreadPoolThread> simpleThreadPoolThreadList = new List<SimpleThreadPoolThread>();


private Queue<WorkItem> workItemQueue = new Queue<WorkItem>();


 


public int CurrentActiveThreadCount


{


get


{


lock (locker)


{


return currentActiveThreadCount;


}


}


 


}


 


public int CurrentThreadCount


{


get


{


lock (locker)


{


return simpleThreadPoolThreadList.Count;


}


}


}


 


public int CurrentQueuedWorkCount


{


get


{


lock (locker)


{


return workItemQueue.Count;


}


}


}


 


public SimpleThreadPool()


{


minThreads = 4;


maxThreads = 25;


Init();


}


 


public SimpleThreadPool(int minThreads, int maxThreads)


{


if (minThreads > maxThreads)


throw new ArgumentException("minThreads > maxThreads", "minThreads,maxThreads");


this.minThreads = minThreads;


this.maxThreads = maxThreads;


Init();


}


 


public void QueueUserWorkItem(WorkItem wi)


{


lock (locker)


{


// 任务入列


workItemQueue.Enqueue(wi);


}


}


 


private void Init()


{


lock (locker)


{


// 一开始创建最小线程


for (int i = 0; i < minThreads; i++)


{


CreateThread();


}


currentActiveThreadCount = 0;


}


new Thread(Work) { IsBackground = true }.Start();


}


 


private SimpleThreadPoolThread CreateThread()


{


SimpleThreadPoolThread t = new SimpleThreadPoolThread();


// 挂接任务结束事件


t.WorkComplete += new Action<SimpleThreadPoolThread>(t_WorkComplete);


// 线程入列


simpleThreadPoolThreadList.Add(t);


return t;


}


 


private void Work()


{


// 线程池主循环


while (b)


{


Thread.Sleep(100);


lock (locker)


{


// 如果队列中有任务并且当前线程小于最大线程


if (workItemQueue.Count > 0 && CurrentActiveThreadCount < maxThreads)


{


WorkItem wi = workItemQueue.Dequeue();


// 寻找闲置线程


SimpleThreadPoolThread availableThread = simpleThreadPoolThreadList.FirstOrDefault(t => t.IsWorking == false);


// 无则创建


if (availableThread == null)


availableThread = CreateThread();


// 设置任务


availableThread.SetWork(wi);


// 开始任务


availableThread.StartWork();


// 增加个活动线程


currentActiveThreadCount++;


}


}


}


}


 


private void t_WorkComplete(SimpleThreadPoolThread t)


{


lock (locker)


{


// 减少个活动线程


currentActiveThreadCount--;


// 如果当前线程数有所富裕并且比最小线程多


if ((workItemQueue.Count + currentActiveThreadCount) < minThreads && CurrentThreadCount > minThreads)


{


// 停止已完成的线程


t.StopWork();


// 从线程池删除线程


simpleThreadPoolThreadList.Remove(t);


}


}


}


 


public void Dispose()


{


// 所有线程停止


foreach (var t in simpleThreadPoolThreadList)


{


t.StopWork();


}


// 线程池主循环停止


b = false;


}


}


线程池的结构如下:



1) 在构造方法中可以设置线程池最小和最大线程



2) 维护一个任务队列和一个线程池中线程的列表



3) 初始化线程池的时候就创建最小线程数量定义的线程



4) 线程池主循环每20毫秒就去处理一次,如果有任务并且线程池还可以处理任务的话,先是找闲置线程,找不到则创建一个



5) 通过设置任务委托以及发出信号量来开始任务



6) 线程池提供了三个属性来查看当前活动线程数,当前总线程数和当前队列中的任务数



7) 任务完成的回调事件中我们判断如果当前线程有富裕并且比最小线程多则回收线程



8) 线程池是IDispose对象,在Dispose()方法中停止所有线程后停止线程池主循环



写一段代码来测试线程池:



using (SimpleThreadPool t = new SimpleThreadPool(2, 4))


{


Stopwatch sw2 = Stopwatch.StartNew();


for (int i = 0; i < 10; i++)


{


t.QueueUserWorkItem(new WorkItem((index =>


{


Console.WriteLine(string.Format("#{0} : {1} / {2}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss"), index));


Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));


Thread.Sleep(1000);


}), i));


}


while (t.CurrentQueuedWorkCount > 0 || t.CurrentActiveThreadCount > 0)


{


Thread.Sleep(10);


}


Console.WriteLine("All work completed");


Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));


Console.WriteLine(sw2.ElapsedMilliseconds);


} 


代码中我们向线程池推入10个任务,每个任务需要1秒执行,任务执行前输出当前任务的所属线程的Id,当前时间以及状态值。然后再输出线程池的几个状态属性。主线程循环等待所有任务完成后再次输出线程池状态属性以及所有任务完成耗费的时间:



clip_image066



我们可以看到:



1) 线程池中的线程总数从2到4到2



2) 线程池中活动的线程数从2到4到0



3) 线程池中排队的任务数从9到0



4) 所有线程完成一共使用了3秒时间



相比.NET内置的线程池,性能虽然有0.5秒的提高(可以见前文,.NET线程池在创建新的线程之前会等待0.5秒左右的时间),但其实一个好的 线程池的实现需要考虑很多策略(什么时候去创建新线程,什么时候去回收老线程),.NET的ThreadPool在整体性能上做的很好,所以不建议随便去 使用自定义的线程池。本例更只能作为实验和演示。



浅谈.NET下的多线程和并行计算(七)基于多线程的基本组件



在多线程应用中我们有一些很常见的需求,比如定时去做计划任务,或者是在执行一个长时间的任务,在执行这个任务的过程中能有进度显示(能想到要实现这个需求需要新开一个线程,避免阻塞UI的更新)。对于这些应用.NET提供了现成的组件。



首先来看一下System.Threading的Timer组件,它提供了定时执行某个任务的方法:



ThreadPool.SetMinThreads(2, 2);


ThreadPool.SetMaxThreads(4, 4);


 


Timer timer = new Timer((state) =>


{


int a, b;


ThreadPool.GetAvailableThreads(out a, out b);


Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b, Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss")));


}, null, 2000, 1000);


 


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Thread.Sleep(5000);


Console.WriteLine("Change()");


timer.Change(3000, 500);


Thread.Sleep(5000);


Console.WriteLine("Dispose()");


timer.Dispose();


Thread.Sleep(5000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


这段代码的运行结果如下:



clip_image068



我们可以看到:



1) Timer构造方法中第一个参数就是要定时执行的方法,这个方法接受一个状态参数,第二个参数是状态参数,第三个参数是首次调用回调方法前的延迟毫秒,第四个参数就是执行方法的间隔毫秒



2) 从结果中我们可以看到,第一次回调方法在2秒后执行,然后每一秒执行一次,之后我们调用了Change()方法把延迟时间设置为3秒,把间隔设置为500毫秒,看到Timer在完成了上次回调之后3秒后执行了新的回调,之后间隔500毫秒执行一次。



3) 最后,我们执行了Dispose()方法,在结束最后一次回调之后Timer就再也没有调用回调方法。



4) 在回调方法中我们输出了线程池的可用线程,可以看到Timer基于线程池,也就是Timer基于后台线程。



.NET中还提供了System.Timers.Timer,它封装并增强了System.Threading.Timer:



System.Timers.Timer timer2 = new System.Timers.Timer();


timer2.Elapsed += new System.Timers.ElapsedEventHandler(timer2_Elapsed);


timer2.Interval = 1000;


Console.WriteLine("Start()");


timer2.Start();


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Thread.Sleep(5000);


Console.WriteLine("Stop()");


timer2.Stop();


Thread.Sleep(5000);


Console.WriteLine("Change Interval and Start()");


timer2.Interval = 500;


timer2.Start();


Thread.Sleep(5000);


Console.WriteLine("Dispose()");


timer2.Dispose();


Thread.Sleep(5000);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


 


static void timer2_Elapsed(object sender, System.Timers.ElapsedEventArgs e)


{


int a, b;


ThreadPool.GetAvailableThreads(out a, out b);


Console.WriteLine(string.Format("({0}/{1}) #{2} : {3}", a, b, Thread.CurrentThread.ManagedThreadId, e.SignalTime.ToString("mm:ss")));


}


(假设我们还是设置线程池最小2个线程最大4个线程)



这段代码的结果如下:



clip_image070



从运行结果中我们可以看到:



1) 由于System.Timers.Timer封装了System.Threading.Timer,所以还是基于线程池



2) 默认Timer是停止的,启动后需要等待一个Interval再执行回调方法



最后,再来看看BackgroundWorker,它提供了对于前面说的执行一个任务,在UI上更新进度这种应用的封装,首先定义一个static的BackgroundWorker:



static BackgroundWorker bw = new BackgroundWorker();


然后写如下测试代码:



bw.WorkerReportsProgress = true;


bw.WorkerSupportsCancellation = true;


bw.ProgressChanged += new ProgressChangedEventHandler(bw_ProgressChanged);


bw.RunWorkerCompleted += new RunWorkerCompletedEventHandler(bw_RunWorkerCompleted);


bw.DoWork += new DoWorkEventHandler(bw_DoWork);


bw.Disposed += new EventHandler(bw_Disposed);


AutoResetEvent are = new AutoResetEvent(false);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


bw.RunWorkerAsync(are);


Thread.Sleep(2000);


are.Set();


Thread.Sleep(2000);


Console.WriteLine("CancelAsync()");


bw.CancelAsync();


while (bw.IsBusy)


Thread.Sleep(10);


bw.Dispose();


这段代码中我们:



1) 设置BackgroundWorker可以汇报进度(通过ProgressChanged事件)



2) 设置BackgroundWorker支持任务取消



3) 定义了进度更新的处理事件:



static void bw_ProgressChanged(object sender, ProgressChangedEventArgs e)


{


Console.WriteLine(string.Format("{0} : {1}% completed", DateTime.Now.ToString("mm:ss"), e.ProgressPercentage));


}


4) 定义了任务完成的处理事件:



static void bw_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)


{


if (e.Cancelled)


Console.WriteLine("Cancelled");


else if (e.Error != null)


Console.WriteLine(e.Error.ToString());


else


Console.WriteLine(e.Result);


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


}


5) 定义了任务的主体方法:



static void bw_DoWork(object sender, DoWorkEventArgs e)


{


(e.Argument as AutoResetEvent).WaitOne();


 


for (int i = 0; i <= 100; i += 10)


{


//if (bw.CancellationPending)


//{


//Console.WriteLine("Cancelling...");


//for (int j = 0; j <= 100; j += 20)


//{


//bw.ReportProgress(j);


//Thread.Sleep(500);


//}


//e.Cancel = true;


//return;


//}


bw.ReportProgress(i);


Thread.Sleep(500);


}


e.Result = 100;


}


6) 定义了Dispose BackgroundWorker后的事件:



static void bw_Disposed(object sender, EventArgs e)


{


Console.WriteLine("disposed");


}


7) 使用信号量作为事件的状态参数让任务延迟2秒执行



8) 主线程通过IsBusy判断任务是否在执行,轮询等待



9) 最后Dispose这个组件



程序执行结果如下:



clip_image072



可以看到:



1) 任务延迟2秒执行,任务分为10个阶段执行,每执行一个阶段汇报一下进度。每个阶段需要500毫秒



2) 任务执行完成之后可以设置Result属性的值,这个值在bw_RunWorkerCompleted中可以获取到



我们还原注释的那些代码来看看取消任务的情况:



clip_image074



我们看到任务在2秒后取消,要注意这种异步取消任务的方式,我们调用了CancelAsync()其实不能从实质上取消任务的执行,要真正取消任务 需要在任务的主体方法中不断检测CancellationPending属性,如果为true表示用户希望取消任务,然后去执行一些取消任务的行为,在完 成后设置Cancel属性为true,并结束任务主体,这么做的目的是因为对于长时间的任务取消回滚的过程可能也是长时间的,我们同样可以在主体方法中对 取消的行为进行进度汇报。您可以自己做相关实验,可以发现在控制台程序中,BackgroundWorker的DoWork可 ProgressReport基于两个独立的线程,这两个线程都是基于线程池的,在Winform中 BackgroundWorker的DoWork基于线程池中的独立线程而ProgressReport执行于UI线程。



浅谈.NET下的多线程和并行计算(八)Winform中多线程编程基础上



首先我们创建一个Winform的应用程序,在上面添加一个多行文本框和一个按钮控件,按钮的事件如下:



Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


首先我们可以把这个操作理解为一个非常耗时的操作,它至少占用1秒的时间。在1秒后,我们整了一个大字符串作为文本框的值,然后在标签上显示给文本框赋值这个UI渲染行为需要的时间,程序执行结果如下:



clip_image076



我们可以感受到,在点击了按钮之后整个程序的UI就卡住了,没有办法拖动没有办法改变大小,用于体验非常差。一般能想到会新建一个线程来包装这个方法,使得UI线程不被卡住:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


}).Start();


使用调试方式运行程序的话会得到如下的异常(非调试方式不会):



clip_image078



虽然我们知道这样设置:



Control.CheckForIllegalCrossThreadCalls = false;


可以屏蔽这个错误,但是在非创建控件的线程去更新控件的状态的做法会导致很多问题,比如死锁和控件部分被更新等。微软推荐我们使用Control的Invoke或BeginInvoke方法来把涉及到控件状态更新的操作让UI线程去做:



new Thread(() =>


{


Invoke(new Action(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


}));


}).Start();


你可能会想到这么写,但是运行程序后可以发现界面依然是卡死。想一下,虽然我们新开了一个线程,但是马上又把整个代码段交给UI线程去做了,当然起 不到效果。其实这个方法的工作可以分为两部分,一部分是我们数据的计算,一部分是把计算好的数据显示在界面上,我们只应该把真正和UI相关的操作放到 Invoke中让UI线程去做:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


Invoke(new Action(() =>


{


textBox1.Text = s;


}));


}).Start();


再测试一次可以发现,UI在前1秒多的时间没有卡死,在最后的一点时间还是卡死了。在继续研究卡死问题之前我们来看一下,Control提供了InvokeRequired属性来让我们判断当前线程是不是UI线程,或者说当前的操作是否需要进行Invoke:



textBox1.Text = this.InvokeRequired.ToString();


new Thread(() =>


{


textBox1.Text += Environment.NewLine + this.InvokeRequired.ToString();


Invoke(new Action(() =>


{


textBox1.Text += Environment.NewLine + this.InvokeRequired.ToString();


}));


}).Start(); 


通过非调试方式启动程序可以得到如下结果:



clip_image080



很明显:



1) 在线程外的赋值不需要Invoke(在UI线程)



2) 在线程内的赋值需要Invoke(不在UI线程)



3) 在Invoke中的赋值已经封送给UI线程,所以不需要Invoke



继续研究卡死问题,您可能会想到,Control还提供了一个BeginInvoke方法,我们来试试看:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


BeginInvoke(new Action(() =>


{


textBox1.Text = s;


}));


}).Start();


好像效果上还是没什么区别,那么Invoke和BeginInvoke的区别在哪里呢?



我们知道Windows应用程序基于消息,Windows API提供了SendMessage和PostMessage两个API,前者执行消息后返回(不经过消息管道,先于PostMessage执行),后者 把消息发送到管道异步执行。Invoke和BeginInvoke从行为上来说类似这两个API,但是实际上两者都使用了PostMessage,前者使 用信号量在消息执行前阻塞,达到同步的效果。我们来做一个实验:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


Stopwatch sw = Stopwatch.StartNew();


Invoke(new Action(() =>


{


textBox1.Text = s;


}));


MessageBox.Show(sw.ElapsedMilliseconds.ToString());


}).Start();


运行程序:



clip_image082



可以体会到,在文本框的值出现之后才出现弹出框,文本框赋值这个消息的执行过程耗时2秒。把Invoke改为BeginInvoke其它不动再执行程序:



clip_image084



明显感到弹出框先显示2秒后文本框的值出现。BeginInvoke没有阻塞后续语句的执行。因此,需要注意,如果我们在方法中使用的变量在 BeginInvoke之后有修改,极有可能发生混乱。如果您使用过委托的BeginInvoke应该会知道,通常建议总是调用EndInvoke来回收 资源,对于Control的EndInvoke来说,如果您不需要获取返回值的话,那么它不是必须的(来自msdn)。



现在您可能还有疑问为什么使用了BeginInvoke,UI还是卡了大概2秒,可以这么理解,我们把这么多的文字赋值到文本框中,这个UI行为是 非常耗时的,不管是Invoke还是BeginInvoke最终是发送消息给UI线程处理(两者都没有使用线程池),它就是需要这么多时间,在一般情况下 我们不会在UI上呈现这么多数据。



一般来说我们能做的优化是:



1) 尽量把非UI的操作使用新的线程去异步计算,不阻塞UI线程,真正需要操作UI的时候才去提交给UI线程



2) 尽量减少UI的操作复杂度,比如如果需要在UI上绘制一个复杂图形可以在内存中先创建一个位图,绘制好之后把整个位图在UI上绘制,而不是直接在UI上绘制这个图形



举个例子,UI就好象一块画布,我们要在上面画一个巨作怎么才能不过多占用这块布的时间,让大家都能用上呢?一个方法就是我们在准备颜色和画笔的时 候不占着这个布,真正要去画的时候才去用,另外一个方法就是在另一块画布上先画,然后把图案采用复印的方式印到我们的主画布上。



对于大量数据的呈现,我们还可以:



1) 采用分页,只显示一部分数据,对于Windows程序的分页可能就是滚动条性质的了,在滚动条下拉的时候再去呈现当前“页”的数据



2) 即使是一页的数据,也可以一部分一部分呈现



举个例子,对于word文档的加载一般我们一打开就可以看到第一页,然后滚动块慢慢变小,页数慢慢增多,如果一开始就加载1000页的话我们可能要1分钟后才能看到第一页,如果等不及直接向后翻滚动条的话会立即加载后面的数据:



new Thread(() =>


{


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 100; i++)


sb.Append("test");


string s = sb.ToString();


for (int i = 0; i < 20; i++)


{


BeginInvoke(new Action(() =>


{


textBox1.Text += s + i;


}));


Thread.Sleep(10);


}


}).Start();


设置文本框允许纵向滚动条并且运行一下这段程序可以体会到这个效果:



clip_image086



浅谈.NET下的多线程和并行计算(八)Winform中多线程编程基础上



首先我们创建一个Winform的应用程序,在上面添加一个多行文本框和一个按钮控件,按钮的事件如下:



Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


首先我们可以把这个操作理解为一个非常耗时的操作,它至少占用1秒的时间。在1秒后,我们整了一个大字符串作为文本框的值,然后在标签上显示给文本框赋值这个UI渲染行为需要的时间,程序执行结果如下:



clip_image076[1]



我们可以感受到,在点击了按钮之后整个程序的UI就卡住了,没有办法拖动没有办法改变大小,用于体验非常差。一般能想到会新建一个线程来包装这个方法,使得UI线程不被卡住:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


}).Start();


使用调试方式运行程序的话会得到如下的异常(非调试方式不会):



clip_image078[1]



虽然我们知道这样设置:



Control.CheckForIllegalCrossThreadCalls = false;


可以屏蔽这个错误,但是在非创建控件的线程去更新控件的状态的做法会导致很多问题,比如死锁和控件部分被更新等。微软推荐我们使用Control的Invoke或BeginInvoke方法来把涉及到控件状态更新的操作让UI线程去做:



new Thread(() =>


{


Invoke(new Action(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


textBox1.Text = s;


}));


}).Start();


你可能会想到这么写,但是运行程序后可以发现界面依然是卡死。想一下,虽然我们新开了一个线程,但是马上又把整个代码段交给UI线程去做了,当然起 不到效果。其实这个方法的工作可以分为两部分,一部分是我们数据的计算,一部分是把计算好的数据显示在界面上,我们只应该把真正和UI相关的操作放到 Invoke中让UI线程去做:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


Invoke(new Action(() =>


{


textBox1.Text = s;


}));


}).Start();


再测试一次可以发现,UI在前1秒多的时间没有卡死,在最后的一点时间还是卡死了。在继续研究卡死问题之前我们来看一下,Control提供了InvokeRequired属性来让我们判断当前线程是不是UI线程,或者说当前的操作是否需要进行Invoke:



textBox1.Text = this.InvokeRequired.ToString();


new Thread(() =>


{


textBox1.Text += Environment.NewLine + this.InvokeRequired.ToString();


Invoke(new Action(() =>


{


textBox1.Text += Environment.NewLine + this.InvokeRequired.ToString();


}));


}).Start(); 


通过非调试方式启动程序可以得到如下结果:



clip_image080[1]



很明显:



1) 在线程外的赋值不需要Invoke(在UI线程)



2) 在线程内的赋值需要Invoke(不在UI线程)



3) 在Invoke中的赋值已经封送给UI线程,所以不需要Invoke



继续研究卡死问题,您可能会想到,Control还提供了一个BeginInvoke方法,我们来试试看:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


BeginInvoke(new Action(() =>


{


textBox1.Text = s;


}));


}).Start();


好像效果上还是没什么区别,那么Invoke和BeginInvoke的区别在哪里呢?



我们知道Windows应用程序基于消息,Windows API提供了SendMessage和PostMessage两个API,前者执行消息后返回(不经过消息管道,先于PostMessage执行),后者 把消息发送到管道异步执行。Invoke和BeginInvoke从行为上来说类似这两个API,但是实际上两者都使用了PostMessage,前者使 用信号量在消息执行前阻塞,达到同步的效果。我们来做一个实验:



new Thread(() =>


{


Thread.Sleep(1000);


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 10000; i++)


sb.Append("test");


string s = sb.ToString();


Stopwatch sw = Stopwatch.StartNew();


Invoke(new Action(() =>


{


textBox1.Text = s;


}));


MessageBox.Show(sw.ElapsedMilliseconds.ToString());


}).Start();


运行程序:



clip_image082[1]



可以体会到,在文本框的值出现之后才出现弹出框,文本框赋值这个消息的执行过程耗时2秒。把Invoke改为BeginInvoke其它不动再执行程序:



clip_image084[1]



明显感到弹出框先显示2秒后文本框的值出现。BeginInvoke没有阻塞后续语句的执行。因此,需要注意,如果我们在方法中使用的变量在 BeginInvoke之后有修改,极有可能发生混乱。如果您使用过委托的BeginInvoke应该会知道,通常建议总是调用EndInvoke来回收 资源,对于Control的EndInvoke来说,如果您不需要获取返回值的话,那么它不是必须的(来自msdn)。



现在您可能还有疑问为什么使用了BeginInvoke,UI还是卡了大概2秒,可以这么理解,我们把这么多的文字赋值到文本框中,这个UI行为是 非常耗时的,不管是Invoke还是BeginInvoke最终是发送消息给UI线程处理(两者都没有使用线程池),它就是需要这么多时间,在一般情况下 我们不会在UI上呈现这么多数据。



一般来说我们能做的优化是:



1) 尽量把非UI的操作使用新的线程去异步计算,不阻塞UI线程,真正需要操作UI的时候才去提交给UI线程



2) 尽量减少UI的操作复杂度,比如如果需要在UI上绘制一个复杂图形可以在内存中先创建一个位图,绘制好之后把整个位图在UI上绘制,而不是直接在UI上绘制这个图形



举个例子,UI就好象一块画布,我们要在上面画一个巨作怎么才能不过多占用这块布的时间,让大家都能用上呢?一个方法就是我们在准备颜色和画笔的时 候不占着这个布,真正要去画的时候才去用,另外一个方法就是在另一块画布上先画,然后把图案采用复印的方式印到我们的主画布上。



对于大量数据的呈现,我们还可以:



1) 采用分页,只显示一部分数据,对于Windows程序的分页可能就是滚动条性质的了,在滚动条下拉的时候再去呈现当前“页”的数据



2) 即使是一页的数据,也可以一部分一部分呈现



举个例子,对于word文档的加载一般我们一打开就可以看到第一页,然后滚动块慢慢变小,页数慢慢增多,如果一开始就加载1000页的话我们可能要1分钟后才能看到第一页,如果等不及直接向后翻滚动条的话会立即加载后面的数据:



new Thread(() =>


{


StringBuilder sb = new StringBuilder();


for (int i = 0; i < 100; i++)


sb.Append("test");


string s = sb.ToString();


for (int i = 0; i < 20; i++)


{


BeginInvoke(new Action(() =>


{


textBox1.Text += s + i;


}));


Thread.Sleep(10);


}


}).Start();


设置文本框允许纵向滚动条并且运行一下这段程序可以体会到这个效果:



clip_image086[1]



浅谈.NET下的多线程和并行计算(十).NET异步编程模型基础上



谈 多线程谈到现在,我们要明确多线程的一个好处是可以进行并行的运算(充分利用多核处理器,对于桌面应用程序来说就更重要一点了,没有WEB服务器,利用多 核只能靠自己),还有一个好处就是异步操作,就是我们可以让某个长时间的操作独立运行,不妨碍主线程继续进行一些计算,然后异步的去返回结果(也可以不返 回)。前者能提高性能是因为能利用到多核,而后者能提高性能是因为能让CPU不在等待中白白浪费,其实异步从广义上来说也可以理解为某种并行的运算。在之 前的这么多例子中,我们大多采用手工方式来新开线程,之前也说过了,在大并发的环境中随便开始和结束线程的代价太大,需要利用线程池,使用线程池的话又觉 得少了一些控制。现在让我们来总结一下大概会有哪几种常见的异步编程应用模式:



1) 新开一个A线程执行一个任务,然后主线程执行另一个任务后等待线程返回结果后继续



2) 新开一个A线程执行一个任务,然后主线程不断轮询A线程是否执行完毕,如果没有的话可以选择等待或是再进行一些操作



3) 新开一个A线程执行一个任务,执行完毕之后立即执行一个回调方法去更新一些状态变量,主线程和A线程不一定有直接交互



4) 新开一个A线程执行一个任务,执行完毕之后啥都不做



(补充一句,异步编程不一定是依赖于线程的,从广义上来说,使用队列异步处理数据也可以算是一种异步编程模式)



对于这任何一种,我们要使用线程池来编写应用的话都是比较麻烦的,比如如下的代码实现了1)这种应用:



class AsyncObj


{


public EventWaitHandle AsyncWaitHandle { get; set; }


public object Result { get; set; }


 


public AsyncObj()


{


AsyncWaitHandle = new AutoResetEvent(false);


}


}


AsyncObj ao = new AsyncObj();


ThreadPool.QueueUserWorkItem(state =>


{


AsyncObj obj = state as AsyncObj;


Console.WriteLine("asyc operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(2000);


Console.WriteLine("asyc operation completed @ " + DateTime.Now.ToString("mm:ss"));


obj.Result = 100;


obj.AsyncWaitHandle.Set();


}, ao);


 


Console.WriteLine("main operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


Console.WriteLine("main operation completed @ " + DateTime.Now.ToString("mm:ss"));


ao.AsyncWaitHandle.WaitOne();


Console.WriteLine("get syc operation result : " + ao.Result.ToString() + " @ " + DateTime.Now.ToString("mm:ss"));


结果如下:



clip_image088



对于2)-4)等情况又是另外一套了,这样我们的代码可能会变得乱七八糟,在.NET中我们的委托以及很多IO操作相关的类库都支持一种叫做异步编 程模型APM的编程模型。不仅仅方便了我们进行多线程应用,而且我们如果自己要设计类库的话也可以遵从这个模式(基于APM的接口实现我们自己的类 库)。.NET提供了基于IAsyncResult的异步编程模型和基于事件的异步编程模型,这节我们来看看基于IAsyncResult也就是 BeginInvoke和EndInvoke(对于非同用的操作来说就是BeginXXX和EndXXX)的编程模型的各种使用方法,可以说这么多种使用 方法可以满足我们大部分的要求。



首先来定义一个异步操作:



static int AsyncOperation(int x, int y)


{


Console.WriteLine("asyc operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(2000);


int a, b;


ThreadPool.GetAvailableThreads(out a, out b);


Console.WriteLine(string.Format("({0}/{1}) #{2}", a, b, Thread.CurrentThread.ManagedThreadId));


Console.WriteLine("asyc operation completed @ " + DateTime.Now.ToString("mm:ss"));


return x + y;


}


我们需要开两个线程同时计算两个异步操作,然后主线程等待两个线程执行完毕后获取结果并且输出它们的和,难以想象代码是多么简单:



var func = new Func<int, int, int>(AsyncOperation);


var result1 = func.BeginInvoke(100, 200, null, null);


var result2 = func.BeginInvoke(300, 400, null, null);


Console.WriteLine("main operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


Console.WriteLine("main operation completed @ " + DateTime.Now.ToString("mm:ss"));


int result = func.EndInvoke(result1) + func.EndInvoke(result2);


Console.WriteLine("get syc operation result : " + result + " @ " + DateTime.Now.ToString("mm:ss"));


主线程的计算需要1秒,两个异步线程都需要2秒,整个程序理论上需要2秒执行完毕,看看结果如何:



clip_image090



当然,在之前我们限制了线程池的线程数为2-4:



ThreadPool.SetMinThreads(2, 2);


ThreadPool.SetMaxThreads(4, 4);


从结果中可以看出,使用委托来异步调用方法基于线程池,调用EndInvoke的时候阻塞了主线程,得到结果后主线程继续。在代码中没看到 Thread没看到ThreadPool没看到信号量,我们却完成了一个异步操作,实现了一开始说的1)场景。现在再来看看第二种使用方式:



var func = new Func<string, int, string>(AsyncOperation2);


var result1 = func.BeginInvoke("hello ", 2000, null, null);


var result2 = func.BeginInvoke("world ", 3000, null, null);


Console.WriteLine("main operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


Console.WriteLine("main operation completed @ " + DateTime.Now.ToString("mm:ss"));


WaitHandle.WaitAny(new WaitHandle[] { result1.AsyncWaitHandle, result2.AsyncWaitHandle });


string r1 = result1.IsCompleted ? func.EndInvoke(result1) : string.Empty;


string r2 = result2.IsCompleted ? func.EndInvoke(result2) : string.Empty;


if (string.IsNullOrEmpty(r1))


{


Console.WriteLine("get syc operation result : " + r2 + " @ " + DateTime.Now.ToString("mm:ss"));


func.EndInvoke(result1);


}


if (string.IsNullOrEmpty(r2))


{


Console.WriteLine("get syc operation result : " + r1 + " @ " + DateTime.Now.ToString("mm:ss"));


func.EndInvoke(result2);


}


BeginInvoke返回的是一个IAsyncResult,通过其AsyncWaitHandle 属性来获取WaitHandle。异步调用完成时会发出信号量。这样我们就可以更灵活一些了,可以在需要的时候去WaitOne()(可以设置超时时 间),也可以WaitAny()或是WaitAll(),上例我们实现的效果是开了2个线程一个3秒,一个2秒,只要有任何一个完成就获取其结果,主线程 任务完成之后再去EndInvoke没完成的那个来释放资源(比如有两个排序算法,它们哪个快取决于数据源,我们一起执行并且只要有一个得到结果就继 续)。在这里我们的工作方法AsyncOperation2的定义如下:



static string AsyncOperation2(string s, int time)


{


Console.WriteLine("asyc operation started @ " + DateTime.Now.ToString("mm:ss:fff"));


Thread.Sleep(time);


int a, b;


ThreadPool.GetAvailableThreads(out a, out b);


Console.WriteLine(string.Format("({0}/{1}) #{2}", a, b, Thread.CurrentThread.ManagedThreadId));


Console.WriteLine("asyc operation completed @ " + DateTime.Now.ToString("mm:ss:fff"));


return s.ToUpper();


}


这段程序运行结果如下:



clip_image092



可以看到,在2秒的那个线程结束后,主线程就继续了,然后再是3秒的那个线程结束。再来看看第三种,也就是使用轮询的方式来等待结果:



var func = new Func<int, int, int>(AsyncOperation);


var result = func.BeginInvoke(100, 200, null, null);


Console.WriteLine("main operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


Console.WriteLine("main operation completed @ " + DateTime.Now.ToString("mm:ss"));


while (!result.IsCompleted)


{


Console.WriteLine("main thread wait again");


Thread.Sleep(500);


}


int r = func.EndInvoke(result);


Console.WriteLine("get syc operation result : " + r + " @ " + DateTime.Now.ToString("mm:ss"));


程序的输出结果如下,这对应我们一开始提到的第二种场景,在等待的时候我们的主线程还可以做一些(不依赖于返回结果的)计算呢:



clip_image094



再来看看第四种,采用回调的方式来获取结果,线程在结束后自动调用回调方法,我们可以在回调方法中进行EndInvoke:



var func = new Func<int, int, int>(AsyncOperation);


var result = func.BeginInvoke(100, 200, CallbackMethod, func);


Console.WriteLine("main operation started @ " + DateTime.Now.ToString("mm:ss"));


Thread.Sleep(1000);


Console.WriteLine("main operation completed @ " + DateTime.Now.ToString("mm:ss"));


Console.ReadLine();


BeginInvoke的第三个参数是回调方法,第四个参数是传给工作方法的状态变量,这里我们把工作方法的委托传给它,这样我们可以在回调方法中获取到这个委托:



static void CallbackMethod(IAsyncResult ar)


{


Console.WriteLine(string.Format("CallbackMethod runs on #{0}", Thread.CurrentThread.ManagedThreadId));


var caller = (Func<int, int, int>)ar.AsyncState;


int r = caller.EndInvoke(ar);


Console.WriteLine("get syc operation result : " + r + " @ " + DateTime.Now.ToString("mm:ss"));


}


程序的输出结果如下:



clip_image096



可以看到,主线程并没有因为工作线程而阻塞,它没有等待它的结果,异步方法结束后自动调用回调方法(运行于新线程),在回调方法中我们把状态变量进 行类型转换后得到方法委托,然后通过这个委托来调用EndInvoke获得结果。这里符合我们第3)种应用,这种情况下主线程不一定需要和异步方法进行直 接的交互(也就无需等待),当然主线程也完全可以再结合使用轮询或等待信号量等待异步线程完成后从共享变量(需要回调方法把结果写入共享变量)来获取结 果。



至于一开始说的第4)种应用需要注意,我们完全可以直接采用线程池来做,如果采用异步编程模型的话,即使不需要得到结果也别忘记调用 EndInvoke来释放资源,这是一个好习惯,因为.NET中很多涉及到IO和网络操作的类库都采用了APM方式,对于这些应用如果我们不调用 EndInvoke来释放非托管资源的话,GC恐怕无能为力的。下节继续讨论基于事件的异步编程模式。



浅谈.NET下的多线程和并行计算(十一).NET异步编程模型基础下



上 次我们说了,要进行多线程编程,我们可以使用最原始的方式也是最灵活的方式进行,那就是Thread(ThreadPool)+信号量+ 锁+Control.Invoke。.NET的异步编程模型给我们提供了一种基于IAsyncResult的编程模式,它尤其适用于处理下面的应用场景:



1) 被阻止,正在等待一个 IAsyncResult



2) 被阻止,正在等待多个 IAsyncResult 对象



3) 轮询 IAsyncResult 上的完成情形



.NET还提供了基于事件的异步编程模式(EAP),它能够提供:



1) 后台执行耗时任务(例如下载和数据库操作),但不会中断应用程序



2) 同时执行多个操作,每个操作完成时都会接到通知



下面是一个符合基于事件的异步编程模式的类:



public class AsyncExample


{


public int Method1(string param);


public void Method2(double param);


 


public void Method1Async(string param);


public void Method1Async(string param, object userState);


public event Method1CompletedEventHandler Method1Completed;


 


public void Method2Async(double param);


public void Method2Async(double param, object userState);


public event Method2CompletedEventHandler Method2Completed;


 


public void CancelAsync(object userState);


 


public bool IsBusy { get; }


}


我们看到它的操作一般提供同步和异步两种模式,异步操作提供一个完成事件,还提供了取消异步操作的方法。对于某些更高级的组件还提供了汇报进度的功 能,通过一个进度汇报事件来完成,此事件通常将叫做 ProgressChanged 或 方法名称ProgressChanged,它对应的事件处理程序会带有一个 ProgressChangedEventArgs 参数。ProgressChanged 事件的事件处理程序可以检查 ProgressChangedEventArgs.ProgressPercentage 属性来确定异步任务完成的百分比。此属性的范围是 0 到 100,可用来更新 ProgressBar 的 Value 属性。



说到这里您可能想到了,BackgroundWorker就是遵从这种模式的组件。那么我们在设计组件的时候如何来选择实现基于事件的APM还是基于IAsyncResult的APM呢,MSDN上有这么一段指导原则:



1) 将基于事件的模式用作默认API 以公开类的异步行为。



2) 当类主要用在客户端应用程序(例如 Windows 窗体)中时,不要公开 IAsyncResult 模式。 (比如PictureBox的LoadAsync 方法以及LoadCompleted 事件



3) 仅在必须公开 IAsyncResult 模式才能满足要求时公开该模式。例如,需要与现有 API 兼容时可能需要公开 IAsyncResult 模式。



4) 不要在不公开基于事件的模式时公开 IAsyncResult 模式。 如果必须公开 IAsyncResult 模式,应将其作为高级选项公开。例如,如果生成一个代理对象,则应默认生成基于事件的模式,其中具有一个生成 IAsyncResult 模式的选项。



5) 在 IAsyncResult 模式实现上生成基于事件的模式实现。



6) 避免在同一个类上同时公开基于事件的模式和 IAsyncResult 模式。在“较高级别”的类上公开基于事件的模式,在“较低级别”的类上公开 IAsyncResult 模式。例如,比较 WebClient 组件上的基于事件的模式与 HttpRequest 类上的 IAsyncResult 模式。



来看一个WebClient的例子:



WebClient wc = new WebClient();


wc.Encoding = Encoding.UTF8;


wc.DownloadStringCompleted += new DownloadStringCompletedEventHandler(wc_DownloadStringCompleted);


wc.DownloadProgressChanged += new DownloadProgressChangedEventHandler(wc_DownloadProgressChanged);


wc.DownloadStringAsync(new Uri("http://www.cnblogs.com"), "test");


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Console.ReadLine();


进度更新事件处理方法:



static void wc_DownloadProgressChanged(object sender, DownloadProgressChangedEventArgs e)


{


Console.WriteLine("{0} downloaded {1} of {2} bytes. {3} % complete...", (string)e.UserState, e.BytesReceived, e.TotalBytesToReceive, e.ProgressPercentage);


}


完成下载事件处理方法:



static void wc_DownloadStringCompleted(object sender, DownloadStringCompletedEventArgs e)


{


Console.WriteLine(DateTime.Now.ToString("mm:ss"));


Console.WriteLine(e.Result.Substring(0, 300));


}


程序输出结果:



clip_image098



我们可以看到WebClient的DownloadStringAsync方法在内部使用了WebRequest:



public void DownloadStringAsync(Uri address, object userToken)


{


if (Logging.On)


{


Logging.Enter(Logging.Web, this, "DownloadStringAsync", address);


}


if (address == null)


{


throw new ArgumentNullException("address");


}


this.InitWebClientAsync();


this.ClearWebClientState();


AsyncOperation asyncOp = AsyncOperationManager.CreateOperation(userToken);


this.m_AsyncOp = asyncOp;


try


{


WebRequest request = this.m_WebRequest = this.GetWebRequest(this.GetUri(address));


this.DownloadBits(request, null, new CompletionDelegate(this.DownloadStringAsyncCallback), asyncOp);


}


catch (Exception exception)


{


if (((exception is ThreadAbortException) || (exception is StackOverflowException)) || (exception is OutOfMemoryException))


{


throw;


}


if (!(exception is WebException) && !(exception is SecurityException))


{


exception = new WebException(SR.GetString("net_webclient"), exception);


}


this.DownloadStringAsyncCallback(null, exception, asyncOp);


}


catch


{


Exception exception2 = new WebException(SR.GetString("net_webclient"), new Exception(SR.GetString("net_nonClsCompliantException")));


this.DownloadStringAsyncCallback(null, exception2, asyncOp);


}


if (Logging.On)


{


Logging.Exit(Logging.Web, this, "DownloadStringAsync", "");


}


}


而且,使用了WebRequest的基于IAsyncResult的APM,可以看看DownloadBits的定义:



private byte[] DownloadBits(WebRequest request, Stream writeStream, CompletionDelegate completionDelegate, AsyncOperation asyncOp)


{


WebResponse response = null;


DownloadBitsState state = new DownloadBitsState(request, writeStream, completionDelegate, asyncOp, this.m_Progress, this);


if (state.Async)


{


request.BeginGetResponse(new AsyncCallback(WebClient.DownloadBitsResponseCallback), state);


return null;


}


response = this.m_WebResponse = this.GetWebResponse(request);


int bytesRetrieved = state.SetResponse(response);


while (!state.RetrieveBytes(ref bytesRetrieved))


{


}


state.Close();


return state.InnerBuffer;


}


在这里BeginGetResponse(),DownloadBitsResponseCallback回调方法如下:



private static void DownloadBitsResponseCallback(IAsyncResult result)


{


DownloadBitsState asyncState = (DownloadBitsState)result.AsyncState;


WebRequest request = asyncState.Request;


Exception exception = null;


try


{


WebResponse webResponse = asyncState.WebClient.GetWebResponse(request, result);


asyncState.WebClient.m_WebResponse = webResponse;


asyncState.SetResponse(webResponse);


}


catch (Exception exception2)


{


if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException))


{


throw;


}


exception = exception2;


if (!(exception2 is WebException) && !(exception2 is SecurityException))


{


exception = new WebException(SR.GetString("net_webclient"), exception2);


}


AbortRequest(request);


if ((asyncState != null) && (asyncState.WriteStream != null))


{


asyncState.WriteStream.Close();


}


}


finally


{


if (exception != null)


{


asyncState.CompletionDelegate(null, exception, asyncState.AsyncOp);


}


}


}


很显然,在WebClient.GetWebResponse中我们会进行EndGetResponse()操作:



protected virtual WebResponse GetWebResponse(WebRequest request, IAsyncResult result)


{


WebResponse response = request.EndGetResponse(result);


this.m_WebResponse = response;


return response;


}


那么继续看看SetResponse:



internal int SetResponse(WebResponse response)


   {


   this.ContentLength = response.ContentLength;


   if ((this.ContentLength == -1L) || (this.ContentLength > 0x10000L))


   {


   this.Length = 0x10000L;


   }


   else


   {


   this.Length = this.ContentLength;


   }


   if (this.WriteStream == null)


   {


   if (this.ContentLength > 0x7fffffffL)


   {


   throw new WebException(SR.GetString("net_webstatus_MessageLengthLimitExceeded"), WebExceptionStatus.MessageLengthLimitExceeded);


   }


   this.SgBuffers = new ScatterGatherBuffers(this.Length);


   }


   this.InnerBuffer = new byte[(int)this.Length];


   this.ReadStream = response.GetResponseStream();


   if (this.Async && (response.ContentLength >= 0L))


   {


   this.Progress.TotalBytesToReceive = response.ContentLength;


   }


   if (this.Async)


   {


   if ((this.ReadStream == null) || (this.ReadStream == Stream.Null))


   {


   WebClient.DownloadBitsReadCallbackState(this, null);


   }


   else


   {


   this.ReadStream.BeginRead(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset, new AsyncCallback(WebClient.DownloadBitsReadCallback), this);


   }


   return -1;


   }


   if ((this.ReadStream != null) && (this.ReadStream != Stream.Null))


   {


   return this.ReadStream.Read(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset);


   }


   return 0;


   }


关注下ReadStream.BeginRead()的回调方法:



private static void DownloadBitsReadCallback(IAsyncResult result)


{


DownloadBitsState asyncState = (DownloadBitsState)result.AsyncState;


DownloadBitsReadCallbackState(asyncState, result);


}


继续看看DownloadBitsReadCallbackState:



private static void DownloadBitsReadCallbackState(DownloadBitsState state, IAsyncResult result)


{


Stream readStream = state.ReadStream;


Exception exception = null;


bool flag = false;


try


{


int bytesRetrieved = 0;


if ((readStream != null) && (readStream != Stream.Null))


{


bytesRetrieved = readStream.EndRead(result);


}


flag = state.RetrieveBytes(ref bytesRetrieved);


}


catch (Exception exception2)


{


flag = true;


if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException))


{


throw;


}


exception = exception2;


state.InnerBuffer = null;


if (!(exception2 is WebException) && !(exception2 is SecurityException))


{


exception = new WebException(SR.GetString("net_webclient"), exception2);


}


AbortRequest(state.Request);


if ((state != null) && (state.WriteStream != null))


{


state.WriteStream.Close();


}


}


finally


{


if (flag)


{


if (exception == null)


{


state.Close();


}


state.CompletionDelegate(state.InnerBuffer, exception, state.AsyncOp);


}


}


}


在这里EndRead(),再来看看再来看看RetrieveBytes()方法:



internal bool RetrieveBytes(ref int bytesRetrieved)


{


if (bytesRetrieved > 0)


{


if (this.WriteStream != null)


{


this.WriteStream.Write(this.InnerBuffer, 0, bytesRetrieved);


}


else


{


this.SgBuffers.Write(this.InnerBuffer, 0, bytesRetrieved);


}


if (this.Async)


{


this.Progress.BytesReceived += (long)bytesRetrieved;


}


if (this.Offset != this.ContentLength)


{


if (this.Async)


{


this.WebClient.PostProgressChanged(this.AsyncOp, this.Progress);


this.ReadStream.BeginRead(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset, new AsyncCallback(WebClient.DownloadBitsReadCallback), this);


}


else


{


bytesRetrieved = this.ReadStream.Read(this.InnerBuffer, this.Offset, ((int)this.Length) - this.Offset);


}


return false;


}


}


if (this.Async)


{


if (this.Progress.TotalBytesToReceive < 0L)


{


this.Progress.TotalBytesToReceive = this.Progress.BytesReceived;


}


this.WebClient.PostProgressChanged(this.AsyncOp, this.Progress);


}


if (this.ReadStream != null)


{


this.ReadStream.Close();


}


if (this.WriteStream != null)


{


this.WriteStream.Close();


}


else if (this.WriteStream == null)


{


byte[] dst = new byte[this.SgBuffers.Length];


if (this.SgBuffers.Length > 0)


{


BufferOffsetSize[] buffers = this.SgBuffers.GetBuffers();


int dstOffset = 0;


for (int i = 0; i < buffers.Length; i++)


{


BufferOffsetSize size = buffers[i];


Buffer.BlockCopy(size.Buffer, 0, dst, dstOffset, size.Size);


dstOffset += size.Size;


}


}


this.InnerBuffer = dst;


}


return true;


}


WebClient的PostProgressChanged方法,在汇报进度的时候调用了AsyncOperation的Post方法:



private void PostProgressChanged(AsyncOperation asyncOp, ProgressData progress)


{


if ((asyncOp != null) && ((progress.BytesSent + progress.BytesReceived) > 0L))


{


int num;


if (progress.HasUploadPhase)


{


if ((progress.TotalBytesToReceive < 0L) && (progress.BytesReceived == 0L))


{


num = (progress.TotalBytesToSend < 0L) ? 0 : ((progress.TotalBytesToSend == 0L) ? 50 : ((int)((50L * progress.BytesSent) / progress.TotalBytesToSend)));


}


else


{


num = (progress.TotalBytesToSend < 0L) ? 50 : ((progress.TotalBytesToReceive == 0L) ? 100 : ((int)(((50L * progress.BytesReceived) / progress.TotalBytesToReceive) + 50L)));


}


asyncOp.Post(this.reportUploadProgressChanged, new UploadProgressChangedEventArgs(num, asyncOp.UserSuppliedState, progress.BytesSent, progress.TotalBytesToSend, progress.BytesReceived, progress.TotalBytesToReceive));


}


else


{


num = (progress.TotalBytesToReceive < 0L) ? 0 : ((progress.TotalBytesToReceive == 0L) ? 100 : ((int)((100L * progress.BytesReceived) / progress.TotalBytesToReceive)));


asyncOp.Post(this.reportDownloadProgressChanged, new DownloadProgressChangedEventArgs(num, asyncOp.UserSuppliedState, progress.BytesReceived, progress.TotalBytesToReceive));


}


}


}


MSDN中有这么一段描述:为了使类正确运行,应当使用给定应用程序模型(包括 ASP.NET 和 Windows 窗体应用程序)的适当线程或上下文调用客户端事件处理程序,这一点很重要。我们提供了两个重要的帮助器类,以确保您的异步类在任何应用程序模型中都能正确 运行,这两个帮助器类是 AsyncOperation 和 AsyncOperationManager。AsyncOperationManager 提供了 CreateOperation 方法,该方法会返回一个 AsyncOperation。方法名称Async 方法调用 CreateOperation,类使用返回的 AsyncOperation 跟踪异步任务的生存期。若要向客户端报告进度、增量结果和完成,请调用 AsyncOperation 的 Post 和 OperationCompleted 方法。AsyncOperation 负责将对客户端事件处理程序的调用封送到适当的线程和上下文。



7) 当为了提供兼容性需要在同一个类上公开基于事件的模式和 IAsyncResult 模式时,同时公开这两种模式。例如,如果已经释放了一个使用 IAsyncResult 模式的 API,则需要保留 IAsyncResult 模式以提供向后兼容性。



8)如果得到的对象模型复杂性方面的优点大于分开实现的优点,则在同一个类上实现基于事件的模式和 IAsyncResult 模式。在一个类上同时公开两种模式比避免公开基于事件的模式效果更好。



9) 如果必须在同一个类上同时公开基于事件的模式和 IAsyncResult 模式,可使用设置为 Advanced 的 EditorBrowsableAttribute 将 IAsyncResult 模式实现标记为高级功能。这指示设计环境(如 Visual Studio IntelliSense)不显示 IAsyncResult 属性和方法。这些属性和方法仍然是完全可用的,但使用 IntelliSense 的开发人员能够更清楚地查看 API。



我们可以发现从自己实现异步编程,到使用基于IAsyncResult的APM到使用基于事件的APM,使用上越来越简单,封装的也越来越厉害(自 己实现一个WebClient的工作量也很大)。之所以有APM这种模式是因为.NET的BCL不管是基于线程池的异步操作还是基于IOCP的异步操作都 遵从了APM,对于开发人员来说不需要过多学习就可以掌握大多数提供了异步能力的类库的使用。而对于组件或类库设计师来说就更要遵循这个模式,并且要根据 需要选择提供基于IAsyncResult的APM还是基于事件的APM还是两者都提供。通过这两篇文章的介绍您应该已经知道了如何去使用基于APM的类 库,在之后高级系列文章中我们会介绍如何去实现自己的APM组件。



在这十篇文章中我们介绍了线程/线程池/线程同步/基于多线程的组件/异步编程模式,这些文章中用到的例子可以在这里下载(VS2008解决方案)。



浅谈.NET下的多线程和并行计算(十二)CLR via C#第三版阅读笔记(1)



最 近此书出了第三版,在阅读此书线程部分的过程中有很多心得,补充了此前知识盲点,因此把这些关键和重要的知识点汇集成日志文章并且纳入到这个系列中。顺便 说一下,笔者喜欢这本书的原因是作者作为微软顾问并没有按照MSDN的教条教大家怎么去用而是能说出很多自己的观点甚至很多是微软.NET框架不够的地 方,并给出自己的实现。



为什么说线程是比较昂贵的?



1) 从内存上来说,(对于32位架构)每一个线程包含线程内核对象(700字节)/线程环境块(4KB)/内核堆栈(12KB)/用户堆栈(1MB)。并且可 以发现,这1MB的用户堆栈内存在CLR线程创建的时候完全分配,并不是动态增加的(Windows线程的创建只是保留1MB的内存空间)。



2) 从线程切换上来说,需要做哪些步骤来进行切换?首先是把CPU寄存器中的值保存到当前线程的内核对象中,然后如果线程切换到不同CPU的话需要为CPU准备新的虚拟地址空间,最后把目标线程内核对象中寄存器的值复制到CPU寄存器中。



3) 更大的性能损害来自于,线程切换之后缓存中的数据可能会不能命中,需要重新准备这些数据。



4) 此外,在垃圾回收的时候,CLR会挂起所有线程,查看线程堆栈,垃圾回收压缩后重置堆栈指针地址。



当然,线程总比进程的创建好一点,不过作者也说了线程多啊,一个OUTLOOK有几十个线程,作者还纳闷怎么打开记事本的打开对话框会多22个线 程,我想这个和操作系统有关,Vista的打开文件对话框更复杂,在其中为了不阻塞UI很多Part都以新的线程来加载内容,还好这个打开对话框用的不是 CLR线程……当前CLR线程对应Windows线程,作者也希望在将来CLR能实现虚拟逻辑线程的概念,以改善性能。



什么时候手动创建线程而不是使用线程池



1) 需要自定义线程的优先级,线程池的线程总是Normal。



2) 需要一个前台线程,线程池的线程总是后台线程。



作者建议大家对于非UI线程创建为后台线程而不是前台线程,有的时候我们可以发现有些软件在关闭之后,或者说关闭UI之后在进程中还存在,占用内存,这是因为我们看到关闭的是UI线程,还有其它前台线程未关闭。



3) 需要手动中止线程,线程池不提供这个功能。



4) 线程执行时间很长,线程池用于短而多的线程任务比较合适。



线程的调度



1) 每一个线程的优先级是0到31。高优先级的线程ready之后,不管低优先级的线程在做什么,立即上位,没话说。Windows会把最高优先级的不同线程调度到各个CPU上并行执行,多核多处理器谁也不闲着。



2) Windows制定进程有6个优先等级,线程有7个,通过组合来得出实际的线程优先级0到30(0优先级保留给Windows用于内存释放)。CLR保留了线程优先级中的最低和最高级,供程序员可设置的只有5个等级。



3) 进程的优先级是一个虚拟的概念,只是为了帮助用于映射到1-31中的某个等级,一般来说进程的等级默认为创建它的进程的等级。很多进程都是Windows Explorer创建的,默认也就是Nomral这个等级,说白了我们的线程在大多情况下映射到Windows线程优先级为6-10。



CLR线程池



1) 最小线程数,线程池的线程总大于等于这个值,一般这个值设置为逻辑CPU数,也就是能充分利用CPU同时执行这些线程。



2) 最大线程数,默认1000,不建议修改这个值,如果这个值过小,很可能运行的线程的都被阻塞,而排队的线程永远得不到执行,作者甚至建议CLR团队放开这个限制,让线程池尽可能允许创建更多线程一直到内存溢出。



3) 线程池是非常智能的,并不会发现可用线程不够马上创建新的线程,而是会有一个延迟以确保真的需要新的线程来补充(因为也不建议线程池中的方法执行时间太长 比如超过500毫秒,影响线程池的判断)。线程池的目的就是减少实际线程的创建和回收,重复利用线程来做不同的工作。



有关APM



1) 即使不需要得到异步操作结果也建议调用EndXXX一次并且只是一次。一是用于捕获操作产生的异常二是用来释放分配的内存。



2) 作者认为异步调用产生的小对象在频繁调用的时候对性能有影响,如果操作确实执行时间很多,同步调用也罢。



3) 并不是所有的IO操作都原生支持异步的,比如创建文件,列目录等,对于这些操作只能通过新线程来模拟异步(占用CPU)。



4) 大部分异步IO不支持取消,即使操作系统支持FCL也不一定把这个功能纳入,其实我们也明白很多时候FCL做的是对Windows API的封装,这个封装的覆盖可能不全也可能没那么及时。



有关EAP



1) 可以看出作者很不喜欢EAP,作者认为EAP主要作用是方便IDE中进行拖拽使用,并且支持汇报线程工作在UI线程。作者认为MSDN上的那篇文章的一些指导原则大部分人都不同意,因为那文章是Winform组人写的。



2) 作者认为EAP消耗更多的内存,并且使用起来也不方便。因为注册事件的代码如果在开始异步方法之后可能会导致异步方法不能执行,而且如果需要更换处理方法 的话还需要反注册事件。并且EAP在处理异常上也不方便。在.NET 4.0中提供的Task能大大改善异步模式的易用性。



有关IO操作的异步



1) 我们知道计算密集的操作我们可以通过开启多个线程充分利用多核来提高系统性能。而对于IO操作如果仅仅采用多线程的话也仅仅能起到不卡住UI线程的作用, 由于IO操作时间长,也就会需要大量的新线程来处理。硬件和操作系统对很多IO行为支持IOCP,这样能进行IO操作的时候不占用线程。那么我们也就特别 需要注意怎么去使用API来开启IOCP的支持,而不能一味认为使用APM就是使用了IOCP(比如FileStream的使用注意 FileOptions.Asynchronous开关)。



2) 即使支持了IOCP,我们也要知道如何正确的去使用APM,在BeginXXX后立即EndXXX显然不是合理的使用方式,这就相当于说“你去做这件事情吧,然后我还跟在你后面看着你做”。