Parallel

介绍
C# 4.0 的新特性之并行运算

  • Parallel.For - for 循环的并行运算
  • Parallel.ForEach - foreach 循环的并行运算
  • Parallel.Invoke - 并行调用多个任务
  • Task - 任务,基于线程池。其使我们对并行编程变得更简单,且不用关心底层是怎么实现的
  • PLINQ - 用于对内存中的数据做并行运算,也就是说其只支持 LINQ to Object 的并行运算

示例
1、Parallel.For 的 Demo
Parallel/ParallelFor.aspx.cs

代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

namespace CSharp.Parallel
{
    public partial class ParallelFor : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            Normal();
            ParallelForDemo();
        }

private void Normal()
        {
            DateTime dt = DateTime.Now;

for (int i = 0; i < 20; i++)
            {
                GetData(i);
            }

Response.Write((DateTime.Now - dt).TotalMilliseconds.ToString());
            Response.Write("<br />");
            Response.Write("<br />");
        }

private void ParallelForDemo()
        {
            DateTime dt = DateTime.Now;

// System.Threading.Tasks.Parallel.For - for 循环的并行运算
            System.Threading.Tasks.Parallel.For(0, 20, (i) => { GetData(i); });

Response.Write((DateTime.Now - dt).TotalMilliseconds.ToString());
            Response.Write("<br />");
        }

private int GetData(int i)
        {
            System.Threading.Thread.Sleep(100);
            Response.Write(i.ToString());
            Response.Write("<br />");
            return i;
        }
    }
}

/*
运行结果:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2000.0514

0
13
1
19
7
12
18
6
2
8
10
14
4
16
5
3
15
17
9
11
300.0077
*/

2、Parallel.ForEach 的 Demo
Parallel/ParallelForEach.aspx.cs

代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

namespace CSharp.Parallel
{
    public partial class ParallelForEach : System.Web.UI.Page
    {
        private List<int> _data = new List<int>();

protected void Page_Load(object sender, EventArgs e)
        {
            InitData();

Normal();
            ParallelForEachDemo();
        }

private void InitData()
        {
            _data.Clear();
            for (int i = 0; i < 20; i++)
            {
                _data.Add(i);
            }
        }

private void Normal()
        {
            DateTime dt = DateTime.Now;

for (int i = 0; i < 20; i++)
            {
                GetData(i);
            }

Response.Write((DateTime.Now - dt).TotalMilliseconds.ToString());
            Response.Write("<br />");
            Response.Write("<br />");
        }

private void ParallelForEachDemo()
        {
            DateTime dt = DateTime.Now;

// System.Threading.Tasks.Parallel.ForEach - foreach 循环的并行运算
            System.Threading.Tasks.Parallel.ForEach(_data, (index) => { GetData(index); });

Response.Write((DateTime.Now - dt).TotalMilliseconds.ToString());
            Response.Write("<br />");
        }

private int GetData(int i)
        {
            System.Threading.Thread.Sleep(100);
            Response.Write(i.ToString());
            Response.Write("<br />");
            return i;
        }
    }
}

/*
运行结果:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2000.0514

0
6
12
18
1
2
7
13
19
4
3
8
14
9
5
15
10
16
11
17
600.0154
*/

3、Parallel.Invoke 的 Demo
Parallel/ParallelInvoke.aspx.cs

代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using System.Threading;

namespace CSharp.Parallel
{
    public partial class ParallelInvoke : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            var tasks = new Action[] { () => Task1(), () => Task2(), () => Task3() };

// System.Threading.Tasks.Parallel.Invoke - 并行调用多个任务
            System.Threading.Tasks.Parallel.Invoke(tasks);
        }

private void Task1()
        {
            Thread.Sleep(3000);
            Response.Write("Task1 - " + "ThreadId:" + Thread.CurrentThread.ManagedThreadId.ToString() + " - " + DateTime.Now.ToString("HH:mm:ss"));
            Response.Write("<br />");
        }

private void Task2()
        {
            System.Threading.Thread.Sleep(3000);
            Response.Write("Task2 - " + "ThreadId:" + Thread.CurrentThread.ManagedThreadId.ToString() + " - " + DateTime.Now.ToString("HH:mm:ss"));
            Response.Write("<br />");
        }

private void Task3()
        {
            System.Threading.Thread.Sleep(3000);
            Response.Write("Task3 - " + "ThreadId:" + Thread.CurrentThread.ManagedThreadId.ToString() + " - " + DateTime.Now.ToString("HH:mm:ss"));
            Response.Write("<br />");
        }
    }
}

/*
运行结果:
Task2 - ThreadId:26 - 09:11:58
Task1 - ThreadId:25 - 09:11:58
Task3 - ThreadId:24 - 09:11:58
*/

4、Task 的 Demo
Parallel/ParallelTask.aspx.cs

代码

