一个Windows C++的线程池的实现

此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

ThreadPoolExecutor.h

 1 #ifndef __THREAD_POOL_EXECUTOR__
 2 #define __THREAD_POOL_EXECUTOR__
 3
 4 #include "Thread.h"
 5 #include <set>
 6 #include <list>
 7 #include <windows.h>
 8
 9 class CThreadPoolExecutor
10 {
11 public:
12     CThreadPoolExecutor(void);
13     ~CThreadPoolExecutor(void);
14
15     /**
16       初始化线程池,创建minThreads个线程
17     **/
18     bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);
19
20     /**
21       执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true
22       若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true
23       若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false
24     **/
25     bool Execute(Runnable * pRunnable);
26
27     /**
28       终止线程池,先制止塞入任务,
29       然后等待直到任务列表为空,
30       然后设置最小线程数量为0,
31       等待直到线程数量为空,
32       清空垃圾堆中的任务
33     **/
34     void Terminate();
35
36     /**
37       返回线程池中当前的线程数量
38     **/
39     unsigned int GetThreadPoolSize();
40
41 private:
42     /**
43       获取任务列表中的任务,若任务列表为空,返回NULL
44     **/
45     Runnable * GetTask();
46
47     static unsigned int WINAPI StaticThreadFunc(void * arg);
48
49 private:
50     class CWorker : public CThread
51     {
52     public:
53         CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);
54         ~CWorker();
55         void Run();
56
57     private:
58         CThreadPoolExecutor * m_pThreadPool;
59         Runnable * m_pFirstTask;
60         volatile bool m_bRun;
61     };
62
63     typedef std::set<CWorker *> ThreadPool;
64     typedef std::list<Runnable *> Tasks;
65     typedef Tasks::iterator TasksItr;
66     typedef ThreadPool::iterator ThreadPoolItr;
67
68     ThreadPool m_ThreadPool;
69     ThreadPool m_TrashThread;
70     Tasks m_Tasks;
71
72     CRITICAL_SECTION m_csTasksLock;
73     CRITICAL_SECTION m_csThreadPoolLock;
74
75     volatile bool m_bRun;
76     volatile bool m_bEnableInsertTask;
77     volatile unsigned int m_minThreads;
78     volatile unsigned int m_maxThreads;
79     volatile unsigned int m_maxPendingTasks;
80 };
81
82 #endif 

