C#线程篇---Task(任务)和线程池不得不说的秘密(5)

在上篇最后一个例子之后,我们发现了怎么去使用线程池,调用ThreadPool的QueueUserWorkItem方法来发起一次异步的、计算限制的操作,例子很简单,不是吗?

  然而,在今天这篇博客中,我们要知道的是,QueueUserWorkItem这个技术存在许多限制。其中最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成是获得一个返回值,这些问题使得我们都不敢启用这个技术。

  Microsoft为了克服这些限制(同时解决其他一些问题),引入了任务(tasks)的概念。顺带说一下我们得通过System.Threading.Tasks命名空间来使用它们。

  现在我要说的是,用线程池不是调用ThreadPool的QueueUserWorkItem方法,而是用任务来做相同的事:

 1         static void Main(string[] args)
 2         {
 3             Console.WriteLine("主线程启动");
 4             //ThreadPool.QueueUserWorkItem(StartCode,5);
 5             new Task(StartCode, 5).Start();
 6             Console.WriteLine("主线程运行到此!");
 7             Thread.Sleep(1000);
 8         }
 9
10         private static void StartCode(object i)
11         {
12             Console.WriteLine("开始执行子线程...{0}",i);
13             Thread.Sleep(1000);//模拟代码操作
14         }
15     }

嘿,你会发现结果是一样的。
再来看看这个是什么:

TaskCreationOptions这个类型是一个枚举类型,传递一些标志来控制Task的执行方式。TaskCreationOptions定义如下:

慢点,注释很详细,看看这些有好处,TaskScheduler(任务调度器)不懂没关系,请继续往下看,我会介绍的,但请注意,这些标识都只是一些提议而已,在调度一个Task时,可能会、也可能不会采纳这些提议,不过有一条要注意:AttachedToParent标志,它总会得到Task采纳,因为它和TaskScheduler本身无关。

  来看下这段代码:

 1         static void Main(string[] args)
 2         {
 3
 4             //1000000000这个数字会抛出System.AggregateException
 5
 6             Task<Int32> t = new Task<Int32>(n => Sum((Int32)n), 1000000000);
 7
 8             //可以现在开始,也可以以后开始
 9
10             t.Start();
11
12             //Wait显式的等待一个线程完成
13
14             t.Wait();
15
16             Console.WriteLine("The Sum is:"+t.Result);
17         }
18
19         private static Int32 Sum(Int32 i)
20         {
21             Int32 sum = 0;
22             for (; i > 0; i--)
23                 checked { sum += i; }
24             return sum;
25         }
26     }

  这段代码大家应该猜得出是什么意思吧,人人都会写。
  但是,我的结果为什么是t.Result而不直接是返回的Sum呢?  有没有多此一举的感觉?