/*
Task - 任务,基于线程池。其使我们对并行编程变得更简单,且不用关心底层是怎么实现的
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using System.Threading;
using System.Threading.Tasks;

namespace CSharp.Parallel
{   
    public partial class ParallelTask : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            /*
             * CancellationTokenSource - 取消任务的操作需要用到的一个类
             *     Token - 一个 CancellationToken 类型的对象,用于通知取消指定的操作
             *     IsCancellationRequested - 是否收到了取消操作的请求
             *     Cancel() - 结束任务的执行
             * ParallelOptions - 并行运算选项
             *     CancellationToken - 设置一个 Token,用于取消任务时的相关操作
             *     MaxDegreeOfParallelism - 指定一个并行循环最多可以使用多少个线程
             */

CancellationTokenSource cts = new CancellationTokenSource();
            ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
            pOption.MaxDegreeOfParallelism = 10;

Response.Write("开始执行,3.5 秒后结束");
            Response.Write("<br />");

/*
             * Task - 任务类
             *     Factory.StartNew() - 创建并开始一个或一批新任务
             *     ContinueWith() - 此任务完成后执行指定的另一个任务
             *     AsyncState - 此任务的上下文对象
             *     Wait() - 阻塞,直到任务完成
             */

Task task0 = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(3500);
                cts.Cancel();
                Response.Write("结束");
                Response.Write("<br />");

});

// 通过 System.Threading.Tasks.Parallel.Invoke 执行任务的时候,可以加入 ParallelOptions 参数,用于对此并行运算做一些配置
            System.Threading.Tasks.Parallel.Invoke(pOption,
                () => Task1(pOption.CancellationToken),
                () => Task2(pOption.CancellationToken));

/*
             * 一个 Task 内可以包含多个 Task
            Task tasks = new Task(() => 
            {
                Task.Factory.StartNew(() => Method()); 
                Task.Factory.StartNew(() => Method2()); 
                Task.Factory.StartNew(() => Method3()); 
            }); 
            tasks.Start(); 
            // 阻塞,直到整个任务完成
            tasks.Wait(); 
            */

/*
             * 带返回值的 Task
            Func<object, long> fun = delegate(object state)
            {
                return 1.0;
            };
            Task<long> tsk = new Task<long>(fun, "state");
            tsk.Start();
            Response.Write(tsk.Result.ToString()); 
            */
        }
       
        private void Task1(CancellationToken token)
        {
            // 每隔 1 秒执行一次,直到此任务收到了取消的请求
            // 注意:虽然此处是其他线程要向主线程(UI线程)上输出信息,但因为使用了 Task ,所以不用做任何处理
            while (!token.IsCancellationRequested)
            {
                Response.Write("Task1 - " + "ThreadId: " + Thread.CurrentThread.ManagedThreadId.ToString());
                Response.Write("<br />");
                Thread.Sleep(1000);
            }

}
        private void Task2(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                Response.Write("Task2 - " + "ThreadId: " + Thread.CurrentThread.ManagedThreadId.ToString());
                Response.Write("<br />");
                Thread.Sleep(1000);
            }
        }
    }
}

/*
运行结果:
开始执行,3.5 秒后结束
Task2 - ThreadId: 6
Task1 - ThreadId: 48
Task1 - ThreadId: 48
Task2 - ThreadId: 6
Task2 - ThreadId: 6
Task1 - ThreadId: 48
Task2 - ThreadId: 6
Task1 - ThreadId: 48
结束
*/

5、PLINQ 的 Demo
Parallel/ParallelPLINQ.aspx.cs

代码

/*
PLINQ - 用于对内存中的数据做并行运算,也就是说其只支持 LINQ to Object 的并行运算
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

namespace CSharp.Parallel
{
    public partial class ParallelPLINQ : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            List<int> list = new List<int>();
            for (int i = 0; i < 100; i++)
            {
                list.Add(i);
            }

// AsParallel() - 并行运算
            // AsSequential() - 串行运算
            // AsOrdered() - 保持数据的原有顺序(AsSequential()指的是串行运算;AsOrdered()指的是如果在并行运算的前提下,它会把结果先缓存,然后排序,最后再把排序后的数据做输出)
            // AsUnordered() - 可以不必保持数据的原有顺序
            // WithDegreeOfParallelism() - 明确地指出需要使用多少个线程来完成工作
            // WithCancellation(new CancellationTokenSource().Token) - 指定一个 CancellationToken 类型的参数

ParallelQuery nums = from num in list.AsParallel<int>().AsOrdered<int>()
                                 where num % 10 == 0
                                 select num;

foreach (var num in nums)
            {
                Response.Write(num.ToString());
                Response.Write("<br />");
            }

// 聚合方法也可以做并行运算
            Response.Write(list.AsParallel().Average().ToString());
            Response.Write("<br />");

// 自定义聚合方法做并行运算的 Demo(实现一个取集合的平均值的功能)
            double myAggregateResult = list.AsParallel().Aggregate(
                // 聚合变量的初始值
                0d,

// 在每个数据分区上,计算此分区上的数据
                // 第一个参数:对应的数据分区的计算结果;第二个参数:对应的数据分区的每个数据项
                (value, item) => 
                {
                    double result = value + item;
                    return result; 
                },

// 根据每个数据分区上的计算结果,再次做计算
                // 第一个参数:全部数据的计算结果;第二个参数:每个数据分区上的计算结果
                (value, data) =>
                {
                    double result = value + data;
                    return result;
                },

// 根据全部数据的计算结果再次计算,得到最终的聚合结果
                (result) => result / list.Count
            );

Response.Write(myAggregateResult.ToString());
        } 
    }
}

/*
运行结果:
0
10
20
30
40
50
60
70
80
90
49.5
49.5 
*/

