Linq并行执行

一、     并行LINQ

  1. 1.  并行查询

.NET4在System.Linq名称空间中包含一个新类ParalleIEnumerable ,可以分解查询的工作使其分布在多个线程上。尽管Enmerable类给IEnunerable<T>接口定义了扩展方法,但

ParalleIEnumerable 类的大多数扩展方法是ParallelQuery<TSource>类的扩展。一个重要的例外是AsParallel()方法,它扩展了IEnumerable<TSource>接口,返回ParallelQuery<TSource>类,所以正常的集合类可以以平行方式查询。

例:

const int arraySize = 100000000;

var data = new int[arraySize];

var r = new Random();

for (int i = 0; i < arraySize; i++)

{

data[i] = r.Next(40);

}

现在可以使用LINQ查询筛选数据,获取筛选数据的总和。该查询用where子句定义了一个筛选器,仅会中对应值小于20的项,接着调用聚合函数Sum()方法 。与前面的LINQ查询的唯一区别是,这次调用了AsParallel()方法。

var sum = (from x in data.AsParallel()

where x < 20

select x).Sum();

与前面的LINQ查询一样,编译器会修改语法,以调用AsParallel()、Where()、Select()和Sum()方法。AsParallel()方法用ParallelEnumerable类定义,以扩展IEnumerable<T>接口,所以对简单的数据调用它。AsParallel()方法返回ParallelQuery<TSource>。因为返回的类型,所以编译器选择的Where()方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。在下面的代码中Select()和Sum()方法也来自ParallelEnumerable类。与Enumerable类的实现代码相反,对于ParallelEnumerable类,查询是分区的,以便多个线程可以同时处理该查询。数组可以分为多个部分,其中每个部分由不同的线程处理,以筛选其余项。完成分区的工作后,就需要合并,获得所有部分的总和。

var sum=data.AsParallel().Where(x=>x<20).Select(x=>x).Sum();

运行这行代码就会启动任务管理器,这样就可以看出系统的所有CPU都在忙碌。如果删除AsParallel()方法,就不可能使用多个CPU。当然,如果系统上没有多个CPU,就不会看到并行版本带来改进。

  1. 2.  分区器

AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partition类。通过它,可以影响要创建的分区。

Partitioner类用System.Collection.Concurrent命名空间定义,并且有不同变体。Create方法接受实现了IList<T>类的数组或对象。根据这一点,以及类型的参数loadBalance和该方法的一些重载版本,会返回一个不同的Partitioner类型。对于数组,.Net4包含派生自抽象基类OrderablePartitioner<TSource>的DynamicPartitionerForArray<TSource>类和StaticPartitionerForArray<TSource>类。

var q1 = (from x in Partitioner.Create(data).AsParallel()

where x < 20

select x).Sum();

也可以调用WithExecutionMode()和WithDegreeOfParallelism()方法可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。默认情况下,并行LINQ避免使用系统开销很高的并行机制。对于WithDegreeOfParallelism()方法,可以传递一个整数值,以指定并行运行的最大任务数。

例:

const int arraySize = 100000000;

var data = new int[arraySize];

var r = new Random();

for (int i = 0; i < arraySize; i++)

{

data[i] = r.Next(40);

}

Stopwatch watch = new Stopwatch();

watch.Start();

//一种写法,没有添加动态负载均衡,执行完所需要的时间1300毫秒

var q1 = (from x in Partitioner.Create(data).AsParallel()

where x < 80

select x).Sum();

//第二种写法,添加了动态负载均衡,执行完所需要的时间为660毫秒。

var q1 = (from x in Partitioner.Create(data,true).AsParallel()

where x < 80

select x).Sum();

watch.Stop();

Console.WriteLine(watch.ElapsedMilliseconds.ToString());

  1. 3.  取消

.Net提供了一种标准方式,来取消长时间运行的任务,这也适用于并行LINQ。要取消长时间的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。该查询在单独的线程中运行,在该线程中,捕获一个OperationCancelException类型的异常。如果取消了查询,就触发这个异常。在主线程中,调用CancellationTokenSource类的Cancel()方法可以取消任务。

const int arraySize = 100000000;

var data = new int[arraySize];

var r = new Random();

for (int i = 0; i < arraySize; i++)

{

data[i] = r.Next(40);

}

var cts = new CancellationTokenSource();

new Thread(() =>

{

try

{

var sum = (from x in data.AsParallel().WithCancellation(cts.Token)

where x < 80

select x).Sum();

Console.WriteLine("query finished, sum: {0}", sum);

}

catch (OperationCanceledException ex)

{

Console.WriteLine(ex.Message);

}

}).Start();

Console.WriteLine("query started");

Console.Write("cancel? ");

int input = Console.Read();

if (input == ‘Y‘ || input == ‘y‘)

{

// cancel!

cts.Cancel();

}

时间: 2024-08-30 17:30:47

Linq并行执行的相关文章

.Net多线程编程—Parallel LINQ、线程池

