C#并发实战Parallel.ForEach使用

前言:最近给客户开发一个伙食费计算系统,大概需要计算2000个人的伙食。需求是按照员工的预定报餐计划对消费记录进行检查,如有未报餐有刷卡或者有报餐没刷卡的要进行一定的金额扣减等一系列规则。一开始我的想法比较简单,直接用一个for循环搞定,统计结果倒是没问题,但是计算出来太慢了需要7,8分钟。这样系统服务是报超时错误的,让人觉得有点不太爽。由于时间也不多就就先提交给用户使用了,后面逻辑又增加了,计算时间变长,整个计算一遍居然要将近10分钟了。这个对用户来说是能接收的(原来自己手算需要好几天呢),但是我自己接受不了,于是就开始优化了,怎么优化呢,用多线程呗。

一提到多线程,最先想到的是Task了,毕竟.net4.0以上Task封装了很多好用的方法。但是Task毕竟是多开一些线程去执行任务,最后整合结果,这样可以快一些,但我想更加快速一些,于是想到了另外一个对象:Parallel。之前在维护代码是确实有遇到过别人写的Parallel.Invoke,只是指定这个函数的作用是并发执行多项任务,如果遇到多个耗时的操作,他们之间又不贡献变量这个方法不错。我的情况是要并发执行一个集合,于是就用了List.ForAll 这个方法其实是拓展方法,完整的调用为:List.AsParallel().ForAll,需要先转换成支持并发的集合,等同于Parallel.ForEach,目的是对集合里面的元素并发执行一系列操作。

于是乎,把原来的foreach换成了List.AsParallel().ForAll,运行起来,果然速度惊人,不到两分钟就插入结果了,但最后却是报主键重复的错误,这个错误的原因是,由于使用了并发,这个时候变量自增,其实是在强着自增,当多个线程同时获取到了id值,都去自增然后就重复了,举个例子如下:

            int num = 1;
            List<int> list = new List<int>();
            for (int i = 1; i <= 2000; i++)
            {
                list.Add(i);
            }
            Console.WriteLine($"num初始值为:" + num.ToString());
            list.AsParallel().ForAll(n =>
            {
                num++;
            });
            Console.WriteLine($"不加锁,并发{list.Count}次后为:" + num.ToString());
            Console.ReadKey();

这段代码是让一个变量执行2000次自增,正常结果应该是2001,但实际结果如下:

有经验的同学,立马能想到需要加锁了,C#内置了很多锁对象,如lock 互斥锁,Interlocked 内部锁,Monitor 这几个比较常见,lock内部实现其实就是使用了Monitor对象。对变量自增,Interlocked对象提供了,变量自增,自减、或者相加等方法,我们使用自增方法Interlocked.Increment,函数定义为:int Increment(ref int num),该对象提供原子性的变量自增操作,传入目标数值,返回或者ref num都是自增后的结果。 在之前的基础上我们增加一些代码:

           num = 1;
            Console.WriteLine($"num初始值为:" + num.ToString());
            list.AsParallel().ForAll(n =>
            {
                Interlocked.Increment(ref num);
            });
            Console.WriteLine($"使用内部锁,并发{list.Count}次后为:" + num.ToString());
            Console.ReadKey();

我们来看运行结果:

加了锁之后ID重复算是解决了,其实别高兴太早,由于正常的环境有了ID我们还有用这些ID来构建对象呢,于是又写了写代码,用集合来添加这些ID,为了更真实的模拟生产环境,我在forAll里面又加了一层循环代码如下:

            num = 1;
            Random random = new Random();
            var total = 0;
            var m = new ConcurrentBag<int>();
            list.AsParallel().ForAll(n =>
            {
                var c = random.Next(1, 50);
                Interlocked.Add(ref total, c);
                for (int i = 0; i < c; i++)
                {
                    Interlocked.Increment(ref num);
                    m.Add(num);
                }
            });
            Console.WriteLine($"使用内部锁,并发+内部循环{list.Count}次后为:" + num.ToString());
            Console.WriteLine($"实际值为:{total + 1}");
            var l = m.GroupBy(n => n).Where(o => o.Count() > 1);
            Console.WriteLine($"并发里面使用安全集合ConcurrentBag添加num,集合重复值:{l.Count()}个");
            Console.ReadKey();

上面的代码里面我用到了线程安全集合ConcurrentBag<T>它的命名空间是:using System.Collections.Concurrent,尽管使用了线程安全集合,但是在并发面前仍然是不安全的,到了这里其实比较郁闷了,自增加锁,安全集合内部应该也使用了锁,但还是重复了。有点说不过去了,想想多线程执行时有个上下文对象,即当多个线程同时执行任务,共享了变量他们一开始传进去的对象数值应该是相同的,由于变量自增时加了锁,所以ID是不会重复了。我猜测问题应该出在Add方法了,就是说当num值自增后还没有来得及传出去就已经执行了Add方法,故添加了重复变量。于是乎,我重新写了段代码,让ID自增和集合添加都放到锁里面:

            num = 1;
            total = 0;
            using (var q = new BlockingCollection<int>())
            {
                list.AsParallel().ForAll(n =>
                {
                    var c = random.Next(1, 50);
                    Interlocked.Add(ref total, c);
                    for (int i = 0; i < c; i++)
                    {

                       // Task.Delay(100);
                        q.Add(Interlocked.Increment(ref num));

                        //可控
                        //lock (objLock)
                        //{
                        //    num++;
                        //    q.Add(num);
                        //}
                    }

                });
                q.CompleteAdding();
                Console.WriteLine($"num累计值为:{total},并发之后值为:{num}");
                var x = q.GroupBy(n => n).Where(o => o.Count() > 1);
                Console.WriteLine($"并发使用安全集合BlockingCollection+Interlocked添加num,集合重复值:{x.Count()}个");
                Console.ReadKey();
            }

这里我测试了另外一个线程安全的集合BlockingCollection,关于这个集合的使用请自行查找MSDN文档,上面的关键代码直接添加安全集合的返回值,可以保证集合不会重复,但其实下面的lock更适用与正式环境,因为我们添加的一般都是对象不会是基础类型数值,运行结果如下:

至此,我们的问题解决了,计算时间由原来的9分多降至110秒左右,可见Parallel的处理还是很给力的,唯一不足的是,很占CPU,执行计算后CPU达到了88%。附上计算结果:

优化前后对比

总结:C#安全集合在并发的情况下其实不一定是安全的,还是需要结合实际应用场景和验证结果为准。Parallel.ForEach在对循环数量可观的情况下是可以去使用的,如果有共享变量,一定要配合锁做同步处理。还是得慎用这个方法,如果方法内部有操作数据库的记得增加事务处理,否则就呵呵了。

原文地址:https://www.cnblogs.com/heweijian/p/11330282.html

时间: 2024-08-26 23:45:50

C#并发实战Parallel.ForEach使用的相关文章

何时使用 Parallel.ForEach,何时使用 PLINQ

翻译自:When Should I Use Parallel.ForEach? When Should I Use PLINQ? 原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation 原文pdf:http://download.csdn.net/detail/sqlchen/7509513 ======================================================

Parallel.Foreach的全部知识要点【转】

简介 当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理.(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算). .net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景. Parallel.ForEach Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Para

java并发实战--java线程的带来的问题

最近小编被一家互联网企业给敲响了警钟,感觉在java面试当中java并发问题是每一家企业都关心的问题.所以准备恶补这方面的漏洞.决定在工作之余,好好学习<java并发实战>这本书,欢迎各位大神前来吐槽和分享. 线程带来的风险问题 1.安全性问题 当用户在执行如下程序是会产生安全性问题: public class UnSafeSquence{ private int value; public int getNext(){ return value++; } } 如果执行单个线程是不会有问题的,

Parallel for-each loops in .NET C# z

An IEnumerable object An Action of T which is used to process each item in the list List<string> dataList = new List<string> { "this", "is", "random", "sentence", "hello", "goodbye" }

Parallel ForEach For 多线程并行计算使用注意

之前用DataTable进行遍历,出现索引超出范围问题,因为List<T>也只支持单线程,改用 ConcurrentBag解决问题.在Parallel ForEach/For 外的变量要避免同时操作同一个变量造成 数据不一致的情况. List<T> 转为 ConcurrentBag ConcurrentBag<T> concT= new ConcurrentBag<T>(this.db.T.ToList());

Parallel.ForEach , ThreadPool.QueueUserWorkItem

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading; 6 using System.Threading.Tasks; 7 8 namespace ParallelThreadPool 9 { 10 class Program 11 { 12 static void Main(string[] args) 13 {

C#多线程 为多核处理器而生的多线程方法Parallel.For和Parallel.ForEach

1.在.net4.0中,有了一个新的类库:任务并行库.它极大地简化了并行编程且内容丰富.这里仅介绍其中最简单的 Parallel.For循环和Parallel.ForEach循环.它们位于System.Threading.Tasks命名空间.它们是两个方法,这两个方法将迭代分别放在不同的处理器上并行处理,如果机器是多处理器或多核处理器,这样就会使性能大大提升. 2.例子用Parallel.For做计算,为数组赋值并打印,用Parallel.ForEach计算字符串数组每个元素的长度,运行结果:

Java秒杀系统方案优化---高性能高并发实战

Java秒杀系统方案优化---高性能高并发实战网盘地址:https://pan.baidu.com/s/1htNv2zq 密码: ssyt备用地址(腾讯微云):https://share.weiyun.com/889808c023b6e9d9f504399a5b07276f 密码:1WaUHB 亮眼的!高并发秒杀系统核心技术 课程以"秒杀"场景为例,但技术都是通用的,举一反三,方得始终应对大并发:多层次多粒度缓存+消息队列异步+服务器分布式部署 专业的压测工具:有依有据,鉴证系统的优化

Netty Redis 亿级流量 高并发 实战 (长文 修正版)

目录 疯狂创客圈 Java 分布式聊天室[ 亿级流量]实战系列之 -30[ 博客园 总入口 ] 写在前面 1.1. 快速的能力提升,巨大的应用价值 1.1.1. 飞速提升能力,并且满足实际开发要求 1.1.2. 越来越多.大量的应用场景 1.2. 高并发架构中的6大集群 1.2.1. 支撑亿级流量的IM整体架构 1.2.2. IM通讯协议介绍 1.2.3. 长连接和短连接 1.2.4. 技术选型 1.3. 基于Redis 设计分布式Session 1.3.1. SessionLocal本地会话