下面我来说说这段代码我想表达的意思:

  在一个线程调用Wait方法时,系统会检查线程要等待的Task是否已经开始执行,如果任务正在执行,那么这个Wait方法会使线程阻塞,知道Task运行结束为止。

  就说上面的程序执行,因为累加数字太大,它抛出算术运算溢出错误,在一个计算限制任务抛出一个未处理的异常时,这个异常会被“包含”不并存储到一个集合中,而线程池线程是允许返回到线程池中的,在调用Wait方法或者Result属性时,这个成员会抛出一个System.AggregateException对象。

  现在你会问,为什么要调用Wait或者Result?或者一直不查询Task的Exception属性?你的代码就永远注意不到这个异常的发生,如果不能捕捉到这个异常,垃圾回收时,抛出AggregateException,进程就会立即终止,这就是“牵一发动全身”,莫名其妙程序就自己关掉了,谁也不知道这是什么情况。所以,必须调用前面提到的某个成员,确保代码注意到异常,并从异常中恢复。悄悄告诉你,其实在用Result的时候,内部会调用Wait。

  怎么恢复?

  为了帮助你检测没有注意到的异常,可以向TaskScheduler的静态UnobservedTaskException时间等级一个回调方法,当Task被垃圾回收时,如果出现一个没有被注意到的异常,CLR终结器会引发这个事件。一旦引发,就会向你的时间处理器方法传递一个UnobservedTaskExceptionEvenArgs对象,其中包含了你没有注意的AggregateException。然后再调用UnobservedTasExceptionEvenArgs的SetObserved方法来指出你的异常已经处理好了,从而阻止CLR终止进程。这是个图省事的做法,要少做这些,宁愿终止进程,也不要呆着已经损坏的状态而继续运行。做人也一样,病了宁肯休息,也不要带病坚持上班,你没那么伟大,公司也不需要你的这一点伟大,命是自己的。(─.─|||扯远了。

  除了单个等待任务,Task 还提供了两个静态方法:WaitAny和WaitAll,他们允许线程等待一个Task对象数组。

  WaitAny方法会阻塞调用线程,知道数组中的任何一个Task对象完成,这个方法会返回一个索引值,指明完成的是哪一个Task对象。如果发生超时,方法将返回-1。它可以通过一个CancellationToken取消,会抛出一个OperationCanceledException。

  WaitAll方法也会阻塞调用线程,知道数组中的所有Task对象都完成,如果全部完成就返回true,如果超时就返回false。当然它也能取消,同样会抛出OperationCanceledException。

  说了这么两个取消任务的方法,现在来试试这个方法,加深下印象,修改先前例子代码,完整代码如下:

 1         static void Main(string[] args)
 2         {
 3             CancellationTokenSource cts = new CancellationTokenSource();
 4
 5              6
 7             Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);
 8
 9             //可以现在开始,也可以以后开始
10
11             t.Start();
12
13             //在之后的某个时间,取消CancellationTokenSource 以取消Task
14
15             cts.Cancel();//这是个异步请求,Task可能已经完成了。我是双核机器,Task没有完成过
16
17
18             //注释这个为了测试抛出的异常
19             //Console.WriteLine("This sum is:" + t.Result);
20             try
21             {
22                 //如果任务已经取消了,Result会抛出AggregateException
23
24                 Console.WriteLine("This sum is:" + t.Result);
25             }
26             catch (AggregateException x)
27             {
28                 //将任何OperationCanceledException对象都视为已处理。
29                 //其他任何异常都造成抛出一个AggregateException,其中
30                 //只包含未处理的异常
31
32                 x.Handle(e => e is OperationCanceledException);
33                 Console.WriteLine("Sum was Canceled");
34             }
35
36         }
37
38         private static Int32 Sum(CancellationToken ct ,Int32 i)
39         {
40             Int32 sum = 0;
41             for (; i > 0; i--)
42             {
43                 //在取消标志引用的CancellationTokenSource上如果调用
44                 //Cancel,下面这一行就会抛出OperationCanceledException
45
46                 ct.ThrowIfCancellationRequested();
47
48                 checked { sum += i; }
49             }
50
51             return sum;
52         }
53     }

  这个例子展示了一个任务在进行的时候中途取消的操作,我觉得它很有趣,你试试也会发现。
  Lamada表达式写这个,是个亮点,得学学,将CancellationToken闭包变量“传递”。

  

  如果不用Lamada表达式,这问题还真不好解决:

  Task<Int32> t = new Task<Int32>(() => Sum(cts.Token,10000), cts.Token);

  Sum(cts.Token,10000) 内的Token需要和cts.Token关联起来,你还能想出怎么关联起来么?

  

  好,任务取消也讲玩了,来看个更好用的技术:

  

 1         static void Main(string[] args)
 2         {
 3
 4             Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000);
 5
 6             //可以现在开始,也可以以后开始
 7
 8             t.Start();
 9
10            Task cwt =  t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result));
11            cwt.Wait();
12
13         }
14
15         private static Int32 Sum(Int32 i)
16         {
17             Int32 sum = 0;
18             for (; i > 0; i--)
19             {
20                 checked { sum += i; }
21             }
22
23             return sum;
24         }
25     }

ContinueWith?  啥东西~~??

  要写可伸缩的软件,一定不能使你的线程阻塞。这意味着如果调用Wait或者在任务未完成时查询Result属性,极有可能造成线程池创建一个新线程,这增大了资源的消耗,并损害了伸缩性。

  ContinueWith便是一个更好的方式,一个任务完成时它可以启动另一个任务。上面的例子不会阻塞任何线程。

  当Sum的任务完成时,这个任务会启动另一个任务以显示结果。ContinueWith会返回对新的Task对象的一个引用,所以为了看到结果,我需要调用一下Wait方法,当然你也可以查询下Result,或者继续ContinueWith,返回的这个对象可以忽略,它仅仅是一个变量。

  还要指出的是,Task对象内部包含了ContinueWith任务的一个集合。所以,实际上可以用一个Task对象来多次调用ContinueWith。任务完成时,所有ContinueWith任务都会进入线程池队列中,在构造ContinueWith的时候我们可以看到一个TaskContinuationOptions枚举值,不能忽视,看看它的定义:

PrefereFairness是尽量公平的意思,就是较早调度的任务可能较早的运行,先来后到,将线程放到全局队列,便可以实现这个效果。

ExecuteSynchronously指同步执行,强制两个任务用同一个线程一前一后运行,然后就同步运行了。

看得是不是晕乎乎 ?有这么多枚举例子,怎么掌握啊?多看几次,知道任务的使用情况,以后用起来得心应手~想学新技术,就要能耐住,才能基础牢固。来看个例子,用用这些枚举。

 1         static void Main(string[] args)
 2         {
 3             Task<Int32> t = new Task<Int32>(i => Sum((Int32)i),10000);
 4
 5             t.Start();
 6
 7             t.ContinueWith(task=>Console.WriteLine("The sum is:{0}",task.Result),
 8                 TaskContinuationOptions.OnlyOnRanToCompletion);
 9
10             t.ContinueWith(task=>Console.WriteLine("Sum throw:"+task.Exception),
11                 TaskContinuationOptions.OnlyOnFaulted);
12
13             t.ContinueWith(task=>Console.WriteLine("Sum was cancel:"+task.IsCanceled),
14                 TaskContinuationOptions.OnlyOnCanceled);
15             try
16             {
17                 t.Wait();  // 测试用
18             }
19             catch (AggregateException)
20             {
21                 Console.WriteLine("出错");
22             }
23
24
25         }
26
27         private static Int32 Sum(Int32 i)
28         {
29             Int32 sum = 0;
30             for (; i > 0; i--)
31             {
32                 checked { sum += i; }
33             }
34
35             return sum;
36         }
37     }

  ContinueWith讲完了。可是还没有结束哦。

  AttachedToParnt枚举类型(父任务)也不能放过!看看怎么用,写法有点新奇,看看:

 1         static void Main(string[] args)
 2         {
 3             Task<Int32[]> parent = new Task<Int32[]>(() => {
 4                 var results = new Int32[3];
 5                 //
 6                 new Task(() => results[0] = Sum(10000), TaskCreationOptions.AttachedToParent).Start();
 7                 new Task(() => results[1] = Sum(20000), TaskCreationOptions.AttachedToParent).Start();
 8                 new Task(() => results[2] = Sum(30000), TaskCreationOptions.AttachedToParent).Start();
 9                 return results;
10             });
11
12             var cwt = parent.ContinueWith( parentTask=>Array.ForEach(parentTask.Result,Console.WriteLine));
13
14
15             parent.Start();
16             cwt.Wait();
17         }
18
19         private static Int32 Sum(Int32 i)
20         {
21             Int32 sum = 0;
22             for (; i > 0; i--)
23             {
24                 checked { sum += i; }
25             }
26             return sum;
27         }
28     }

Oh,我都写晕了。。。(+﹏+)~
例子中,父任务创建兵启动3个Task对象。默认情况下,一个任务创建的Task对象是顶级任务,这些任务跟创建它们的那个任务没有关系。

