c#:ThreadPool实现并行分析,并实现线程同步结束

  • 背景:

一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?

据我个人经验来说有以下两种方式:

1、并行、多线程(Parallel、Task、ThreadPool)

2、多进程MutilpleProcess

恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。

测试代码:

 1  static void Main(string[] args)
 2         {
 3             using (ManualResetEvent finish = new ManualResetEvent(false))
 4             {
 5                 int maxThreadCount = 100;
 6                 for (var i = 0; i < 100; i++) {
 7                     ThreadPool.QueueUserWorkItem((Object state)=> {
 8                         Console.WriteLine("task:{0}",state);
 9
10                         // 以原子操作的形式递减指定变量的值并存储结果。
11                         if (Interlocked.Decrement(ref maxThreadCount) == 0) {
12                             // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。
13                             finish.Set();
14                         }
15                     }, i);
16                 }
17
18                 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。
19                 finish.WaitOne();
20             }
21
22             Console.WriteLine("Complete!");
23             Console.ReadKey();
  • 实现多线程时,需要注意事项:

可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。

比如:

1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、InterLocked(内存共享)等。

2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。

  • 业务场景:

我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。

  • 业务实现:
  1         private static int maxThreadCount = 0;
  2         private static int fakeMaxThreadCount = int.MaxValue;
  3         private static ManualResetEvent finish = new ManualResetEvent(false);
  4         private static object errorLocker = new object();
  5         private static object resultLocker = new object();
  6         private static object maxThreadCountLcker = new object();
  7
  8         public void ParserFile(string filePath)
  9         {
 10             using (StreamWriter writerError = new StreamWriter(filePath + "_error"))
 11             {
 12                 using (StreamWriter writerResult = new StreamWriter(filePath + "_result"))
 13                 {
 14                     finish = new ManualResetEvent(false);
 15                     using (StreamReader reader = new StreamReader(filePath))
 16                     {
 17                         string line = reader.ReadLine();
 18                         while (line != null)
 19                         {
 20                             maxThreadCount++;
 21                             ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult
 22 });
 23
 24                             line = reader.ReadLine();
 25                         }
 26                     }
 27
 28                     maxThreadCount++;
 29                     lock (maxThreadCountLcker)
 30                     {
 31                         fakeMaxThreadCount = maxThreadCount;
 32                     }
 33
 34                     ThreadPool.QueueUserWorkItem(DoWork, new object[] { });
 35
 36                     finish.WaitOne();
 37                     finish.Close();
 38                     finish.Dispose();
 39                 }
 40             }
 41         }
 42
 43
 44
 45         private void DoWork(object state)
 46         {
 47             object[] objectItem = state as object[];
 48             if (objectItem.Length != 3)
 49             {
 50                 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
 51                 {
 52                     finish.Set();
 53                 }
 54                 return;
 55             }
 56             string line = objectItem[0].ToString();
 57             StreamWriter writerError = objectItem[1] as StreamWriter;
 58             StreamWriter writerResult = objectItem[2] as StreamWriter;
 59
 60             try
 61             {
 62                 string[] fields = line.Split(new char[] { ‘|‘ });
 63
 64                 string imsi = fields[0];
 65                 string city = fields[1];
 66                 string county = fields[2];
 67                 string address = fields[3];
 68
 69                 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号
 70                 string uri = " http://restapi.amap.com/v3/geocode/geo";
 71                 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address);
 72
 73                 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]}
 74                 string result = GetRequesetContext(uri, parameter);
 75                 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1)
 76                 {
 77                     lock (errorLocker)
 78                     {
 79                         writerError.WriteLine(result);
 80                     }
 81                 }
 82                 else
 83                 {
 84                     int indexCount = 0;
 85                     List<string> lnglatItems = new List<string>();
 86                     foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries))
 87                     {
 88                         if (resultItem.IndexOf("location") != -1)
 89                         {
 90                             indexCount++;
 91                             lnglatItems.Add(resultItem.Split(new char[] { ‘:‘ })[1].Replace("\"", string.Empty));
 92                         }
 93                     }
 94                     if (indexCount == 1)
 95                     {
 96                         lock (resultLocker)
 97                         {
 98                             writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi);
 99                         }
100                     }
101                     else
102                     {
103                         lock (resultLocker)
104                         {
105                             writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);
106                         }
107                     }
108                 }
109             }
110             catch (Exception ex)
111             {
112                 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace);
113                 lock (errorLocker)
114                 {
115                     writerError.WriteLine(line);
116                 }
117             }
118             finally
119             {
120                 lock (maxThreadCountLcker)
121                 {
122                     if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
123                     {
124                         finish.Set();
125                     }
126                 }
127             }
128         }
时间: 2024-10-23 13:42:17

c#:ThreadPool实现并行分析,并实现线程同步结束的相关文章

分析.Net里线程同步机制

我们知道并行编程模型两种:一种是基于消息式的,第二种是基于共享内存式的. 前段时间项目中遇到了第二种 使用多线程开发并行程序共享资源的问题 ,今天以实际案例出发对.net里的共享内存式的线程同步机制做个总结,由于某些类库的应用属于基础,所以本次不对基本使用做出讲解,基本使用 MSDN是最好的教程. 一.volatile关键字      基本介绍: 封装了 Thread.VolatileWrite() 和  Thread.VolatileRead()的实现 ,主要作用是强制刷新高速缓存.     

C#并行编程-线程同步原语(Barrier,CountdownEvent,ManualResetEventSlim,SemaphoreSlim,SpinLock,SpinWait,Monitor,volatile)

菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 有时候必须访问变量.实例.方法.属性或者结构体,而这些并没有准备好用于并发访问,或者有时候需要执行部分代码,而这些代码必须单独运行,这是不得不通过将任务分解的方式让它们独立运行. 当任务和线程要访问共享的数据和资源的时候,您必须添加显示的同步,或者使用原子操作或锁. 之前的.NET Framework提供了昂贵的锁机制以及遗留的多线程模型,新的数据结构允许细粒度的并发和并行化,并且降低一定必要的开销,这些数据结

C#并行编程-线程同步原语

原文:C#并行编程-线程同步原语 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 有时候必须访问变量.实例.方法.属性或者结构体,而这些并没有准备好用于并发访问,或者有时候需要执行部分代码,而这些代码必须单独运行,这是不得不通过将任务分解的方式让它们独立运行. 当任务和线程要访问共享的数据和资源的时候,您必须添加显示的同步,或者使用原子操作或锁. 之前的.NET Framework提供了昂贵的锁机制以及遗留的多线程模型,新的数据结构允许细粒度的并发和并行化,

8天玩转并行开发——第七天 简要分析任务与线程池

原文:8天玩转并行开发--第七天 简要分析任务与线程池 其实说到上一篇,我们要说的task的知识也说的差不多了,这一篇我们开始站在理论上了解下“线程池”和“任务”之间的关系,不管是 说线程还是任务,我们都不可避免的要讨论下线程池,然而在.net 4.0以后,线程池引擎考虑了未来的扩展性,已经充分利用多核微处理器 架构,只要在可能的情况下,我们应该尽量使用task,而不是线程池. 首先看一下task的结构 从图中我们可以看出Task.Factory.StartNew()貌似等同于用ThreadPo

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

并发,并行,进程,线程,同步,异步

一个应用程序至少有一个进程,一个进程至少有一个线程. 并发,在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. 并发当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状..这种方式我们称之为并发(Concurrent). 并行:当系统有一

Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理 以下是本文的目录大纲: 一.shutdown()  --  温柔的终止线程池 interruptIdleWorkers()  --  中断空闲worker tryTerminate()  --  尝试终止线程池 二.shutdown

C#并行编程(6):线程同步面面观

理解线程同步 线程的数据访问 在并行(多线程)环境中,不可避免地会存在多个线程同时访问某个数据的情况.多个线程对共享数据的访问有下面3种情形: 多个线程同时读取数据: 单个线程更新数据,此时其他线程读取数据: 多个线程同时更新数据. 显而易见,多个线程同时读取数据是不会产生任何问题的.仅有一个线程更新数据的时候,貌似也没有问题,但真的没有问题吗?多个线程同时更新数据,很明显,你可能把我的更改覆盖掉了,数据从此不再可信. 什么是线程同步 为了解决多线程同时访问共享数据可能导致数据被破坏的问题,我们

Linux多线程实现及线程同步函数分析

在Linux中,多线程的本质仍是进程,它与进程的区别: 进程:独立地址空间,拥有PCB 线程:也有PCB,但没有独立的地址空间(共享) 线程的特点: 1,线程是轻量级进程,有PCB,创建线程使用的底层函数和进程一样,都是clone 2,从内核看进程和线程是一样的,都有各自不同的PCB 3,进程可以蜕变成线程 4,在LINUX中,线程是最小的执行单位,进程是最小的分配资源单位 查看指定线程的LWP号命令: ps -Lf pid 线程优点: 提高程序并发性 开销小 数据通信,共享数据方便 线程缺点: