并行开发 —— 第三篇 plinq的使用

1:AsParallel(并行化)

下面我们模拟给ConcurrentDictionary灌入1500w条记录,看看串行和并行效率上的差异,注意我的老爷机是2个硬件线程。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;
using System.Collections.Generic;

using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        var dic = LoadData();

        Stopwatch watch = new Stopwatch();

        watch.Start();

        //串行执行
        var query1 = (from n in dic.Values
                      where n.Age > 20 && n.Age < 25
                      select n).ToList();

        watch.Stop();

        Console.WriteLine("串行计算耗费时间:{0}", watch.ElapsedMilliseconds);

        watch.Restart();

        var query2 = (from n in dic.Values.AsParallel()
                      where n.Age > 20 && n.Age < 25
                      select n).ToList();

        watch.Stop();

        Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);

        Console.Read();
    }

    public static ConcurrentDictionary<int, Student> LoadData()
    {
        ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();

        //预加载1500w条记录
        Parallel.For(0, 15000000, (i) =>
        {
            var single = new Student()
            {
                ID = i,
                Name = "hxc" + i,
                Age = i % 151,
                CreateTime = DateTime.Now.AddSeconds(i)
            };
            dic.TryAdd(i, single);
        });

        return dic;
    }

    public class Student
    {
        public int ID { get; set; }

        public string Name { get; set; }

        public int Age { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

执行的结果还是比较震撼的,将近7倍,这是因为plinq的查询引擎会尽量利用cpu的所有硬件线程

2:常用方法的使用

<1> orderby

有时候我们并不是简单的select一下就ok了,可能需要将结果进行orderby操作,并行化引擎会把要遍历的数据分区,然后在每个区上进行

orderby操作,最后来一个总的orderby,这里很像算法中的“归并排序”。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;
using System.Collections.Generic;

using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        var dic = LoadData();

        var query1 = (from n in dic.Values.AsParallel()
                      where n.Age > 20 && n.Age < 25
                      select n).ToList();

        Console.WriteLine("默认的时间排序如下:");
        query1.Take(10).ToList().ForEach((i) =>
        {
            Console.WriteLine(i.CreateTime);
        });

        var query2 = (from n in dic.Values.AsParallel()
                      where n.Age > 20 && n.Age < 25
                      orderby n.CreateTime descending
                      select n).ToList();

        Console.WriteLine("排序后的时间排序如下:");
        query2.Take(10).ToList().ForEach((i) =>
        {
            Console.WriteLine(i.CreateTime);
        });

        Console.Read();
    }

    public static ConcurrentDictionary<int, Student> LoadData()
    {
        ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();

        //预加载1500w条记录
        Parallel.For(0, 15000000, (i) =>
        {
            var single = new Student()
            {
                ID = i,
                Name = "hxc" + i,
                Age = i % 151,
                CreateTime = DateTime.Now.AddSeconds(i)
            };
            dic.TryAdd(i, single);
        });

        return dic;
    }

    public class Student
    {
        public int ID { get; set; }

        public string Name { get; set; }

        public int Age { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

<2> sum(),average()等等这些聚合函数的效果跟orderby类型一样,都是实现了类型归并排序的效果,这里就不举例子了。

3:指定并行度,这个我在前面文章也说过,为了不让并行计算占用全部的硬件线程,或许可能要留一个线程做其他事情。

var query2 = (from n in dic.Values.AsParallel()
                     .WithDegreeOfParallelism(Environment.ProcessorCount - 1)                                             where n.Age > 20 && n.Age < 25
                     orderby n.CreateTime descending
                     select n).ToList();

4: 了解ParallelEnumerable类

首先这个类是Enumerable的并行版本,提供了很多用于查询实现的一组方法,截个图,大家看看是不是很熟悉,要记住,他们都是并行的。

下面列举几个简单的例子。

class Program
{
    static void Main(string[] args)
    {
        ConcurrentBag<int> bag = new ConcurrentBag<int>();

        var list = ParallelEnumerable.Range(0, 10000);

        list.ForAll((i) =>
        {
            bag.Add(i);
        });

        Console.WriteLine("bag集合中元素个数有:{0}", bag.Count);

        Console.WriteLine("list集合中元素个数总和为:{0}", list.Sum());

        Console.WriteLine("list集合中元素最大值为:{0}", list.Max());

        Console.WriteLine("list集合中元素第一个元素为:{0}", list.FirstOrDefault());

        Console.Read();
    }
}

5: plinq实现MapReduce算法

mapReduce是一个非常流行的编程模型,用于大规模数据集的并行计算,非常的牛X啊,记得mongodb中就用到了这个玩意。

map:  也就是“映射”操作,可以为每一个数据项建立一个键值对,映射完后会形成一个键值对的集合。

reduce:“化简”操作,我们对这些巨大的“键值对集合“进行分组,统计等等。

具体大家可以看看百科:http://baike.baidu.com/view/2902.htm

下面我举个例子,用Mapreduce来实现一个对age的分组统计。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

class Program
{
    static void Main(string[] args)
    {
        List<Student> list = new List<Student>()
        {
            new Student(){ ID=1, Name="jack", Age=20},
            new Student(){ ID=1, Name="mary", Age=25},
            new Student(){ ID=1, Name="joe", Age=29},
            new Student(){ ID=1, Name="Aaron", Age=25},
        };

        //这里我们会对age建立一组键值对
        var map = list.AsParallel().ToLookup(i => i.Age, count => 1);

        //化简统计
        var reduce = from IGrouping<int, int> singleMap
                     in map.AsParallel()
                     select new
                     {
                         Age = singleMap.Key,
                         Count = singleMap.Count()
                     };

        ///最后遍历
        reduce.ForAll(i =>
        {
            Console.WriteLine("当前Age={0}的人数有:{1}人", i.Age, i.Count);
        });
    }

    public class Student
    {
        public int ID { get; set; }

        public string Name { get; set; }

        public int Age { get; set; }

        public DateTime CreateTime { get; set; }
    }
}

转自:http://www.cnblogs.com/huangxincheng/archive/2012/04/04/2431616.html

时间: 2024-11-08 23:09:29

并行开发 —— 第三篇 plinq的使用的相关文章

并行开发学习随笔1——plinq并行

这两天在看园友的文章 <8天玩转并行开发——第三天 plinq的使用> 对里面的第一个实例亲手实践了一下,发现了一点有意思的事情. 测试环境:.net 4.5 64位(如果是32位的,测试千万数据时会爆出out of memory的错误) 在我的机器上,千万数据的测试结果: 百万数据的测试结果: 十万数据的测试结果: 可以看出,到底使用串行还是并行应该根据数据量来决定,两者的大致就在几十万数据的时候性能基本接近.当然这个结果不是固定的,应该是与机器的配置以及测试时的系统环境有比较大的关系,实际

(C/C++)基于SharpUI控件库的插件式框架开发--第三篇框架基础服务库

一个框架基础的东西,一般也是操作的最基础的类,比如char.int.bool等,有时出现内存泄露的问题导致错误的抛出,但是C++开发有的时候就算是抛出异常,那也是靠经验来积累才能非常快速准确的找出错误所在,这就需要在框架中需要添加日志管理的接口,日志管理的好处就是开发者自身在找异常时提供参考,另一个就是如果用户操作时出现问题,也可将日志反馈,帮助快速解决问题:总之了为了更好的扩展完善我的框架,我详细列一下这个基础服务库(XPCore)包含内容: 虽说sharpui控件库内封闭好string类,但

基于GBT28181:SIP协议组件开发-----------第三篇SIP注册流程分析实现

上两章节简要的讲解了SIP组件开发接口和开发环境的搭建.在本节将实现Linux 32平台的UAS和UAC,当然该UAS和UAC只实现了注册功能,并且是基于自主开发SIP组件libGBT28181SipComponent.so的,没有这个组件是运行不了的.其他功能在后续章节中讲解. 首先简单讲解一下GBT28181关于注册描述 一. GBT28181注册的流程如下图 电力系统注册稍微复杂点,但原来基本相同.多了个刷新注册的过程. 二.GBT28181关于注册的解释如下 三.SIP协议简介 一个合法

PowerBI开发 第三篇:报表设计技巧

最近做了几个PowerBI报表,对PowerBI的设计有了更深的理解,对数据的塑形(sharp data),不仅可以在Data Source中实现,例如在TSQL查询脚本中,而且可以在PowerBI中实现,例如,向数据模型中添加自定义字段,或者在报表数据显示时,根据数据表之间的关系做数据的统计.本文主要介绍数据的塑形和UI设计的微调. 一,创建数据列 PowerBI报表的数据分为数据源(Data Source),数据模型(Data Model),Query,数据从Data Source加载到Da

API开发第三篇:PHP的设计模式之完美的单例模式

今天来说一说单例模式. 由于我以前是做java开发的,在使用单例模式的时候,首先想到的想用饿汉式,然后发现在PHP中,有这样一个特性:因为PHP不支持在类定义时给类的成员变量赋予非基本类型的值.如表达式,new操作等等.所以了饿汉式这个就不行了.转而想要确保这个单例模式的原子性,发现PHP中也没有像JAVA中的线程安全问题.嘿嘿,你说PHP好不好?那么OK接下来就试试PHP的懒汉式单例模式了. 先不说,我先上我第一个版本的单例模式代码: // 定义私有静态变量.此种方式为:懒汉式单例(PHP中只

Python开发第三篇

函数 一.函数参数传值 形参:函数在定义的时候给定的参数 实参:函数在运行时赋给的参数: 1 def func(i):#i为定义时的参数,为形参 2 pass 3 func(name)#name为运行时的参数,为实参,实参与形参的名字可以相同 传值方式: 位置传值:按照定义时的顺序,用实参给形参赋值 1 def func(x,y,z): 2 print("x->",x) 3 print("y->",y) 4 print("z->"

软件工程迭代开发第三篇

今天将之前的用户界面显示的内容集合到了一个函数中,并且增加了屏幕左上角的角色状态栏,并且加入了魔法这项状态.界面与下图所示: 因为还没有技能,所以魔法还不能减.这也是为之后加入技能做出准备.图中灰色的框是头像框,灰色方块是默认的头像. 整合代码如下: /*绘制UI界面*/ void Player::paintUI() { //计算用户界面的x,y值 double x, y; x = wx; y = wy; if (x < 1) x = 1; else if (x > mapmax - 1) x

PowerBI开发 第八篇:查询参数

在PowerBI Desktop中,用户可以定义一个或多个查询参数(Query Parameter),通常的用法是通过查询参数定义数据查询(Query),在数据模型中创建关系,通过DAX表达式引用参数.在查询编辑器(Query Editor)中,用户通过菜单“Manage Parameters”创建和管理参数,参数有Name属性,参数的数据类型,以及参数的类型(List of Values,ListQuery),当前值(Current Value)等属性,用于手动枚举参数的值,或则指定一个Lis

并行计算复习————第三篇 并行计算理论基础:并行数值算法

第三篇 并行计算理论基础:并行数值算法 注:此篇较水,=.= Ch9 稠密矩阵运算 9.1 矩阵的划分 矩阵的划分一般分为带状划分和棋盘划分,在此基础上又有循环划分的变体: 带状划分:把矩阵的若干行或若干列连续地划分给一个处理器 循环带状划分:把矩阵的若干行或若干列间断且等间隔地划分给一个处理器 棋盘划分:把方阵连续地划分成若干子方阵,每个处理器指派一个子方阵 循环棋盘划分:把方阵间断且等间隔地划分成若干子方阵,每个处理器指派一个子方阵 一般情况下,棋盘划分的划分方法能够开发出更高并行度的算法