注:关于并行运算的实例可以参考
http://code.msdn.microsoft.com/ParExtSamples

时间: 2024-10-24 16:06:41

Parallel的相关文章

大量数据快速插入方法探究[nologging+parallel+append]

大量数据快速插入方法探究 快速插入千万级别的数据,无非就是nologging+parallel+append. 1     环境搭建 构建一个千万级别的源表,向一个空表insert操作. 参考指标:insert动作完成的实际时间. SQL> drop table test_emp cascadeconstraints purge; Table dropped. SQL> create table test_emp as select *from emp; Table created. SQL&

SMON: Parallel transaction recovery tried 引发的问题--转载

SMON: Parallel transaction recovery tried 这个一般是在具有在跑大数据量的 transaction的时候kill掉了进程而导致 smon 去清理 回滚段时导致的. 这个在业务高峰期的时候,如果发现这个,有可能导致 SMON 占用了 100% cpu 而导致 系统 hang 在那边.即使你shutdown immediate ,oracle 也会等待 smon 清理完毕才能关机,而这个等待过程也许是漫长的.如果你 shutdown abort,那么oracl

【狼窝乀野狼】Parallel浅尝辄止

前段时间看到园子里面有同学在用Parallel进行批量插入数据库.后面也有很多同学针对这一事件给出了自己的看法和见解.我在这里不评论内容的好坏,至少能将自己东西总结分享这个是要靠勇气和毅力. 闲话少说,我在最近看崔鹏飞的github的时候,发现他对这块也做了一定的总结,那么我就他这块进行板书与展示.案例是怎么回事呢?话说我有一个公司,里面需要统计一下总收入,另外有一个公司被我收购了,我一起计算总收入.当一天我收购了N个公司,计算总收入的时候,我们采用并行计算. 1 internal class

Parallel并行化编程

在很多场景中我们需要通过并行化的方式来提高程序运行的速度,比较典型的需求就是并行下载.前期遇到一个需求是要批量下载瓦片,每次大概下载上百万个瓦片,要想提高瓦片的下载速度,只能通过并行化的方式,下面把我解决此问题的思路和代码总结如下: 第一步确定线程个数(ThreadCount),这个要根据网络情况和硬件配置进行确定,可以做成一个配置项由用户自行确定. 第二步将任务分成ThreadCount个,此步需要注意处理任务数较少(小于线程个数)以及任务数除不尽ThreadCount的情况.处理方式为任务数

Parallel Programming-Task Base

Parallel.For/ForEach是数据层面的并行,本文所讲的Task是将不同的操作并行执行,本文主要内容: Task的工作模型 初始化Task 完成Task 取消Task 一.Task工作模型 .Net中Task的工作模式是Fork/Join或者Master/Worker模式.核心思想是Master负责接受Client的请求,并且负责将请求分配给最终的Wroker,Worker执行完自己的工作后分别返回给Master.由Master汇总最终的结果并且返回给Client. 在.Net的Ta

Parallel类

1,用Parallel.For()方法循环     class Program     {         static void Main(string[] args)         {             //=============================第一种             Parallel.For(0,//第一个参数:开始索引                 10,//第二个参数:最大索引                 (i, parallelLoopSta

分布式并行计算方案:parallel computing by kafka-storm 发布了

HighAvailabilityToolkit1.3 发布了 version 1.3,如何在分布式集群中,充分利用多节点,对大数据进行拆分,实现并行计算,"parallel computing by kafka-storm " 提供了一种很好的思路. 源码 : https://github.com/yfwangpeng/HighAvailabilityToolkit 微博:http://weibo.com/58wp58

Method and apparatus for an atomic operation in a parallel computing environment

A method and apparatus for a?atomic?operation?is described. A method comprises receiving a first program unit in a parallel computing environment, the first program unit including a memory update?operation?to be performed atomically, the memory updat

何时使用 Parallel.ForEach,何时使用 PLINQ

翻译自:When Should I Use Parallel.ForEach? When Should I Use PLINQ? 原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation 原文pdf:http://download.csdn.net/detail/sqlchen/7509513 ======================================================

c#执行并行任务之Parallel与TaskFactory

任务:几千条(大量)数据往服务器数据库填写.要求单开线程执行,分割成小数据包,多线程运行. 实现方法:Parallel与TaskFactory都可以. 主要代码: Parallel: Barrier _bar; int _maxLength = 20, _maxChannel = 2;//同时最多2条通道,每条通道最多20个数据 bool _isCancel = false; private void btnWrite_Click(object sender, EventArgs e) { va