TaskCreationOptions.AttachedToParent标志将一个Task和创建它的那个Task关联起来,除非所有子任务(子任务的子任务)结束运行,否则创建任务(父任务)不会认为已经结束。调用ContinueWith方法创建一个Task时,可以指定TaskContinuationOptions.AttachedToParent标志将延续任务置顶为一个子任务。

  看了这么多任务的方法操作示例了,现在来挖挖任务内部构造

  每个Task对象都有一组构成任务状态的字段。

  •   一个Int32 ID(只读属性)
  • 代表Task执行状态的一个Int32
  • 对父任务的一个引用
  • 对Task创建时置顶TaskSchedule的一个引用
  • 对回调方法的一个引用
  • 对要传给回调方法的对象的一个引用(通过Task只读AsyncState属性查询)
  • 对一个ExceptionContext的引用
  • 对一个ManualResetEventSlim对象的引用

还有没个Task对象都有对根据需要创建的一些补充状态的一个引用,补充状态包含这些:

  • 一个CancellationToken
  • 一个ContinueWithTask对象集合
  • 为抛出未处理异常的子任务,所准备的一个Task对象集合

说了这么多,只想要大家知道:

  虽然任务提供了大量功能,但并不是没有代价的。因为必须为所有的这些状态分配内存。

如果不需要任务提供的附加功能,使用ThreadPool.QueueUserWorkItem,资源的使用效率会更高一些。

Task类还实现了IDispose接口,允许你在用完Task对象后调用Dispose,不过大多数不管,让垃圾回收器回收就好。

创建一个Task对象时,代表Task唯一的一个Int32字段初始化为零,TaskID从1开始,每分配一个ID都递增1。顺带说一下,在你调试中查看一个Task对象的时候,会造成调试器显示Task的ID,从而造成为Task分配一个ID。

  这个ID的意义在于,每个Task都可以用一个唯一的值来标识。Visual Studio会在它的“并行任务”和并行堆栈“窗口中显示这些任务ID。要知道的是,这是Visual Studio自己分配的ID,不是在自己代码中分配的ID,几乎不可能将Visual Studio分配的ID和代码正在做的事情联系起来。要查看自己正在运行的任务,可以在调试的时候查看Task的静态CurrentId属性,如果没有任务在执行,CurrentId返回null。

  再看看TaskStatus的值,这个可以查询Task对象的生存期:

这些在任务运行的时候都是可以一一查到的,还有~判断要像这样:

1 if(task.Status==TaskStatus.RantoCompletion)...

为了简化编码,Task只提供几个只读Boolean属性:IsCanceled,IsFaulted,IsCompleted,它们能返回最终状态true/false。
如果Task是通过调用某个函数来创建的,这个Task对象就会出于WaitingForActivation状态,它会自动运行。

最后我们要来了解一下TaskFactory(任务工厂):

  1.需要创建一组Task对象来共享相同的状态

  2.为了避免机械的将相同的参数传给每一个Task的构造器。

满足这些条件就可以创建一个任务工厂来封装通用的状态。TaskFactory类型和TaskFactory<TResult>类型,它们都派生System.Object。

