一.并行LINQ
System.Linq名称空间中包含的类ParallelEnumerable可以分解查询的工作,使其分布在多个线程上。
尽管Enumerable类给IEnumerable<T>接口定义了扩展方法,但ParallelEnumerable类的大多数扩展方法是ParallerQuery<TSource>类的扩展。例如,AsParallel()方法,它扩展了IEnumerable<T>接口,返回ParallelQuery<T>类,所以正常的集合类可以以平行方式查询。
1.并行查询
下面演示并行LINQ(Parallel LINQ,PLINQ):
//用随机值填充一个大型的int集合 // Enumerable.Range(0, arraySize),生成指定范围内的整数的空序列。 //Select(x => r.Next(140)),用小于140的数填充集合 static IEnumerable<int> SampleData() { const int arraySize = 100000000; var r = new Random(); return Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList(); } static void IntroParallel() { var data = SampleData(); var watch = new Stopwatch(); //非并行LINQ watch.Start(); var q1 = (from x in data where Math.Log(x) < 4 select x).Average(); watch.Stop(); Console.WriteLine("sync {0}, result: {1}", watch.ElapsedMilliseconds, q1); watch.Reset(); //使用data.AsParallel()进行并行LINQ watch.Start(); var q2 = (from x in data.AsParallel() where Math.Log(x) < 4 select x).Average(); watch.Stop(); Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, q2); }
输出;
发现并行查询时间用的少,在并行查询时CPU利用率达到100%
与LINQ基础(二)(http://www.cnblogs.com/afei-24/p/6845551.html)中的LINQ查询一样,编译器会修改语法,以调用AsParallel,Where(),Select(),Average()方法:
var q2 = data.AsParallel().Where(x => Math.Log(x)<4).Select(x => x).Average();
AsParallel()方法用ParallerEnumerable类定义,以扩展IEnumerable<T>接口,所以可以对简单的数组调用它。AsParallel()方法返回ParallerQuery<T>。因为返回的类型,所以编译器选择的Where()方法是ParallerEnumerable.Where(),而不是Enumerable.Where()。
对于PrarllelEnumerable类,查询是分区的,以便多个线程可以同时处理该查询。集合可以分为多个部分,其中每个部分由不同的线程处理。完成分区的工作后,就需要合并,获得所有部分的总和。
2.分区器
AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partitioner类。通过它可以影响要创建的分区。
Partitioner类用System,Collection.Concurrent名称空间定义,并且有不同的变体。Create()方法接受实现了IList<T>类的数组或对象,以及Boolean类型的参数,返回一个不同的Partitioner类型。Create()方法有多个重载版本。
var q2 = (from x in Partitioner.Create(data).AsParallel()
where Math.Log(x) < 4
select x).Average();
也可以对AsParallel()方法接着调用WithExecutionMode()和WithDegreeOfParallelism()方法,来影响并行机制。WithExecutionMode()方法可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。默认情况下,并行LINQ避免使用系统开销很高的并行机制。WithDegreeOfParallelism()方法,可以传递一个整数值,以指定应并行运行的最大任务数。如果查询不应使用全部CPU,这个方法很有用。
3.取消
要取消长时间运行的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。
举个例子,下面的查询在单独的线程中运行,如果取消了查询,在该线程中捕获一个OperationCanceledException类型的异常。在主线程中,可以调用CancellationTokenSource类的Cancle()方法取消任务。
var data = SampleData(); var watch = new Stopwatch(); watch.Start(); Console.WriteLine("filled array"); var sum1 = (from x in data where Math.Log(x) < 4 select x).Average(); Console.WriteLine("sync result {0}", sum1); var cts = new CancellationTokenSource(); Task.Factory.StartNew(() => { try { var res = (from x in data.AsParallel().WithCancellation(cts.Token) where Math.Log(x) < 4 select x).Average(); Console.WriteLine("query finished, result: {0}", res); } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); } }); watch.Stop(); Console.WriteLine("async {0}, result: {1}", watch.ElapsedMilliseconds, "res"); Console.WriteLine("query started"); Console.Write("cancel? "); string input = Console.ReadLine(); if (input.ToLower().Equals("y")) { cts.Cancel(); Console.WriteLine("sent a cancel"); } Console.WriteLine("press return to exit"); Console.ReadLine();
二.表达式树
在LINQ To Object 中,扩展方法需要将一个委托类型作为参数,这样就可以将lambda表达式赋予参数。lambda表达式也可以赋予Expression<T>类型的参数,C#编译器根据类型给lambda表达式定义不同的行为。如果类型是Expression<T>,编译器就从lambda表达式中创建一个表达式树,并存储在程序集中。这样就可以在运行期间分析表达式树,并进行优化,以便查询数据源。
var racers = from r in Formula1.GetChampions()
where r.Wins > 15 && (r.Country == "Brazil" || r.Country == "Austria")
select r;
这个查询表达式使用了扩展方法Where(),Select()方法。Enumerable类定义了Where()方法,并将委托类型Func<T,bool>作为参数谓词:
public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);
这样,就可以把lambda表达式赋予委托predicate。
除了使用委托之外,编译器还会把表达式树放在程序集中。表达式树可以在运行期间读取。表达式树从派生自抽象基类Expression的类中构建。Expression和Expression<T>不同。继承自Expression类的表达式类有BinaryExpression,ConstantExpression,InvocationExpression等。编译器会从lambda表达式中创建表达式树。
例如,lambda表达式r.Country == "Brazil"使用了ParameterExpression,MemberExpression,ConstantExpression,MethodCallExpression,来创建一个表达式树,并将该树存储在程序集中,之后在运行期间使用这个树,创建一个用于底层数据源的优化查询:
//DisplayTree方法在控制台上图形化的显示表达式树。其中传递一个Expression对象,并根据表达式的类型,把表达式的一些信息写到控制台上 private static void DisplayTree(int indent, string message, Expression expression) { string output = String.Format("{0} {1} ! NodeType: {2}; Expr: {3} ", "".PadLeft(indent, ‘>‘), message, expression.NodeType, expression); indent++; switch (expression.NodeType) { case ExpressionType.Lambda: Console.WriteLine(output); LambdaExpression lambdaExpr = (LambdaExpression)expression; foreach (var parameter in lambdaExpr.Parameters) { DisplayTree(indent, "Parameter", parameter); } DisplayTree(indent, "Body", lambdaExpr.Body); break; case ExpressionType.Constant: ConstantExpression constExpr = (ConstantExpression)expression; Console.WriteLine("{0} Const Value: {1}", output, constExpr.Value); break; case ExpressionType.Parameter: ParameterExpression paramExpr = (ParameterExpression)expression; Console.WriteLine("{0} Param Type: {1}", output, paramExpr.Type.Name); break; case ExpressionType.Equal: case ExpressionType.AndAlso: case ExpressionType.GreaterThan: BinaryExpression binExpr = (BinaryExpression)expression; if (binExpr.Method != null) { Console.WriteLine("{0} Method: {1}", output, binExpr.Method.Name); } else { Console.WriteLine(output); } DisplayTree(indent, "Left", binExpr.Left); DisplayTree(indent, "Right", binExpr.Right); break; case ExpressionType.MemberAccess: MemberExpression memberExpr = (MemberExpression)expression; Console.WriteLine("{0} Member Name: {1}, Type: {2}", output, memberExpr.Member.Name, memberExpr.Type.Name); DisplayTree(indent, "Member Expr", memberExpr.Expression); break; default: Console.WriteLine(); Console.WriteLine("{0} {1}", expression.NodeType, expression.Type.Name); break; } } static void Main() { Expression<Func<Racer, bool>> expression = r => r.Country == "Brazil" && r.Wins > 6; DisplayTree(0, "Lambda", expression); }
输出:
使用Expression<T>类型的一个例子是ADO.NET EF 和WCF数据服务的客户端提供程序。这些技术用Expression<T>参数定义了扩展方法。这样,访问数据库的LINQ提供程序就可以读取表达式,创建一个运行期间优化的查询,从数据库中获取数据。
后面会单独介绍表达式树的使用。
三.LINQ提供程序
.NET包含几个LINQ提供程序。LINQ提供程序为特定的数据源实现了标准的查询操作符。LINQ提供程序也许会实现比LINQ定义的更多扩展方法,但至少要实现标准操作符。LINQ To XML实现了一些专门用于XML的方法,后面会详细介绍。
LINQ提供程序的实现方案是根据名称空间和第一个参数的类型来选择的。实现扩展方法的类的名称空间必须是开放的,否则扩展方法就不在作用域内。在LINQ to Objects中定义的Where()方法的参数和LINQ To Entities中定义的Where()方法的参数不同:
LINQ to Objects中定义的Where()方法:
public static IEnumerable<TSource> Where<TSource>(this IEnumerable<TSource> source,Func<TSource,bool> predicate);
LINQ To Entities中定义的Where()方法:
public static IQueryable<TSource> Where<TSource>(this IQueryable<TSource> source,Expression<Func<TSource,bool>> predicate);
这两个类都在System.Linq的Syste,.Core程序集中实现。无论是用 Func<TSource,bool>传递参数,还是用Expression<Func<TSource,bool>>参数传递,lambda表达式都相同。只是编译器的行为不同,它根据source参数来选择。编译器根据其参数选择最匹配的方法。在ADO.NET EF中定义的ObjectContext类CreateQuery<T>()方法返回一个实现了IQueryable<TSource>接口的ObjectQuery<T>对象,因此EF使用Querable类的Where()方法。