Nutshell.ThreadWorkerPool .Net线程池设计

功能描述:

  1. 支持创建多个线程池,并统一管理
  2. 支持不同线程池的容量控制,以及最少活动线程的设置
  3. 支持不同线程池中活动线程的闲时设置,即线程空闲时间到期后即自动被回收

结构设计:

  • ThreadWorkerPoolManager: 线程池管理器,用于统一创建,获取,销毁线程池,使用单例模式
  • ThreadWorkerPool: 线程池,用于管理指定数量的线程,由ThreadWorkerPoolManager管理,自身无法创建与销毁
  • TheadWorkerPoolItem: 线程池项,用于包装线程工作器,协助ThreadWorkerPool更好的管理线程,例如取出,放回,闲时的控制
  • TheadWorker: 线程工作器,用于包装系统线程System.Threading.Thread,使其可以重复使用,减少Thrad创建和销毁的性能开销

  结构关系图:

  

 详细设计:

  ThreadWoker

  要点设计:

  1. 完成一次任务后,System.Threading.Thread不能被系统销毁, 默认情况下new Thread(ThreadStart start).Start(), 当ThreadStart委托的任务完成后,系统将销毁该线程,也就是说创建一个System.Threading.Thread实例只能使用一次;为了使线程能被重复使用,ThreadWoker将使用 while+sleeping 的方式对系统线程进行包装,同时使用AutoResetEvent代替Thread.Sleep(timeout)来达到更佳的控制
  2. 闲时设计,线程资源是极其宝贵的系统资源,如果线程池中存在大量的空闲线程这是一种浪费,极端情况下将影响系统的稳定性和工作效率;ThreadWorker将使用AutoResetEvent和事件通知的方式来代替在线程池中定期轮询检查的方式,每完成一个任务将重新开始空闲时间的计算,如果ThreadWorker在线程池中被取出,那么ThreadWorker空闲时间将永远不会到期,直到ThreadWorker被返回线程池后才重新开始空闲时间的计算

  状态图:

  

  关键代码:

  

 1         private void ThreadWorking()
 2         {
 3             while (_status != ThreadWorkerStatus.Abort)
 4             {
 5                 //WaitOne 返回false表示等待超时,true接到取消等待的通知
 6                 //这里利用AutoResetEvent.WaitOne的特性来设计闲时控制,false表示空闲到期,true表示新的任务开始
 7                 if (!_waitEvent.WaitOne(_idleTime))
 8                 {
 9                     if (!_isCanIdleExpired) //_isCanIdleExpired变量控制是否允许超时,例如被取出后将不能超时
10                         continue;
11
12                     _status = ThreadWorkerStatus.Abort;
13                     _waitEvent.Close();
14                     _waitEvent.Dispose();
15                     if (OnIdleExpired != null)
16                         OnIdleExpired(this, null); //空闲到期事件通知
17                     return;
18                 }
19                 else if (_status == ThreadWorkerStatus.Abort)
20                     return;
21
22                 try
23                 {
24                     Working();
25                 }
26                 catch (Exception ex)
27                 {
28                     _logger.TraceEvent(TraceEventType.Error, (int)TraceEventType.Error, ex.ToString());
29                 }
30                 finally
31                 {
32                     _status = ThreadWorkerStatus.Idle;
33                     if (OnWorkCompleted != null)
34                         OnWorkCompleted(this, null); //任务完成事件通知
35                 }
36             }
37         }
 1      public void Work()
 2         {
 3             if (_status == ThreadWorkerStatus.Abort)
 4                 throw new InvalidOperationException("this ThreadWorker was Abort!");
 5
 6             if (_status == ThreadWorkerStatus.Working)
 7                 throw new InvalidOperationException("this ThreadWorker was working, unable to duplicate work!");
 8
 9             _status = ThreadWorkerStatus.Working;
10             _waitEvent.Set(); //通知线程有个新的工作要开始
11         }

  ThreadWorkerPoolItem

  要点设计:

  1. 链接ThreadWorker和线程池,线程池通过ThreadWorkerPoolItem控制ThreadWorker在线程池的取出,放回,销毁
  2. 通过订阅ThreadWorker的空闲到期事件OnIdleExpired,来完成线程池对线程的移除
  3. 通过订阅ThreadWorker的任务完成事件OnWorkCompleted,来完成线程返回线程池的操作
  4. 提供剩余空闲时间查询,来为线程池提供更优线程取出方案

  完整代码:

 1     public sealed class ThreadWorkerPoolItem
 2     {
 3         private ThreadWorkerPoolItemStatus _status;
 4         private readonly ThreadWorkerBase _threadWorker;
 5         private readonly ThreadWorkerPoolBase _threadWorkerPool;
 6         private readonly int _idleTime;
 7         private DateTime _startIdleTime;
 8
 9         internal ThreadWorkerPoolItem(ThreadWorkerPoolBase pool, ThreadWorkerBase threadWorker, ThreadWorkerPoolSettings poolSettings)
10         {
11             _threadWorkerPool = pool;
12             _threadWorker = threadWorker;
13             _threadWorker.OnIdleExpired += _threadWorker_OnIdleExpired;
14             _threadWorker.OnWorkCompleted += _threadWorker_OnWorkCompleted;
15             _threadWorker.Start();
16             _status = ThreadWorkerPoolItemStatus.Idle;
17             _idleTime = poolSettings.IdleTime;
18         }
19
20         void _threadWorker_OnWorkCompleted(object sender, EventArgs args)
21         {
22             _threadWorkerPool.Return(this);
23         }
24
25         void _threadWorker_OnIdleExpired(object sender, EventArgs args)
26         {
27             _threadWorkerPool.Remove(this);
28         }
29
30         internal ThreadWorkerPoolItemStatus Status
31         {
32             get
33             {
34                 if (_threadWorker.Status == ThreadWorkerStatus.Abort || _status == ThreadWorkerPoolItemStatus.Abort)
35                     return ThreadWorkerPoolItemStatus.Abort;
36
37                 return _status;
38             }
39         }
40
41         internal int SurplusIdleTime
42         {
43             get
44             {
45                 if (_status == ThreadWorkerPoolItemStatus.Take || _idleTime == -1)
46                     return -1;
47
48                 int idledTime = (int)(_startIdleTime - DateTime.Now).TotalMilliseconds;
49                 if (idledTime >= _idleTime)
50                     return 0;
51
52                 return idledTime;
53             }
54         }
55
56         internal void SetTake()
57         {
58             _threadWorker.IsCanIdleExpried = false;
59             _status = ThreadWorkerPoolItemStatus.Take;
60         }
61
62         internal void SetIdle()
63         {
64             _startIdleTime = DateTime.Now;
65             _status = ThreadWorkerPoolItemStatus.Idle;
66             _threadWorker.IsCanIdleExpried = true;
67         }
68
69         internal ThreadWorkerBase ThreadWorker
70         {
71             get { return _threadWorker; }
72         }
73     }

  ThreadWorkerPool

  要点设计:

  1. 使用Lock配合ThreadWorkerPoolItem的状态来确保多线程下,每次取出的都是空闲的ThreadWorker
  2. 取出的超时设计,由于线程池有容量控制,高并发下必然导致线程池满负荷,提供超时设置,有利于使用者自行控制满负荷情况下的处理;ThreadWorkerPool将使用while+sleeping的方式,同时使用AutoResetEvent代替Thread.Sleep(timeout)来达到更佳的控制,当一个线程被放回线程池时,另一等待获取者立即获取,而无需等待下一次轮询的到来

  关键代码:

 1         protected bool TryTake(int timeout, out ThreadWorkerBase threadWorker)
 2         {
 3             threadWorker = null;
 4             lock (_takeLocker)
 5             {
 6                 ThreadWorkerPoolItem worker = null;
 7                 DateTime startWaitTime;
 8                 while (!_isDestoryed)
 9                 {
10                     worker = _threadWorkerList.Where(e => e.Status == Core.ThreadWorkerPoolItemStatus.Idle).OrderByDescending(e => e.SurplusIdleTime).FirstOrDefault();
11                     if (worker == null)
12                     {
13                         if (_threadWorkerList.Count < _settings.MaxThreadWorkerCount)
14                         {
15                             worker = this.CreatePoolItem(_threadWorkerList.Count + 1, _settings.IdleTime);
16                             worker.SetTake();
17                             _threadWorkerList.Add(worker);
18                             threadWorker = worker.ThreadWorker;
19                             return true;
20                         }
21
22                         startWaitTime = DateTime.Now;
23                         if (!_takeWaitEvent.WaitOne(timeout))
24                         {
25                             threadWorker = null;
26                             return false;
27                         }
28
29                         if (timeout != -1)
30                         {
31                             timeout = timeout - (int)(DateTime.Now - startWaitTime).TotalMilliseconds;
32                             if (timeout <= 0)
33                             {
34                                 threadWorker = null;
35                                 return false;
36                             }
37                         }
38                         continue;
39                     }
40
41                     threadWorker = worker.ThreadWorker;
42                     worker.SetTake();
43                     return true;
44                 }
45
46                 threadWorker = null;
47                 return false;
48             }
49         }
1         internal void Return(ThreadWorkerPoolItem item)
2         {
3             item.SetIdle();
4             _takeWaitEvent.Set();
5         }

  ThreadWorkerPoolManager使用单例模式管理,代码过于简单这里就不贴了......

  有兴趣的同学可以点击这里进行下载源码查看:Nutshell.ThreadWorkerPool.zip

时间: 2024-10-08 08:32:40

Nutshell.ThreadWorkerPool .Net线程池设计的相关文章

Net线程池设计

Net线程池设计 功能描述: 支持创建多个线程池,并统一管理 支持不同线程池的容量控制,以及最少活动线程的设置 支持不同线程池中活动线程的闲时设置,即线程空闲时间到期后即自动被回收 结构设计: ThreadWorkerPoolManager: 线程池管理器,用于统一创建,获取,销毁线程池,使用单例模式 ThreadWorkerPool: 线程池,用于管理指定数量的线程,由ThreadWorkerPoolManager管理,自身无法创建与销毁 TheadWorkerPoolItem: 线程池项,用

线程池设计

1.创建一个阻塞队列 (1)可以使用链表实现 (2)设置一个最大容量 (3)定义一个插入方法,队列满则阻塞:定义一个移除方法,队列空则阻塞. 2.创建一组线程 (1)每个线程运行时从阻塞队列获取任务执行. (2)线程可以被中断. 3.执行和终止方法实现 (1)调用执行方法,线程池往阻塞队列插入一个任务,然后线程池中的空闲线程不停循环获取任务去执行. (2)调用终止方法,运行中的线程会继续执行到结束,阻塞中的线程不会再执行. 原文地址:https://www.cnblogs.com/xy80hou

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

nginx线程池源码解析

周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本.实现上某些地方还是有差异的,不过基本结构全部摘抄. 在这里分享一下.如果你看懂了我的版本,也就证明你看懂了nginx的线程池. 本文只列出了关键数据结构和API,重在理解nginx线程池设计思路.完整代码在最后的链接里. 1.任务节点 typedef void (*CB_FUN)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数

线程池原理及创建并C++实现

本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单. 为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传统多线程方案中我们采用的

线程池原理及创建(C++实现)

http://www.cnblogs.com/lidabo/p/3328402.html 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单. 为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就

简单易用的线程池实现

0 前言 最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手. 这个线程池在创建时,即按照最大的线程数生成线程. 然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空. 1 线程池设计 简单描述如下(假设任务类名为CTasklet): 1.CThreadPool<CTasklet> thread_

LINUX c++线程池框架

版权声明:原文地址及作者不详,如有侵权,请联系: 本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关.另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量.文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单. 为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,

深入解析线程池的使用

为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,