Net多线程编程(转载)

System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach这三个静态方法。

1 Parallel.Invoke

尽可能并行执行所提供的每个操作,除非用户取消了操作。

方法:

1)public static void Invoke(params Action[] actions);

2)public static void Invoke(ParallelOptions parallelOptions,

params Action[] actions);

参数:

parallelOptions:一个对象,用于配置此操作的行为。

Actions:要执行的操作数组

异常:

对方法1:

System.ArgumentNullException: actions 参数为 null。

System.AggregateException:当 actions 数组中的任何操作引发异常时引发的异常。

System.ArgumentException:actions数组包含 null 个元素。

对方法2除上述异常外还包括:

System.OperationCanceledException:parallelOptions 设置了System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的System.Threading.CancellationTokenSource已被释放。

说明:

1)Invoke方法只有在actions全部执行完才会返回,即使在执行过程中出现异常也会完成。

2)不能保证actions中的所有操作同时执行。比如actions大小为4,但硬件线程数为2,那么同时运行的操作数最多为2。

3)actions中的操作并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其他方法。

4)如果使用Invoke加载多个操作,多个操作运行时间迥异,总的运行时间以消耗时间最长操作为基准,这会导致很多逻辑内核长时间处于空闲状态。

5)受限的并行可扩展性,这源于Invoke所调用的委托数目是固定的。

2 Parallel.For

可能会并行运行迭代,可以监视和操作循环的状态。Parallel.For有多个重载的方法,下面列举部分方法。

方法:

1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);

3)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

4)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

参数:

fromInclusive:开始索引(含)。

toExclusive:结束索引(不含)。

body:将被每个迭代调用一次的委托。

parallelOptions:一个对象,用于配置此操作的行为。

localInit:一个委托,用于返回每个任务的本地数据的初始状态。

localFinally:一个委托,用于对每个任务的本地状态执行一个最终操作。

返回结果:

ParallelLoopResult :包含有关已完成的循环部分的信息。

异常:

System.ArgumentNullException:body 参数为 null,或 localInit 参数为 null,或 localFinally 参数为 null,或 parallelOptions 参数为 null。 System.AggregateException:包含在所有线程上引发的全部单个异常的异常。

对于方法3)和4)除包含以上异常外还包括:

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

说明:

1)不支持浮点和步进。

2)无法保证迭代的执行顺序。

3)如果fromInclusive大于或等于toExclusive,方法立即返回而不会执行任何迭代。

4)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

5)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

3 Parallel.ForEach

方法

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

参数:

source:数据源

body:将被每个迭代调用一次的委托。

parallelOptions:一个对象,用于配置此操作的行为。

返回结果:

ParallelLoopResult :包含有关已完成的循环部分的信息。

异常:

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

System.AggregateException:包含了所有线程上引发的全部单个异常。

对于方法2)还包括:

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

对于3)包括的异常为:

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

System.InvalidOperationException:source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions 属性返回 false。或 在 source 分区程序中的任何方法返回 null 时引发异常。或在source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正确数目的分区。

说明:

1)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

2)Parallel.ForEach方法不保证执行顺序,它不像foreach循环那样总是顺序执行。

3)对于方法3)中的source,它的类型是Partitioner<TSource>。可以使用Partitioner.Create方法创建分区,该方法的几个重整方法为:

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);

fromInclusive为范围下限(含),toExclusive为范围下限(不含),rangeSize为每个子范围的大小。

使用Partitioner创建的子范围大小默认大约是计算机内核的三倍,而当使用rangeSize指定范围大小时,那么子范围大小为指定值。

4)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

4 ParallelOptions

定义:

存储选项,用于配置 System.Threading.Tasks.Parallel 类的方法。

ParallelOptions属性

1)public CancellationToken CancellationToken { get; set; }

获取或设置传播有关应取消操作的通知。

2)public int MaxDegreeOfParallelism { get; set; }

获取或设置此 ParallelOptions 实例所允许的最大并行度。

3)public TaskScheduler TaskScheduler { get; set; }

获取或设置与此 System.Threading.Tasks.ParallelOptions 实例关联的 System.Threading.Tasks.TaskScheduler