Parallel LINQ 1 System.Linq.ParallelEnumerable 重要方法概览: 1)public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source);启用查询的并行化 2)public static ParallelQuery<TSource> AsOrdered<TSource>(this Paral

扩展、委托、Lambda、linq

1.扩展 扩展是一个很有用的功能.如果你有一个类.不能修改,同时你又想给他加一个方法.这个过程就是扩展.扩展就是扩展方法. 例1: 类People public class People { public string Name { get; set; } public int age { get; set; } } 增加一个扩展: 1.扩展必须是一个静态的类 2.扩展方法必须是一个静态的方法 3.被扩展的对象必须是前面加 this People people public static cla

PLINQ 并行操作Linq

C#并行编程-PLINQ:声明式数据并行 目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 通过LINQ可以方便的查询并处理不同的数据源,使用Parallel LINQ (PLINQ)来充分获得并行化所带来的优势. PLINQ不仅实现了完整的LINQ操作符,而且还添加了一些用于执行并行的操作符,与对应的LINQ相比,通过PLINQ可以获得明显的加速,但是具体的加

.NET深入解析LINQ框架(五:IQueryable、IQueryProvider接口详解)

阅读目录: 1.环路执行对象模型.碎片化执行模型(假递归式调用) 2.N层对象执行模型(纵横向对比链式扩展方法) 3.LINQ查询表达式和链式查询方法其实都是空壳子 4.详细的对象结构图(对象的执行原理) 5.IQueryable<T>与IQueryProvider一对一的关系能否改成一对多的关系 6.完整的自定义查询 1]. 环路执行对象模型.碎片化执行模型(假递归式调用) 这个主题扯的可能有点远,但是它关系着整个LINQ框架的设计结构,至少在我还没有搞懂LINQ的本意之前,在我脑海里一直频

.NET深入解析LINQ框架(一:LINQ优雅的前奏)

阅读目录: 1.LINQ简述 2.LINQ优雅前奏的音符 2.1.隐式类型 (由编辑器自动根据表达式推断出对象的最终类型) 2.2.对象初始化器 (简化了对象的创建及初始化的过程) 2.3.Lambda表达式 (对匿名方法的改进,加入了委托签名的类型推断并很好的与表达式树的结合) 2.4.扩展方法 (允许在不修改类型的内部代码的情况下为类型添加独立的行为) 2.5.匿名类型 (由对象初始化器推断得出的类型,该类型在编译后自动创建) 2.6.表达式目录树(用数据结构表示程序逻辑代码) 3.LINQ

.NET深入解析LINQ框架(二:LINQ优雅的前奏)

阅读目录: 1.LINQ框架的主要设计模型 1.1.链式设计模式 (以流水线般的链接方式设计系统逻辑) 1.2.链式查询方法(逐步加工查询表达式中的每一个工作点) 2.LINQ框架的核心设计原理 2.1.托管语言之上的语言(LINQ查询表达式) 2.2.托管语言构造的基础(LINQ依附通用接口与查询操作符对应的方法对接) 2.3.深入IEnumerable.IEnumerable<T>.Enumerable(LINQ to Object框架的入口) 2.4.深入IQueryable.IQuer

.NET深入解析LINQ框架(四:IQueryable、IQueryProvider接口详解)

阅读目录: 1.开篇介绍 2.扩展Linq to Object (应用框架具有查询功能) 2.1.通过添加IEnumerable<T>对象的扩展方法 2.2.通过继承IEnumerable<T>接口 2.3.详细的对象结构图 3.实现IQueryable<T> .IQueryProvider接口 3.1.延迟加载IEnumertor<T>对象(提高系统性能) 3.2.扩展方法的扩展对象之奥秘(this IQueryable<TSource> so

.NET深入解析LINQ框架(三:LINQ优雅的前奏)

阅读目录: 1.动态LINQ查询(动态构建Expression<T>表达式树) 2.DLR动态语言运行时(基于CLR之上的动态语言运行时) 1].动态LINQ查询(动态构建Expression<T>表达式树) 什么是动态LINQ查询?LINQ的编写是静态的,因为C#是基于静态类型系统原理设计的,在编写时已经确定类型,也就是在编译时就已经知道将要执行什么样的查询,条件是什么.排序方式是什么等等.那么很大一部分应用场合中我们需要根据用户的选择来查询数据源,以往我们都是通过判断的方式来拼

.NET深入解析LINQ框架(六:LINQ执行表达式)

阅读目录: 1.LINQ执行表达式 在看本篇文章之前我假设您已经具备我之前分析的一些原理知识,因为这章所要讲的内容是建立在之前的一系列知识点之上的,为了保证您的阅读顺利建议您先阅读本人的LINQ系列文章的前几篇或者您已经具备比较深入的LINQ原理知识体系,防止耽误您的宝贵时间. 到目前为止我们对LINQ的执行原理已经很清楚了,从它的前期构想到它真正为我们所用都有足够的证据,但是似乎问题并没有我们想的那么简单,问题总是在我们使用中频频出现尤其是新技术的使用,当然有问题才能有进步. 一:LINQ执行