看看Parallel中高度封装的三个方法,Invoke,For和ForEach

  说到.net中的并行编程,也许你的第一反应就是Task,确实Task是一个非常灵活的用于并行编程的一个专用类,不可否认越灵活的东西用起来就越

复杂,高度封装的东西用起来很简单,但是缺失了灵活性,这篇我们就看看这些好用但灵活性不高的几个并行方法。

一:Invoke

  现在电子商务的网站都少不了订单的流程,没有订单的话网站也就没有存活的价值了,往往在订单提交成功后,通常会有这两个操作,第一个:发起

信用卡扣款,第二个:发送emial确认单,这两个操作我们就可以在下单接口调用成功后,因为两个方法是互不干扰的,所以就可以用invoke来玩玩了。

 1         static void Main(string[] args)
 2         {
 3             Parallel.Invoke(Credit, Email);
 4
 5             Console.Read();
 6         }
 7
 8         static void Credit()
 9         {
10             Console.WriteLine("******************  发起信用卡扣款中  ******************");
11
12             Thread.Sleep(2000);
13
14             Console.WriteLine("扣款成功!");
15         }
16
17         static void Email()
18         {
19             Console.WriteLine("******************  发送邮件确认单!*****************");
20
21             Thread.Sleep(3000);
22
23             Console.WriteLine("email发送成功!");
24         }

怎么样,实现起来是不是很简单,只要把你需要的方法塞给invoke就行了,不过在这个方法里面有一个重载参数需要注意下,

1  public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);

有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了

MaxDegreeOfParallelism属性。

好了,下面我们大概翻翻invoke里面的代码实现,发现有几个好玩的地方:

<1>: 当invoke中的方法超过10个话,我们发现它走了一个internal可见的ParallelForReplicatingTask的FCL内部专用类,而这个类是继承自

   Task的,当方法少于10个的话,才会走常规的Task.

<2> 居然发现了一个装exception 的ConcurrentQueue<Exception>队列集合,多个异常入队后,再包装成AggregateException抛出来。

比如:throw new AggregateException(exceptionQ);

<3> 我们发现,不管是超过10个还是小于10个,都是通过WaitAll来等待所有的执行,所以缺点就在这个地方,如果某一个方法执行时间太长

   不能退出,那么这个方法是不是会长期挂在这里不能出来,也就导致了主流程一直挂起,然后页面就一直挂起,所以这个是一个非常危险

的行为,如果我们用task中就可以在waitall中设置一个过期时间,但invoke却没法做到,所以在使用invoke的时候要慎重考虑。

  1     try
  2     {
  3         if (actionsCopy.Length > 10 || (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
  4         {
  5             ConcurrentQueue<Exception> exceptionQ = null;
  6             try
  7             {
  8                 int actionIndex = 0;
  9                 ParallelForReplicatingTask parallelForReplicatingTask = new ParallelForReplicatingTask(parallelOptions, delegate
 10                 {
 11                     for (int l = Interlocked.Increment(ref actionIndex); l <= actionsCopy.Length; l = Interlocked.Increment(ref actionIndex))
 12                     {
 13                         try
 14                         {
 15                             actionsCopy[l - 1]();
 16                         }
 17                         catch (Exception item)
 18                         {
 19                             LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
 20                             exceptionQ.Enqueue(item);
 21                         }
 22                         if (parallelOptions.CancellationToken.IsCancellationRequested)
 23                         {
 24                             throw new OperationCanceledException(parallelOptions.CancellationToken);
 25                         }
 26                     }
 27                 }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);
 28                 parallelForReplicatingTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
 29                 parallelForReplicatingTask.Wait();
 30             }
 31             catch (Exception ex2)
 32             {
 33                 LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
 34                 AggregateException ex = ex2 as AggregateException;
 35                 if (ex != null)
 36                 {
 37                     using (IEnumerator<Exception> enumerator = ex.InnerExceptions.GetEnumerator())
 38                     {
 39                         while (enumerator.MoveNext())
 40                         {
 41                             Exception current = enumerator.Current;
 42                             exceptionQ.Enqueue(current);
 43                         }
 44                         goto IL_264;
 45                     }
 46                 }
 47                 exceptionQ.Enqueue(ex2);
 48                 IL_264:;
 49             }
 50             if (exceptionQ != null && exceptionQ.Count > 0)
 51             {
 52                 Parallel.ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
 53                 throw new AggregateException(exceptionQ);
 54             }
 55         }
 56         else
 57         {
 58             Task[] array = new Task[actionsCopy.Length];
 59             if (parallelOptions.CancellationToken.IsCancellationRequested)
 60             {
 61                 throw new OperationCanceledException(parallelOptions.CancellationToken);
 62             }
 63             for (int j = 0; j < array.Length; j++)
 64             {
 65                 array[j] = Task.Factory.StartNew(actionsCopy[j], parallelOptions.CancellationToken, TaskCreationOptions.None, InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
 66             }
 67             try
 68             {
 69                 if (array.Length <= 4)
 70                 {
 71                     Task.FastWaitAll(array);
 72                 }
 73                 else
 74                 {
 75                     Task.WaitAll(array);
 76                 }
 77             }
 78             catch (AggregateException ex3)
 79             {
 80                 Parallel.ThrowIfReducableToSingleOCE(ex3.InnerExceptions, parallelOptions.CancellationToken);
 81                 throw;
 82             }
 83             finally
 84             {
 85                 for (int k = 0; k < array.Length; k++)
 86                 {
 87                     if (array[k].IsCompleted)
 88                     {
 89                         array[k].Dispose();
 90                     }
 91                 }
 92             }
 93         }
 94     }
 95     finally
 96     {
 97         if (TplEtwProvider.Log.IsEnabled())
 98         {
 99             TplEtwProvider.Log.ParallelInvokeEnd((task != null) ? task.m_taskScheduler.Id : TaskScheduler.Current.Id, (task != null) ? task.Id : 0, forkJoinContextID);
100         }
101     }

二:For

下面再看看Parallel.For,我们知道普通的For是一个串行操作,如果说你的for中每条流程都需要执行一个方法,并且这些方法可以并行操作且

比较耗时,那么为何不尝试用Parallel.For呢,就比如下面的代码。

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             List<Action> actions = new List<Action>() { Credit, Email };
 6
 7             var result = Parallel.For(0, actions.Count, (i) =>
 8             {
 9                 actions[i]();
10             });
11
12             Console.WriteLine("执行状态:" + result.IsCompleted);
13
14             Console.Read();
15         }
16
17         static void Credit()
18         {
19             Console.WriteLine("******************  发起信用卡扣款中  ******************");
20
21             Thread.Sleep(2000);
22
23             Console.WriteLine("扣款成功!");
24         }
25
26         static void Email()
27         {
28             Console.WriteLine("******************  发送邮件确认单!*****************");
29
30             Thread.Sleep(3000);
31
32             Console.WriteLine("email发送成功!");
33         }
34     }

下面我们再看看Parallel.For中的最简单的重载和最复杂的重载:

1 public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
2
3 public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
4  

<1> 简单的重载不必多说,很简单,我上面的例子也演示了。

<2> 最复杂的这种重载提供了一个AOP的功能,在每一个body的action执行之前会先执行localInit这个action,在body之后还会执行localFinally

这个action,有没有感觉到已经把body切成了三块?好了,下面看一个例子。

 1     static void Main(string[] args)
 2         {
 3             var list = new List<int>() { 10, 20, 30, 40 };
 4
 5             var options = new ParallelOptions();
 6
 7             var total = 0;
 8
 9             var result = Parallel.For(0, list.Count, () =>
10             {
11                 Console.WriteLine("------------  thead --------------");
12
13                 return 1;
14             },
15               (i, loop, j) =>
16               {
17                   Console.WriteLine("------------  body --------------");
18
19                   Console.WriteLine("i=" + list[i] + " j=" + j);
20
21                   return list[i];
22               },
23               (i) =>
24               {
25                   Console.WriteLine("------------  tfoot --------------");
26
27                   Interlocked.Add(ref total, i);
28
29                   Console.WriteLine("total=" + total);
30               });
31
32             Console.WriteLine("iscompleted:" + result.IsCompleted);
33             Console.Read();
34         }

接下来我们再翻翻它的源代码,由于源码太多,里面神乎其神,我就找几个好玩的地方。

<1>  我在里面找到了一个rangeManager分区函数,代码复杂看不懂,貌似很强大。

 1         internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
 2         {
 3             this.m_nCurrentIndexRangeToAssign = 0;
 4             this.m_nStep = nStep;
 5             if (nNumExpectedWorkers == 1)
 6             {
 7                 nNumExpectedWorkers = 2;
 8             }
 9             ulong num = (ulong)(nToExclusive - nFromInclusive);
10             ulong num2 = num / (ulong)((long)nNumExpectedWorkers);
11             num2 -= num2 % (ulong)nStep;
12             if (num2 == 0uL)
13             {
14                 num2 = (ulong)nStep;
15             }
16             int num3 = (int)(num / num2);
17             if (num % num2 != 0uL)
18             {
19                 num3++;
20             }
21             long num4 = (long)num2;
22             this.m_indexRanges = new IndexRange[num3];
23             long num5 = nFromInclusive;
24             for (int i = 0; i < num3; i++)
25             {
26                 this.m_indexRanges[i].m_nFromInclusive = num5;
27                 this.m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
28                 this.m_indexRanges[i].m_bRangeFinished = 0;
29                 num5 += num4;
30                 if (num5 < num5 - num4 || num5 > nToExclusive)
31                 {
32                     num5 = nToExclusive;
33                 }
34                 this.m_indexRanges[i].m_nToExclusive = num5;
35             }
36         }

<2> 我又找到了这个神奇的ParallelForReplicatingTask类。

那么下面问题来了,在单线程的for中,我可以continue,可以break,那么在Parallel.For中有吗?因为是并行,所以continue基本上就没有

存在价值,break的话确实有价值,这个就是委托中的ParallelLoopState做到的,并且还新增了一个Stop。

三:ForEach

其实ForEach和for在本质上是一样的,你在源代码中会发现在底层都是调用一个方法的,而ForEach会在底层中调用for共同的函数之前还会执行

其他的一些逻辑,所以这就告诉我们,能用Parallel.For的地方就不要用Parallel.ForEach,其他的都一样了,这里就不赘述了。

时间: 2024-12-27 21:53:22

看看Parallel中高度封装的三个方法,Invoke,For和ForEach的相关文章

spring在xml文件中配置bean的三种方法

一.最常见,也是缺省,是调用spring的缺省工厂类 spring缺省工厂类:org.springframework.beans.factory.support.DefaultListableBeanFactory使用其静态方法preInstantiateSingletons() 配置文件中最普通最基本的定义一个普通bean<bean id="DvdTypeDAOBean" class="com.machome.dvd.impl.DvdTypeDAO" >

Openerp 中打开 URL 的三种 方法

来自:http://shine-it.net/index.php/topic,8013.0.html 最近总结了,Openerp 中打开 URL 的三种 方法: 一.在form view 添加 <a>标签 二.使用url widget, <field name="field_name" widget="url"/> 三.使用按钮,return { 'type': 'ir.actions.act_url', 'http://www.opener

C#中datatable导出excel(三种方法)

方法一:(拷贝直接可以使用,适合大批量资料, 上万笔)Microsoft.Office.Interop.Excel.Application appexcel = new Microsoft.Office.Interop.Excel.Application();SaveFileDialog savefiledialog = new SaveFileDialog();System.Reflection.Missing miss = System.Reflection.Missing.Value;ap

cocos2dx中创建动画的三种方法

1.最最原始的方法,先创建动画帧,再创建动画打包(animation),再创建动画(animate) 第一步: 创建动画帧:CCSpriteFrame,依赖于原始的资源图片(xx.png,xx.jpg) CCSpriteFrame *frame1=CCSpriteFrame::create("1.png"); CCSpriteFrame *frame2=CCSpriteFrame::create("2.png"); CCSpriteFrame *frame3=CCS

mfc 在VC的两个对话框类中传递参数的三种方法

弄了好久,今天终于把在VC中的对话框类之间传递参数的问题解决了,很开心,记录如下: 1. 我所建立的工程是一个基于MFC对话框的应用程序,一共有三个对话框,第一个对话框为主对话框,所对应的类为CTMDDDlg类.在主对话框上我放置了一个标签页(Tab Control)控件,其实现的功能是当单击标签提示A时进入页面A,即对话框A(所对应的类为CDialogChild1),单击B时进入对话框B(CDialogChild2). 整个工程的框架已经设计好了,在对话框A和对话框B上放置了许多控件,现在我想

ASP.NET中身份验证的三种方法

Asp.net的身份验证有有三种,分别是"Windows | Forms | Passport",其中又以Forms验证用的最多,也最灵活.Forms 验证方式对基于用户的验证授权提供了很好的支持,可以通过一个登录页面验证用户的身份,将此用户的身份发回到客户端的Cookie,之后此用户再访问这个web应用就会连同这个身份Cookie一起发送到服务端.服务端上的授权设置就可以根据不同目录对不同用户的访问授权进行控制了. 问题来了,在实际是用中我们往往需要的是基于角色,或者说基于用户组的验

android应用中去掉标题栏的三种方法

在android中去掉标题栏有三种方法,它们也有各自的特点. 1.在代码里实现 this.requestWindowFeature(Window.FEATURE_NO_TITLE);//去掉标题栏 记住:这句代码要写在setContentView()前面. 2.在清单文件(manifest.xml)里面实现 <application android:icon="@drawable/icon" android:label="@string/app_name" a

UILabel实现上下左右内边距和自适用高度的计算三种方法

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px "PingFang SC"; color: #000000; background-color: rgba(0, 0, 0, 0) } p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px "PingFang SC"; color: #000000; background-color: rgba(0, 0, 0

linux中传文件的三种方法、windows

第一种方法:使用vsftpd服务 ftp配置文件主要内容:[[email protected] vsftpd]# cat vsftpd.conf|grep -v ^#|grep -v ^$anonymous_enable=YES #匿名登陆local_enable=YES #允许使用本地用户来登陆ftplocal_root=/var/ftp/pub #write_enable=YES #开放本地用户写的权限local_umask=022 #FTP上本地的文件权限,默认是077 anon_uplo