说明:

1)通过设置CancellationToken来取消并行循环,当前正在运行的迭代会执行完,然后抛出System.OperationCanceledException类型的异常。

2)TPL的方法总是会试图利用所有可用内核以达到最好的效果,但是很可能.NET Framework内部使用的启发式算法所得到的注入和使用的线程数比实际需要的多(通常都会高于硬件线程数,这样会更好地支持CPU和I/O混合型的工作负载)。

通常将最大并行度设置为小于等于逻辑内核数。如果设置为等于逻辑内核数,那么要确保不会影响其他程序的执行。设置为小于逻辑内核数是为了有空闲内核来处理其他紧急的任务。

用途:

1)从循环外部取消并行循环

2)指定并行度

3)指定自定义任务调度程序

5 ParallelLoopState

定义:

可使并行循环迭代与其他迭代交互。 此类的实例由 Parallel 类提供给每个循环;不能在用户代码中创建实例。

方法:

1)Break()方法:通知并行循环在执行完当前迭代之后尽快停止执行,可确保低索引步骤完成。且可确保正在执行的迭代继续运行直到完成。

2)Stop()方法:通知并行循环尽快停止执行。对于尚未运行的迭代不能会尝试执行低索引迭代。不保证所有已运行的迭代都执行完。

用途:提早退出并行循环。

说明:

1)不能同时在同一个并行循环中同时使用Break和Stop。

2)Stop比Break更常用。break语句用在并行循环中的效果和用在串行循环中不同。Break用在并行循环中,委托的主体方法在每次迭代的时候被调用,退出委托的主体方法对并行循环的执行没有影响。Stop停止循环比Break快。

6 ParallelLoopResult结构

定义:

并行循环运行结果的信息。

属性:

1)public bool IsCompleted { get; }

如果该循环已运行完成(该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求),则为 true;否则为 false。

2)public long? LowestBreakIteration { get; }

返回一个表示从中调用 Break 语句的最低迭代的整数

用途:判断当并行循环结束时,是否因调用了break方法或stop方法而提前退出并行循环,或所有迭代均已执行。

判断依据:


条件


IsCompleted


运行完成


!IsCompleted &&

LowestBreakIteration==null


使用了Stop语句而提前终止


!IsCompleted &&

LowestBreakIteration!=null


使用了Break语句而提前终止

7 捕获并行循环中的异常

原则:

1)异常优先于从循环外部取消和使用Break()方法或Stop()方法提前退出并行循环。

2)并行循环体抛出一个未处理的异常,并行循环就不能再开始新的迭代。

3)默认情况下当某次迭代抛出一个未处理异常,那么正在执行的迭代如果没抛出异常,正在执行的迭代会执行完。当所有迭代都执行完(有可能其他的迭代在执行的过程中也抛出异常),并行循环将在调用它的线程中抛出异常。

并行循环运行的过程中,可能有多个迭代抛出异常,所以一般使用AggregateException来捕获异常。AggregateException继承自Exception。为了防止仅使用AggregateException未能捕获某些异常,使用AggregateException的同时还要使用Exception。

8 使用模式

8.1 Parallel.Invoke

 1 public static void DemonstrateInvoke()
 2 {
 3     //使用Lambda
 4     Parallel.Invoke(
 5     () =>
 6     {
 7         //具体操作1
 8     },
 9     () =>
10     {
11         //具体操作2
12     });
13
14     //不使用lambda
15     Parallel.Invoke(Operation1, Operation2);
16 }
17
18 private static void Operation1()
19 {
20     //具体操作1
21 }
22
23 private static void Operation2()
24 {
25     //具体操作2
26 }        

8.2 Parallel.For

1 串行循环:
2 int toExclusive = ...;
3 for(int i =0;i<=toExclusive;i++){};
4
5 对应的并行循环:
6 Parallel.For(0, toExclusive+1, (i) =>
7 {
8     //具体操作
9 });

8.3 Parallel.ForEach

 1 一般用法
 2 IEnumerable<string> coll = ...;
 3 Parallel.ForEach(coll,(str)=>
 4 {
 5     //具体操作
 6 });
 7
 8 基于分区的模式
 9 优化分区数,使其最接近系统逻辑内核数:
10 子分区范围 = 对“待处理集合大小/系统逻辑内核数”取整+1。
11 int logicalCores =...;
12 IEnumerable<string> collP = ...;
13 int fromInclusive = ...;
14 int toExclusiv = ...;
15 int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores) +1;
16 Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range =>
17 {
18     for (int i = range.Item1; i < range.Item2; i++)
19     {
20         //使用集合:collection[i]
21     }
22 });    

8.4 从循环外部取消并行循环

注意:不使用IsCancellationRequested或ThrowIfCancellationRequested()的情况下无法捕获类型为AggregateException的异常。

1)对于Parallel.For

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     CancellationTokenSource cts = new CancellationTokenSource();
 4     //其他操作...
 5
 6     //异步执行Operation方法
 7     Task.Factory.StartNew(()=>{Operation(cts);});
 8     //异步执行condition的计算过程
 9     Task.Factory.StartNew(()=>{
10         bool condition = ...;
11         if (condition) cts.Cancel();
12     }
13
14     //其他操作...
15 }
16
17 private static void Operation(CancellationTokenSource cts)
18 {
19     CancellationToken ct = cts.Token;
20     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
21     int toExclusive = ...;
22     Parallel.For(0, toExclusive, op, (i) =>
23     {
24
25         //其他操作...
26
27         //return只对当前子线程有效
28         if (ct.IsCancellationRequested)
29         { return; }
30
31         //其他操作...
32     });
33 }                    

使用ThrowIfCancellationRequested()方法抛出异常

将上面的并行循环部分替换为下面的代码:

1 Parallel.For(0, toExclusive, op, (i) =>
2 {
3
4     //其他操作...
5
6     ct.ThrowIfCancellationRequested();
7
8     //其他操作...
9 });

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

2)对于Parallel.ForEach

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     //同1)中CancelFromExternal方法
 4 }
 5
 6 private static void Operation(CancellationTokenSource cts)
 7 {
 8     CancellationToken ct = cts.Token;
 9     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
10     IEnumerable<string> coll = new List<string> { "str1", "str2" };
11     Parallel.ForEach(coll, op,(str, loopState) =>
12     {
13         //其他操作...
14
15       //return只对当前子线程有效
16       if (ct.IsCancellationRequested)
17       { return; }
18
19       //其他操作...
20   });
21 }

使用ThrowIfCancellationRequested()方法抛出异常

将Operation方法中的:

if (ct.IsCancellationRequested)

{ return; }

替换为:

ct.ThrowIfCancellationRequested();

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

8.5 指定并行度

1 int maxDegreeOfParallelism = Environment.ProcessorCount - 1;
2 ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism =     maxDegreeOfParallelism };
3 IEnumerable<string> coll = new List<string> { };
4 Parallel.ForEach(coll, op ,(str) =>
5 {
6     //具体操作
7 });

8.6 提早退出并行循环

1)对于Parallel.For

 1 int toExclusive = 10;
 2 Parallel.For(0, toExclusive, (i, loopState) =>
 3 {
 4     //其他操作...
 5     //计算condition
 6     bool condition = ...;
 7     if (condition)
 8     {
 9         loopState.Break();//或使用loopState.Stop
10         return;
11     }
12
13     //其他操作
14 });    

2)对于Parallel.ForEach

 1 IEnumerable<string> coll = new List<string> {"str1","str2" };
 2 Parallel.ForEach(coll, (str, loopState) =>
 3 {
 4     //其他操作...
 5
 6     //计算condition
 7     bool condition = ...;
 8     if (condition)
 9     {
10         loopState.Break();//或使用loopState.Stop
11         return;
12     }
13
14     //其他操作
15
16 });

9 异常处理模式

基本形式

在确保使用AggregateException 能够捕捉到所有的异常时,可以省去catch(Exception e)的部分。

 1 try
 2 {
 3     //Do something
 4 }
 5 catch(AggregateException e)
 6 {
 7     Foreach(Exception ex in e.InnerExceptions)
 8     {
 9         //Do something
10     }
11 }
12 catch(Exception e)
13 {
14     //Do something
15 }