你会学到不一样的编码方式:

 1         static void Main(string[] args)
 2         {
 3             Task parent = new Task(() =>
 4             {
 5                 var cts = new CancellationTokenSource();
 6                 var tf = new TaskFactory<Int32>(cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
 7
 8                 //创建并启动3个子任务
 9                 var childTasks = new[] {
10             tf.StartNew(() => Sum(cts.Token, 10000)),
11             tf.StartNew(() => Sum(cts.Token, 20000)),
12             tf.StartNew(() => Sum(cts.Token, Int32.MaxValue))  // 这个会抛异常
13          };
14
15                 // 任何子任务抛出异常就取消其余子任务
16                 for (Int32 task = 0; task < childTasks.Length; task++)
17                     childTasks[task].ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
18
19                 // 所有子任务完成后,从未出错/未取消的任务获取返回的最大值
20                 // 然后将最大值传给另一个任务来显示最大结果
21                 tf.ContinueWhenAll(childTasks,
22                    completedTasks => completedTasks.Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result),
23                    CancellationToken.None)
24                    .ContinueWith(t => Console.WriteLine("The maxinum is: " + t.Result),
25                       TaskContinuationOptions.ExecuteSynchronously).Wait(); // Wait用于测试
26             });
27
28             // 子任务完成后,也显示任何未处理的异常
29             parent.ContinueWith(p =>
30             {
31                 // 用StringBuilder输出所有
32
33                 StringBuilder sb = new StringBuilder("The following exception(s) occurred:" + Environment.NewLine);
34                 foreach (var e in p.Exception.Flatten().InnerExceptions)
35                     sb.AppendLine("   " + e.GetType().ToString());
36                 Console.WriteLine(sb.ToString());
37             }, TaskContinuationOptions.OnlyOnFaulted);
38
39             // 启动父任务
40             parent.Start();
41
42             try
43             {
44                 parent.Wait(); //显示结果
45             }
46             catch (AggregateException)
47             {
48             }
49         }
50
51         private static Int32 Sum(CancellationToken ct, Int32 n)
52         {
53             Int32 sum = 0;
54             for (; n > 0; n--)
55             {
56                 ct.ThrowIfCancellationRequested();
57                 checked { sum += n; }
58             }
59             return sum;
60         }
61     }

任务工厂就这么用,就是一个任务的集合。

现在看看TaskScheduler(任务调度)

  任务基础结构是很灵活的,TaskScheduler对象功不可没。

  TaskScheduler对象负责执行调度的任务,同时向Visual Studio调试器公开任务信息,就像一座桥梁,让我们能够掌控自己的任务线程。

  TaskScheduler有两个派生类:thread pool task scheduler(线程池任务调度),和synchronization context task scheduler(同步上下文任务调度器)。默认情况下,所以应用程序使用的都是线程池任务调度器,这个任务调度器将任务调度给线程池的工作者线程。可以查询TaskScheduler的静态Default属性来获得对默认任务调度器的一个引用。

  同步上下文任务调度器通常用于桌面应用程序,Winfrom,WPF及Silverlight。这个任务调度器将多有任务都调度给应用程序的GUI线程,使所有任务代码都能成功更新UI组建,比如按钮、菜单项等。同步上下文任务调度器根本不使用线程池。同样,可以查询TaskScheduler的静态FromCurrentSynchronizationContext方法来获得对一个同步上下文任务调度器的引用。

就像这样创建类型:

  

1 //同步上下文任务调度
2 TaskScheduler m_syncContextTaskScheduler =
3            TaskScheduler.FromCurrentSynchronizationContext();

任务调度有很多的,下面列举一部分,供参考,更多的请参看http://code.msdn.microsoft.com/ParExtSamples  它包括了大量的示例代码。

  

  它内容实在有点多。写了我很久了。好不容易把任务这块一次写完,希望大家有更多的收获。

 --------------------下篇预告,线程池如何管理线程。把基础介绍完结,也算是一个新的起点了。^_^

时间: 2024-10-16 18:14:16

C#线程篇---Task(任务)和线程池不得不说的秘密(5)的相关文章

C#线程篇---Task(任务)和线程池不得不说的秘密

我们要知道的是,QueueUserWorkItem这个技术存在许多限制.其中最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成是获得一个返回值,这些问题使得我们都不敢启用这个技术. Microsoft为了克服这些限制(同时解决其他一些问题),引入了任务(tasks)的概念.顺带说一下我们得通过System.Threading.Tasks命名空间来使用它们. 现在我要说的是,用线程池不是调用ThreadPool的QueueUserWorkItem方法,而是用任务来做

C#线程篇---你所不知道的线程池(4)

线程的创建和销毁都要耗费大量的时间,有什么更好的办法?用线程池! 太多的线程浪费内存资源,有什么更好的办法?用线程池! 太多线程有损性能,有什么更好的办法?用线程池!(⊙_⊙)? 线程池是什么?继前三篇线程基础之后,我们要来学学线程池了.注意,这些信息相当有用! 为了设计和实现可伸缩的.可响应的和可靠的应用程序或组建,线程池是你必须采用的核心技术. 线程池是CLR的,线程池自动为你管理线程的创建和销毁,线程池创建的一组线程将为各种任务而重用,极大提高了使用线程的成本,这也就意味着,你的应用程序其

