在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。
比如说有一个盒子的集合,分别让盒子旋转一定的角度。
void RotateBox(IEnumerable<Box> boxes, float degree){Parallel.ForEach(boxes, box => box.Rotate(degree));}
如果并行任务中的一个任务出现异常,需要结束出问题的任务呢?
Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续。
void RotateBoxes(IEnumerable<Box> boxes){Parallel.ForEach(boxes, (box, state) => {if(!box.IsInvertible){state.Stop();}else{box.Invert();}});}
如果想取消整个并行任务呢?
Parallel.ForEach也为我们提供一个重载方法,可以接收一个ParallelOption的形参,通过设置ParallelOption的CancellationToken属性来取消整个并行过程。
void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token){Paralle.ForEach(boxes,new ParallelOptions{CancellationToken=token},box => box.Rotate(degrees));}
在使用的时候,一般先定义个全局CancellationTokenSource类型的变量。
static CancellationTokenSource token = new CancellationTokenSource();
然后,在某个并行任务中设置取消。
token.Cancel();
最后,再把这个token赋值给以上方法的CancellationToken属性。
各个并行任务如何共享状态,共享一个变量呢?
int InvertBoxes(IEnumerable<Box> boxes){object mutex = new object();//用来锁int nonInvertibleCount = 0; //各任务共享的变量Paralle.ForEach(boxes, box =>{if(box.IsInvertible){box.Invert();}else{lock(mutex){++nonInvertibleCount;}}});return nonInvertibleCount;}
可见,对于各并行线程共享的变量,需要加一个线程锁,以防止多个线程同时操作共享变量。
另外,Parallel.ForEach提供了一个重载,其中localFinally形参接收一个委托类型,通过该委托让并行任务共享一个变量。比如:
static int ParallelSum(IEnumerable<int> values){object mutex = new object();int result = 0;Parallel.ForEach(source: values,LocalInit: () => 0,body: (item, state, localVlaue) => localValue + item,localFinally: localValue => {lock(mutex){result += localValue;}});return result;}
当然,也别忘了PLINQ也支持并行:
static int ParalleSum(IEnumerable<int> values){return values.AsParallel().Sum();}
PLINQ的Aggregate方法也可实现:
static int ParallelSum(IEnumerable<int> values){return values.AsParallel().Aggregate(seed: 0,func: (sum, item) => sum + item;);}
以上,是对相互独立数据的处理。
那么,如何处理相互独立的多个任务呢?
通过Parallel.Invoke方法可以实现。
static voiD ProcessArray(double[] array){Parallel.Invoke(() => ProcessPartialArray(array, 0, array.Length/2),() => ProcessPartialArray(array, array.Length/2, array.Length));}static void ProcessPartialArray(double[] array, int begin, int end){}
使用Parallel.Invoke方法还可以让一个Action或这方法执行多次。
static void Do20(Action action){//让某个Action执行20次Action[] actions = Enumerable.Repeat(action, 20).ToArray();Parallel.Invoke(actions);}
Parallel.Invoke方法也提供了重载,接收ParallelOption类型的形参,用来控制取消整个并行过程。
static void Do20(Action action){//让某个Action执行20次Action[] actions = Enumerable.Repeat(action, 20).ToArray();Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);}
参考资料:C#并发编程经典实例