时间: 2024-10-09 20:17:12

Net多线程编程(转载)的相关文章

VC++6.0 下配置 pthread库2010年12月12日 星期日 13:14VC下的pthread多线程编程 转载

VC++6.0 下配置 pthread库2010年12月12日 星期日 13:14VC下的pthread多线程编程     转载 #include <stdio.h>#include <stdlib.h>#include <pthread.h> void* tprocess1(void* args){       int i=1;       while(i<=10){            printf("process1:%d\n",i);

Java多线程编程模式实战指南(三):Two-phase Termination模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 停止线程是一个目标简单而实现却不那么简单的任务.首先,Java没有提供直接的API用于停止线程.此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它

Java多线程编程模式实战指南(二):Immutable Object模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-immutable-object.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁.而锁本身又会带来一些问题和开销.Immutable Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安

多线程编程学习总结(转载)

线程的概念和原理 为什么使用多线程? 为了更高效的完成任务和利用CPU资源,现在的操作系统设计为多任务操作系统,而多进程和多线程是实现多任务的方式. 什么是进程和线程? 进程是指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程中可以启动多个线程.进程是OS分配资源的最小单位. 线程是指进程中的一个执行流程,一个进程中可以运行多个线程.线程总是属于某个进程,进程中的多个线程共享进程的内存.进程是OS调度的最小单位. 工作原理? 多线程是这样一种机制,它允许在程序中并发执行多个

转载自~浮云比翼:Step by Step:Linux C多线程编程入门(基本API及多线程的同步与互斥)

Step by Step:Linux C多线程编程入门(基本API及多线程的同步与互斥) 介绍:什么是线程,线程的优点是什么 线程在Unix系统下,通常被称为轻量级的进程,线程虽然不是进程,但却可以看作是Unix进程的表亲,同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等.但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage). 一

[转载] C++ 多线程编程总结

原文: http://www.cnblogs.com/zhiranok/archive/2012/05/13/cpp_multi_thread.html 在开发C++程序时,一般在吞吐量.并发.实时性上有较高的要求.设计C++程序时,总结起来可以从如下几点提高效率: l  并发 l  异步 l  缓存 下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点. 1任务队列 1.1    以生产者-消费者模型设计任务队列 生产者-消费者模型是人们非常熟悉的模型,比如在某个服务器程序中,当Us

Android中多线程编程(四)AsyncTask类的详细解释(附源码)

Android中多线程编程中AsyncTask类的详细解释 1.Android单线程模型 2.耗时操作放在非主线程中执行 Android主线程和子线程之间的通信封装类:AsyncTask类 1.子线程中更新UI 2.封装.简化异步操作. 3.AsyncTask机制:底层是通过线程池来工作的,当一个线程没有执行完毕,后边的线程是无法执行的.必须等前边的线程执行完毕后,后边的线程才能执行. AsyncTask类使用注意事项: 1.在UI线程中创建AsyncTask的实例 2.必须在UI线程中调用As

java笔记--使用线程池优化多线程编程

使用线程池优化多线程编程 认识线程池 在Java中,所有的对象都是需要通过new操作符来创建的,如果创建大量短生命周期的对象,将会使得整个程序的性能非常的低下.这种时候就需要用到了池的技术,比如数据库连接池,线程池等. 在java1.5之后,java自带了线程池,在util包下新增了concurrent包,这个包主要作用就是介绍java线程和线程池如何使用的. 在包java.util.concurrent下的 Executors类中定义了Executor.ExecutorService.Sche

Linux多线程编程-互斥锁

互斥锁 多线程编程中,(多线程编程)可以用互斥锁(也称互斥量)可以用来保护关键代码段,以确保其独占式的访问,这有点像二进制信号量.POSIX互斥锁相关函数主要有以下5个: #include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr); int pthread_mutex_destroy(pthread_mutex_t *mutex); int p

Linux多线程编程-条件变量

条件变量 如果说线程间的互斥锁是用来同步共享数据的访问的话,那么条件变量是用于线程之间共享数据的值.条件变量提供了一种线程之间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.条件变量相关函数主要 有5个: #include <pthread.h> int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr); int pthread_cond_destroy(pthread_