.Net多线程编程—Parallel LINQ、线程池

Parallel LINQ

1 System.Linq.ParallelEnumerable

重要方法概览:

1)public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source);启用查询的并行化

2)public static ParallelQuery<TSource> AsOrdered<TSource>(this ParallelQuery<TSource> source);启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由 AsParallel、ParallelEnumerable.Range和 ParallelEnumerable.Repeat 返回的泛型序列调用 AsOrdered。

3) public static ParallelQuery<TSource> WithExecutionMode<TSource>(this ParallelQuery<TSource> source, ParallelExecutionMode executionMode);设置查询的执行模式

4)public static double Average(this ParallelQuery<double> source);计算序列平均值

5) public static decimal? Max(this ParallelQuery<decimal?> source);计算序列最大值

6) public static decimal? Min(this ParallelQuery<decimal?> source);计算序列中最小值

7)public static decimal? Sum(this ParallelQuery<decimal?> source);求和

8)public static TResult Aggregate<TSource, TAccumulate, TResult>(this ParallelQuery<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector);对一个序列并行应用累加器函数。 将指定的种子值用作累加器的初始值,并使用指定的函数选择结果值。

9)public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken);设置要与查询关联的 System.Threading.CancellationToken。

10)public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this ParallelQuery<TSource> source, int degreeOfParallelism);设置要在查询中使用的并行度。

11)public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action);对 source 中的每个元素并行调用指定的操作。

12)public static ParallelQuery<TSource> WithMergeOptions<TSource>(this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions);设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。

说明:

1)PLINQ实现了全部的LINQ操作符,并添加了部分并行操作符。

2)不论是并发集合或传统集合都可使用PLINQ。

3)默认情况下,执行PLINQ时,.NET尽量避免高开销并行化算法;若想强制并行执行,可使用ParallelExecutionMode.ForceParallelism。

4)根据可用内核数,PLINQ将接受的数据源分解为多份,然后在不同的内核上处理每一份。且对每一份的执行没有固定顺序。

5)PLINQ查询有延缓执行的效果,因此要捕获查询所产生的结果在被消费者消费时产生的异常。

6)Aggregate的重载方法之一可以将数据源序列分区成几个子序列(分区)。 对分区内的每个元素执行 updateAccumulatorFunc,得到每个分区的单个累积结果。 然后,在每个分区的结果上调用 combineAccumulatorsFunc 来产生一个元素。 最后,combineAccumulatorsFunc 产生的元素通过 resultSelector 函数进行转换即可获得最终结果。

2 使用示例

定义List<T> list = ......,代码中的condition为筛选条件。

1)排序

 1 //确保运算获得的数据输出顺序与原集合一致
 2 var resLinq1 = from item in list.AsParallel().AsOrdered()
 3                           where (condition)
 4                           select item;
 5 var res1 = list.AsParallel().AsOrdered().Where(m=>m>4);
 6 //按升序排列使用ascending,降序排列使用descending
 7 var resLinq2 = from item in list.AsParallel()
 8                            where (condition)
 9                            orderby item descending
10                            select item;
11
12 //升序排列使用OrderBy,降序排列使用OrderByDescending
13 var res2 = list.AsParallel().Where(item =>condition).OrderByDescending(m => m);

2)设置查询的执行模式

 1 //强制并行化整个查询
 2 var resLinq3 = from item in list.AsParallel().
 3 WithExecutionMode(ParallelExecutionMode.ForceParallelism)
 4                            where (condition)
 5                            orderby item descending
 6                            select item;
 7
 8 var res3 = list.AsParallel().
 9 WithExecutionMode(ParallelExecutionMode.ForceParallelism).
10 Where(condition).OrderByDescending(m => m);

3)规约操作

假定这里的list中的元素为数字。

1 var resLinq4 = (from item in list.AsParallel()
2                             where (condition)
3                             select item).Average();
4 var res4 = list.AsParallel().Where(item => condition).Average();
5
6 var resLinq5 = (from item in list.AsParallel()
7                             where (condition)
8                             select item).Max();
9 var res5 = list.AsParallel().Where(item => condition).Max();

4)自定义聚集函数