Python之路【第八篇】python实现线程池

线程池概念 什么是线程池?诸如web服务器.数据库服务器.文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务.构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务.但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大.所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,“池”的概念使得人们可以定制一定量的资源,然后对这些资源

线程篇(二)

C# 温故而知新: 线程篇(二) 线程池和异步线程 目录: 1 什么是CLR线程池? 2 简单介绍下线程池各个优点的实现细节 3 线程池ThreadPool的常用方法介绍 4 简单理解下异步线程 5 异步线程的工作过程和几个重要的元素 6 有必要简单介绍下Classic Async Pattern 和Event-based Async Pattern 7 异步线程的发展趋势以及.net4.5异步的简化 8 本章示例 自定义一个简单的线程池 Asp.net异步IHttpAsyncHandler示例

菜鸟之旅——学习线程(Task)

菜鸟之旅--学习线程(Task) 前面两篇回顾线程和线程池的使用方法,微软在.NET4.5推出了新的线程模型-Task.本篇将简单的介绍Task的使用方法. Task与线程 Task与线程或者说线程池关系紧密,可以说是基于线程池实现的,虽说任务最终还是要抛给线程去执行,但是Task仍然会比线程.线程池的开销要小,并且提供了可靠的API来控制线任务执行. 使用Task来执行的任务最终会交给线程池来执行,若该任务需要长时间执行,可以将其标记为LongRunning,这是便会单独去请求创建线程来执行该

[Java Performance] 线程及同步的性能 - 线程池/ThreadPoolExecutors/ForkJoinPool

线程池和ThreadPoolExecutors 虽然在程序中可以直接使用Thread类型来进行线程操作,但是更多的情况是使用线程池,尤其是在Java EE应用服务器中,一般会使用若干个线程池来处理来自客户端的请求.Java中对于线程池的支持,来自ThreadPoolExecutor.一些应用服务器也确实是使用的ThreadPoolExecutor来实现线程池. 对于线程池的性能调优,最重要的参数就是线程池的大小. 对于任何线程池而言,它们的工作方式几乎都是相同的: 任务被投放到一个队列中(队列的

JAVA技术专题综述之线程篇(1)

本文详细介绍JAVA技术专题综述之线程篇 编写具有多线程能力的程序经常会用到的方法有: run(),start(),wait(),notify(),notifyAll(),sleep(),yield(),join() 还有一个重要的关键字:synchronized 本文将对以上内容进行讲解. 一:run()和start() 示例1: public cla ThreadTest extends Thread{public void run(){for(int i=0;i<10;i++){Syste

线程(Thread)、线程池(ThreadPool)技术

线程:是Windows任务调度的最小单位.线程是程序中的一个执行流,每个线程都有自己的专有寄存器(栈指针.程序计数器等),但代码区是共享的,即不同的线程可以执行同样的函数,在一个应用程序中,常常需要使用多个线程来处理不同的事情,这样可以提高程序的运行效率,也不会使主界面出现无响应的情况.在这里主要介绍线程(Thread).线程池(ThreadPool)两种不同创建线程的区别 在通常的情况下,当我们需要开启一个新的线程时,我们直接通过Thread(继承自 System.Threading;)去创建

Netty版本升级血泪史之线程篇

1. 背景 1.1. Netty 3.X系列版本现状 根据对Netty社区部分用户的调查,结合Netty在其它开源项目中的使用情况,我们可以看出目前Netty商用的主流版本集中在3.X和4.X上,其中以Netty 3.X系列版本使用最为广泛. Netty社区非常活跃,3.X系列版本从2011年2月7日发布的netty-3.2.4 Final版本到2014年12月17日发布的netty-3.10.0 Final版本,版本跨度达3年多,期间共推出了61个Final版本. 1.2. 升级还是坚守老版本