何时使用 Parallel.ForEach,何时使用 PLINQ

翻译自:When Should I Use Parallel.ForEach? When Should I Use PLINQ?

原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation

原文pdf:http://download.csdn.net/detail/sqlchen/7509513

====================================================================

简介

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。

Parallel.ForEach

Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段。

Parallel.ForEach 最常用的形式如下:

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> body)

PLINQ

PLINQ 也是一种对数据进行并行处理的编程模型,它通过 LINQ 的语法来实现类似 Parallel.ForEach 的多线程并行处理。

场景一:简单数据 之 独立操作的并行处理(使用 Parallel.ForEach)

示例代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
    Parallel.ForEach(source, element => action(element));
}

理由:

1. 虽然 PLINQ 也提供了一个类似的 ForAll 接口,但它对于简单的独立操作太重量化了。

2. 使用 Parallel.ForEach 你还能够设定 ParallelOptions.MaxDegreeOfParalelism 参数(指定最多需要多少个线程),这样当 ThreadPool 资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach 依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach 也能及时地利用这些线程。PLINQ 只能通过WithDegreeOfParallelism 方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。

场景二:顺序数据 之 并行处理(使用 PLINQ 来维持数据顺序)

当输出的数据序列需要保持原始的顺序时采用 PLINQ 的 AsOrdered 方法非常简单高效。

示例代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
    var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
        // Movie frames will be evaluated lazily
    }
}

理由:

1. Parallel.ForEach 实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState, Int64> body)

这个重载的 Action 多包含了 index  参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子:

public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
    var length = Math.Min(v1.Length, v2.Lenth);
    double[] result = new double[length];
    Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
        result[elementIndex] = element * v2[elementIndex]);
    return result;
}

你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable 那么你有4个解决方案:

(1) 调用 IEnumerable.Count() 来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。

(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴原文)

(3) 第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。

(4) 自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)

2. 相比之下 PLINQ 的 AsOrdered 方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)

场景三:流数据 之 并行处理(使用 PLINQ)

PLINQ 能输出流数据,这个特性在一下场合非常有用:

1. 结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2. 你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
    var StockRiskPortfolio =
        Stocks
        .AsParallel()
        .AsOrdered()
        .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
        .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
        SomeStockComputation(stockRisk.Risk);
        // StockRiskPortfolio will be a stream of results
    }
}

这里使用一个单线程的 foreach 来对 PLINQ 的输出进行后续处理,通常情况下 foreach 不需要等待 PLINQ 处理完所有数据就能开始运作。

PLINQ 也允许指定输出缓存的方式,具体可参照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举

场景四:处理两个集合(使用 PLINQ)

PLINQ 的 Zip 方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    return
        a
        .AsParallel()
        .AsOrdered()
        .Select(element => ExpensiveComputation(element))
        .Zip(
            b
            .AsParallel()
            .AsOrdered()
            .Select(element => DifferentExpensiveComputation(element)),
            (a_element, b_element) => Combine(a_element,b_element));
}

示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip 进行后续处理(Combine)。

Parallel.ForEach 也能实现类似的 Zip 处理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    var numElements = Math.Min(a.Count(), b.Count());
    var result = new T[numElements];
    Parallel.ForEach(a,
        (element, loopstate, index) =>
        {
            var a_element = ExpensiveComputation(element);
            var b_element = DifferentExpensiveComputation(b.ElementAt(index));
            result[index] = Combine(a_element, b_element);
        });
    return result;
}

当然使用 Parallel.ForEach 后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。

场景五:线程局部变量

Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

使用的示例:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
    var results = new List<R>();
    using (SemaphoreSlim sem = new SemaphoreSlim(1))
    {
        Parallel.ForEach(source,
            () => new List<R>(),
            (element, loopstate, localStorage) =>
            {
                bool filter = filterFunction(element);
                if (filter)
                    localStorage.Add(element);
                return localStorage;
            },
            (finalStorage) =>
            {
                lock(myLock)
                {
                    results.AddRange(finalStorage)
                };
            });
    }
    return results;
}

线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序):

public static void UnsafeDownloadUrls ()
{
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient 对象。所以我们会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            WebClient webclient = new WebClient();
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题:

public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index]+".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;
        },
            (webclient) => { });
}

这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。

虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:

public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    var res =
        urls
        .AsParallel()
        .ForAll(
            url =>
            {
                webclient.Value.DownloadFile(url, host[url] +".dat"));
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            });
}

但是请注意:ThreadLocal<T> 相对而言开销更大!

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace,
        (curValue, loopstate) =>
            {
                if (curValue.Equals(match) )
                {
                    matchFound = true;
                    loopstate.Stop();
                }
            });
    return matchFound;
}

ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var loopResult = Parallel.ForEach(source,
        (curValue, loopState, curIndex) =>
        {
            if (curValue.Equals(match))
            {
                loopState.Break();
            }
         });
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex : -1;
}

虽然 PLINQ 也提供了退出的机制(cancellation token),但相对来说退出的时机并没有 Parallel.ForEach 那么及时。

何时使用 Parallel.ForEach,何时使用 PLINQ,布布扣,bubuko.com

时间: 2024-09-30 20:55:23

何时使用 Parallel.ForEach,何时使用 PLINQ的相关文章

Parallel.Foreach的全部知识要点【转】

简介 当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理.(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算). .net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景. Parallel.ForEach Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Para

Parallel for-each loops in .NET C# z

An IEnumerable object An Action of T which is used to process each item in the list List<string> dataList = new List<string> { "this", "is", "random", "sentence", "hello", "goodbye" }

Parallel ForEach For 多线程并行计算使用注意

之前用DataTable进行遍历,出现索引超出范围问题,因为List<T>也只支持单线程,改用 ConcurrentBag解决问题.在Parallel ForEach/For 外的变量要避免同时操作同一个变量造成 数据不一致的情况. List<T> 转为 ConcurrentBag ConcurrentBag<T> concT= new ConcurrentBag<T>(this.db.T.ToList());

Parallel.ForEach , ThreadPool.QueueUserWorkItem

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading; 6 using System.Threading.Tasks; 7 8 namespace ParallelThreadPool 9 { 10 class Program 11 { 12 static void Main(string[] args) 13 {

C#多线程 为多核处理器而生的多线程方法Parallel.For和Parallel.ForEach

1.在.net4.0中,有了一个新的类库:任务并行库.它极大地简化了并行编程且内容丰富.这里仅介绍其中最简单的 Parallel.For循环和Parallel.ForEach循环.它们位于System.Threading.Tasks命名空间.它们是两个方法,这两个方法将迭代分别放在不同的处理器上并行处理,如果机器是多处理器或多核处理器,这样就会使性能大大提升. 2.例子用Parallel.For做计算,为数组赋值并打印,用Parallel.ForEach计算字符串数组每个元素的长度,运行结果:

C#并发实战Parallel.ForEach使用

前言:最近给客户开发一个伙食费计算系统,大概需要计算2000个人的伙食.需求是按照员工的预定报餐计划对消费记录进行检查,如有未报餐有刷卡或者有报餐没刷卡的要进行一定的金额扣减等一系列规则.一开始我的想法比较简单,直接用一个for循环搞定,统计结果倒是没问题,但是计算出来太慢了需要7,8分钟.这样系统服务是报超时错误的,让人觉得有点不太爽.由于时间也不多就就先提交给用户使用了,后面逻辑又增加了,计算时间变长,整个计算一遍居然要将近10分钟了.这个对用户来说是能接收的(原来自己手算需要好几天呢),但

Parallel.ForEach 使用多线遍历循环

Parallel.ForEach相对于foreach是多线程,并行操作;foreach是单线程品德操作. static void Main(string[] args) { Console.WriteLine("Hello World!"); List<UserInfo> lst = new List<UserInfo> { }; UserInfo[] array = new UserInfo[] {}; for (int i = 1; i <= 10;

.NET并行编程1 -

设计模式--.net并行编程,清华大学出版的中译本. 相关资源地址主页面: http://parallelpatterns.codeplex.com/ 代码下载: http://parallelpatterns.codeplex.com/releases/view/50473 书籍在线地址: https://msdn.microsoft.com/en-us/library/ff963553.aspx 使用并行编程的一些示例: https://code.msdn.microsoft.com/Par

《Effective C#》快速笔记(四)- 使用框架

.NET 是一个类库,你了解的越多,自己需要编写的代码就越少. 目录 三十.使用重写而不是事件处理函数 三十一.使用 IComparable<T> 和 IComparer<T> 实现顺序关系 三十二.避免使用 ICloneable 接口 三十三.仅用 new 修饰符处理基类更新 三十四.避免重载基类中定义的方法 三十五.PLINQ 如何实现并行算法 三十六.理解 PLINQ 在 I/O 密集场景 三十七.注意并行算法中的异常 三十.使用重写而不是事件处理函数 1.处理系统之中触发的