ThreadPoolExecutor .cpp

  1 #include "stdafx.h"
  2
  3 #include "ThreadPoolExecutor.h"
  4
  5 CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :
  6 m_pThreadPool(pThreadPool),
  7 m_pFirstTask(pFirstTask),
  8 m_bRun(true)
  9 {
 10
 11 }
 12
 13 CThreadPoolExecutor::CWorker::~CWorker()
 14 {
 15 }
 16
 17 /**
 18   执行任务的工作线程。
 19   当前没有任务时,
 20   如果当前线程数量大于最小线程数量,减少线程,
 21   否则,执行清理程序,将线程类给释放掉
 22 **/
 23 void CThreadPoolExecutor::CWorker::Run()
 24 {
 25     Runnable * pTask = NULL;
 26     while(m_bRun)
 27     {
 28         if(NULL == m_pFirstTask)
 29         {
 30             pTask = m_pThreadPool->GetTask();
 31         }
 32         else
 33         {
 34             pTask = m_pFirstTask;
 35             m_pFirstTask = NULL;
 36         }
 37
 38         if(NULL == pTask)
 39         {
 40             EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
 41             if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)
 42             {
 43                 ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
 44                 if(itr != m_pThreadPool->m_ThreadPool.end())
 45                 {
 46                     m_pThreadPool->m_ThreadPool.erase(itr);
 47                     m_pThreadPool->m_TrashThread.insert(this);
 48                 }
 49                 m_bRun = false;
 50             }
 51             else
 52             {
 53                 ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
 54                 while(itr != m_pThreadPool->m_TrashThread.end())
 55                 {
 56                     (*itr)->Join();
 57                     delete (*itr);
 58                     m_pThreadPool->m_TrashThread.erase(itr);
 59                     itr = m_pThreadPool->m_TrashThread.begin();
 60                 }
 61             }
 62             LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
 63             continue;
 64         }
 65         else
 66         {
 67             pTask->Run();
 68             pTask = NULL;
 69         }
 70     }
 71 }
 72
 73 /////////////////////////////////////////////////////////////////////////////////////////////
 74
 75 CThreadPoolExecutor::CThreadPoolExecutor(void) :
 76 m_bRun(false),
 77 m_bEnableInsertTask(false)
 78 {
 79     InitializeCriticalSection(&m_csTasksLock);
 80     InitializeCriticalSection(&m_csThreadPoolLock);
 81 }
 82
 83 CThreadPoolExecutor::~CThreadPoolExecutor(void)
 84 {
 85     Terminate();
 86     DeleteCriticalSection(&m_csTasksLock);
 87     DeleteCriticalSection(&m_csThreadPoolLock);
 88 }
 89
 90 bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)
 91 {
 92     if(minThreads == 0)
 93     {
 94         return false;
 95     }
 96     if(maxThreads < minThreads)
 97     {
 98         return false;
 99     }
100     m_minThreads = minThreads;
101     m_maxThreads = maxThreads;
102     m_maxPendingTasks = maxPendingTasks;
103     unsigned int i = m_ThreadPool.size();
104     for(; i<minThreads; i++)
105     {
106         //创建线程
107         CWorker * pWorker = new CWorker(this);
108         if(NULL == pWorker)
109         {
110             return false;
111         }
112         EnterCriticalSection(&m_csThreadPoolLock);
113         m_ThreadPool.insert(pWorker);
114         LeaveCriticalSection(&m_csThreadPoolLock);
115         pWorker->Start();
116     }
117     m_bRun = true;
118     m_bEnableInsertTask = true;
119     return true;
120 }
121
122 bool CThreadPoolExecutor::Execute(Runnable * pRunnable)
123 {
124     if(!m_bEnableInsertTask)
125     {
126         return false;
127     }
128     if(NULL == pRunnable)
129     {
130         return false;
131     }
132     if(m_Tasks.size() >= m_maxPendingTasks)
133     {
134         if(m_ThreadPool.size() < m_maxThreads)
135         {
136             CWorker * pWorker = new CWorker(this, pRunnable);
137             if(NULL == pWorker)
138             {
139                 return false;
140             }
141             EnterCriticalSection(&m_csThreadPoolLock);
142             m_ThreadPool.insert(pWorker);
143             LeaveCriticalSection(&m_csThreadPoolLock);
144             pWorker->Start();
145         }
146         else
147         {
148             return false;
149         }
150     }
151     else
152     {
153         EnterCriticalSection(&m_csTasksLock);
154         m_Tasks.push_back(pRunnable);
155         LeaveCriticalSection(&m_csTasksLock);
156     }
157     return true;
158 }
159
160 Runnable * CThreadPoolExecutor::GetTask()
161 {
162     Runnable * Task = NULL;
163     EnterCriticalSection(&m_csTasksLock);
164     if(!m_Tasks.empty())
165     {
166         Task = m_Tasks.front();
167         m_Tasks.pop_front();
168     }
169     LeaveCriticalSection(&m_csTasksLock);
170     return Task;
171 }
172
173 unsigned int CThreadPoolExecutor::GetThreadPoolSize()
174 {
175     return m_ThreadPool.size();
176 }
177
178 void CThreadPoolExecutor::Terminate()
179 {
180     m_bEnableInsertTask = false;
181     while(m_Tasks.size() > 0)
182     {
183         Sleep(1);
184     }
185     m_bRun = false;
186     m_minThreads = 0;
187     m_maxThreads = 0;
188     m_maxPendingTasks = 0;
189     while(m_ThreadPool.size() > 0)
190     {
191         Sleep(1);
192     }
193     EnterCriticalSection(&m_csThreadPoolLock);
194     ThreadPoolItr itr = m_TrashThread.begin();
195     while(itr != m_TrashThread.end())
196     {
197         (*itr)->Join();
198         delete (*itr);
199         m_TrashThread.erase(itr);
200         itr = m_TrashThread.begin();
201     }
202     LeaveCriticalSection(&m_csThreadPoolLock);
203 }  