假定这里的list中的元素为数字。

 1 //方差计算公式:S2 = ((X1-A)2+(X2-A)2+...+(Xn-A)2)/N,其中A为平均值,N为序列中元素个数,Xi为序列中第i个元素
 2 //sum 求和部分结果,item:集合list中的元素,result:经计算后得到的方差值。
 3 var average = list.AsParallel().Average();
 4 var res6 = list.AsParallel().Aggregate(
 5                 0d,
 6                 (sum, item) => sum + Math.Pow((item - average),2),
 7                 (result)=>result/list.Count
 8                 );
 9 //与上面结果相同,只不过多了combineAccumulatorsFunc函数
10 var res7 = list.AsParallel().Aggregate(
11                 0d,
12                 (sum, item) => sum + Math.Pow((item - average), 2),
13                 (total,thisTask)=>total+thisTask,
14                 (result) => result / list.Count

5)取消并行操作

 1 CancellationTokenSource cts = new CancellationTokenSource();
 2 CancellationToken ct = cts.Token;
 3 CancelParallel(ct,list);
 4
 5 private static void CancelParallel(CancellationToken ct,List<int> list)
 6 {
 7             //conditionExec此条件为真时才取消并行操作
 8             if (conditionExec)
 9             {
10                 ct.ThrowIfCancellationRequested();
11             }
12             var average = list.AsParallel().Average();
13             list.AsParallel().WithCancellation(ct).Aggregate(
14                            0d,
15                            (sum, item) => sum + Math.Pow((item - average), 2),
16                            (total, thisTask) => total + thisTask,
17                            (result) => result / list.Count
18                            );
19 }

6)指定并行度

1 int maxDegreeOfParallelism = Environment.ProcessorCount;
2 var res8 = list.AsParallel().WithDegreeOfParallelism(maxDegreeOfParallelism).Aggregate(
3                 0d,
4                 (sum, item) => sum + Math.Pow((item - average), 2),
5                 (result) => result / list.Count
6                 );

7)使用ForAll

1 ConcurrentBag<T> bag = new ConcurrentBag<T>();
2 list.AsParallel().ForAll(item =>
3 {
4         //对元素处理后加如集合
5         bag.Add(itemAfter);
6 });

8)异常处理

使用AggregateException处理异常,具体示例见Tasks.Parallel部分。

线程池

1 CLR 4线程池引擎与线程

  • CLR线程池引擎管理着一个池的线程,这些线程可以处理工作项。线程池引擎会每隔一段时间创建出额外的空闲线程,这些空闲线程以FIFO的顺序将工作项从队列中取出,并且开始执行这些工作项。
  • CLR线程池引擎创建一个托管线程需要数千CPU周期,并且消耗内存。
  • CLR线程池引擎维护了最低数量的闲置工作线程,通常等于逻辑内核数。
  • CLR线程池引擎管理的都是后台线程,即所有前台线程都退出了,后台线程不会维持应用程序继续运行。

2 全局队列与局部队列

  • 使用使用TPL创建任务时,一个新的工作项会被加入到线程池全局队列中,当线程池中所有可用的工作线程都在执行工作项时,新加入线程池全局队列的工作相必须等待,直到有可用的工作项。
  • 线程池中每一个分配给了任务的线程都有自己的局部队列,这样可以减少对全局队列的争用。局部对列通常以LIFO的顺序抽取任务并执行。

3 Threading.ThreadPool

与使用任务将工作项加入队列相比,创建Task实例有一定的开销,但可以利用一些取消标记等。

//使用QueueUserWorkItem方法将任务加入队列中。

ThreadPool.QueueUserWorkItem((state) => { //具体业务 });

//workerThreads线程池中辅助线程的最大数目,completionPortThreads线程池中异步 I/O 线程的最大数目

ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);

//workerThreads线程池根据需要创建的最少数量的辅助线程

//completionPortThreads线程池根据需要创建的最少数量的异步 I/O 线程

ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);

-----------------------------------------------------------------------------------------

转载与引用请注明出处。

时间仓促,水平有限,如有不当之处,欢迎指正。

时间: 2024-08-15 14:47:17

.Net多线程编程—Parallel LINQ、线程池的相关文章

多线程编程学习笔记——线程池(一)

接上文 多线程编程学习笔记——线程同步(一) 接上文 多线程编程学习笔记——线程同步(二) 接上文 多线程编程学习笔记——线程同步(三) 创建多线程操作是非常昂贵的,所以每个运行时间非常短的操作,创建多线程进行操作,可能并不能提高效率,反而降低了效率. 如果你有非常多的执行时间非常短的操作,那么适合作用线程池来提高效率,而不是自行创建多线程. 线程池,就是我们先分配一些资源到池子里,当我们需要使用时,则从池子中获取,用完了,再放回池子里. .NET中的线程池是受CLR管理的,TheadTool类

多线程编程——4种线程池

1.分类 newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO

多线程编程学习五(线程池的创建)

一.概述 New Thread的弊端如下:       a.每次New Thread新建对象性能差.       b.线程缺乏统一的管理,可能无限制的新建线程,相互之间竞争,极可能占用过多的系统资源导致死机 或者 OOM.       c.缺乏更多功能,如定时执行.定期执行.线程中断. Java提供的四种线程池的好处在于:       a.重用存在的线程,减少对象创建.消亡的开销,性能佳.       b.可有效控制最大并发线程数.提供系统资源的使用率,同时避免过多资源竞争,避免堵塞.     

C#多线程之旅(3)——线程池

v博客前言 先交代下背景,写<C#多线程之旅>这个系列文章主要是因为以下几个原因:1.多线程在C/S和B/S架构中用得是非常多的;2.而且多线程的使用是非常复杂的,如果没有用好,容易造成很多问题. v写在前面 多线程,有利也有弊,使用需谨慎. v正文开始 原文地址:C#多线程之旅(3)——线程池 C#多线程之旅目录: C#多线程之旅(1)——介绍和基本概念 C#多线程之旅(2)——创建和开始线程 C#多线程之旅(3)——线程池 C#多线程之旅(4)——同步本质 ...... 一.介绍 无论你什

Java多线程编程基础之线程对象

在进入java平台的线程对象之前,基于基础篇(一)的一些问题,我先插入两个基本概念. [线程的并发与并行] 在单CPU系统中,系统调度在某一时刻只能让一个线程运行,虽然这种调试机制有多种形式(大多数是时间片轮巡为主),但无论如何,要通过不断切换需要运行的线程让其运行的方式就叫并发(concurrent).而在多CPU系统中,可以让两个以上的线程同时运行,这种可以同时让两个以上线程同时运行的方式叫做并行(parallel). 在上面包括以后的所有论述中,请各位朋友谅解,我无法用最准确的词语来定义储

多线程编程学习笔记——线程同步(三)

接上文 多线程编程学习笔记——线程同步(一) 接上文 多线程编程学习笔记——线程同步(二) 七.使用Barrier类 Barrier类用于组织多个线程及时在某个时刻会面,其提供一个回调函数,每次线程调用了SignalAndWait方法后该回调函数就会被执行. 1.代码如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; //

多线程编程(进程和线程)

多线程编程(进程和线程) 1.进程:指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程可以启动多个线程. 2.线程:指程序中一个执行流程,一个进程中可以运行多个线程. 一.创建线程(两种方式) 二.线程的5种状态( New,Runnable,Running,Block,Dead ): 三.线程的优先级 四.守护线程 /精灵线程/后台线程 五.方法 六.同步代码锁(synchronized) 一.创建线程(两种方式): 方式1:采用继承Thread的方法 第一,继承 Thre

JAVA程序设计(18.1)----- 1多线程轮流打印 线程调度 线程池 synchronized wait notify 内部类

1.两个线程 一个打印A 一个打印B 另两个线程轮流进行打印工作 多线程初级应用 线程调度  线程池(预先建立N个线程,需要的程序直接调用,执行完毕后归还回线程池,典型的以空间换时间 synchronized wait notify  内部类使用 package com.lovo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 类:测试 wait notify 用

C# 多线程的自动管理(线程池) 基于Task的方式

C# 多线程的自动管理(线程池) 在多线程的程序中,经常会出现两种情况:    1. 应用程序中线程把大部分的时间花费在等待状态,等待某个事件发生,然后给予响应.这一般使用 ThreadPool(线程池)来解决.     2. 线程平时都处于休眠状态,只是周期性地被唤醒.这一般使用 Timer(定时器)来解决. ThreadPool 类提供一个由系统维护的线程池(可以看作一个线程的容器),该容器需要 Windows 2000 以上系统支持,因为其中某些方法调用了只有高版本的Windows 才有的