调用:

 1 #include "stdafx.h"
 2 #include "Thread.h"
 3 #include "ThreadPoolExecutor.h"
 4
 5 class R : public Runnable
 6 {
 7 public:
 8     ~R()
 9     {
10         printf("~R/n");
11     }
12     void Run()
13     {
14         printf("Hello World\n");
15     }
16 };
17
18
19 int _tmain(int argc, _TCHAR* argv[])
20 {
21     /*R r;
22     CThread * t = NULL;
23     t = new CThread(&r);
24     t->Start();
25     t->Join();
26     getchar();*/
27     CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
28     pExecutor->Init(1, 10, 50);
29     R r;
30     for(int i=0;i<100;i++)
31     {
32         while(!pExecutor->Execute(&r))
33         {
34         }
35     }
36     pExecutor->Terminate();
37     delete pExecutor;
38     getchar();
39     return 0;
40     return 0;
41 }

from:http://blog.csdn.net/huyiyang2010/article/details/5809919

时间: 2024-10-10 00:39:23

一个Windows C++的线程池的实现的相关文章

线程池? 如何设计一个动态大小的线程池,有哪些方法?

[线程池?  如何设计一个动态大小的线程池,有哪些方法?] 线程池:顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中, 需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中, 从而减少创建和销毁线程对象的开销. 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.此时,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池相似,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable

一个最简单的线程池

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  * 一个最简单的线程池,这个模型很简单,但是很有用  *  * @author leizhimin 2014/8/22 20:21  */ public class Test3 {     private static final ExecutorService threadPool = Executors.newFix

windows下利用线程池完成多任务的分配和运行

在做项目的过程中有时候为了提升效率,用了多线程的方法来对任务进行分割和应用,后来发现,采用线程池的方法能更好的利用线程资源来计算任务,网上有很多关于如何运行线程池的例子,msdn上也给出了对应的例子:https://msdn.microsoft.com/en-us/library/windows/desktop/ms686980(v=vs.85).aspx 感兴趣的话大家可以去看看,这里我给出一个简单的demo,利用线程池单次调用多次调用,例子如下: [cpp] view plain copy

WIndows编程之线程池的使用

不得不说,做C++服务器程序开发,要是不理解线程池,不懂线程池,做C++服务器端的程序就没有任何意义.特别就是上次我因为理解错了线程池而做错了一件事,而被指导人批了一顿,至今记忆犹新,所以趁着周末学了下线程池的使用,小有成绩. 先看一种比较简单的线程池的实现. 1 #include <windows.h> 2 #include <string> 3 #include <iostream> 4 5 using namespace std; 6 7 #define BEGI

一个简单的python线程池框架

初学python,实现了一个简单的线程池框架,线程池中除Wokers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信. 代码如下: 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 __author__ = "pandaychen" 5 6 import Queue 7 import sys 8 import os 9 import threading 10 import time 11

一个Linux下C线程池的实现

在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包.这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程.然而频繁地开辟与销毁线程极大地占用了系统的资源.而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序

一个Windows C++的线程类实现(封装API,形成一个类,但不完善。其实可以学习一下Delphi的TThread的写法)

Thread.h [cpp] view plain copy #ifndef __THREAD_H__ #define __THREAD_H__ #include <string> #include   <windows.h> #include   <process.h> class Runnable { public: virtual ~Runnable() {}; virtual void Run() = 0; }; class CThread : public R

windows线程池四种情形(win核心读书笔记)

windows线程池四种情形(win核心读书笔记) Mircosoft从Windows2000引入线程池API,并在Vista后对线程池重新构架,引入新的线程池API.以下所有线程池函数均适用于Vista以后的版本. 用Windows提供的线程池函数有以下几个好处:1,不必要用CreateThread创建线程:2,不必要管理自己线程:3,Windows封装好的线程池,效率高,性能优越. 1 异步方式调用函数 这种方式和我们用CreateThread创建线程的用法差不多,给定一个线程函数模板实现功

Windows线程池

本文主要是参考 博客:http://blog.csdn.net/ithzhang/article/details/8373243 以及自己的一些心得而来. 我们自己也可以创建线程,但是涉及到线程的编码操作比较复杂,容易出现差错.为了简化程序员的工作,Windows提供了一个线程池机制来简化线程的创建.销毁以及日常管理.这个新线程池可能不适用于所有的情况,但大多数情况下它都能够满足我们的需要. 这个线程池能够帮助我们做一下事情: 一:以异步的方式调用一个函数. 二:每隔一段时间调用一个函数. 三: