此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:
ThreadPoolExecutor.h
1 #ifndef __THREAD_POOL_EXECUTOR__ 2 #define __THREAD_POOL_EXECUTOR__ 3 4 #include "Thread.h" 5 #include <set> 6 #include <list> 7 #include <windows.h> 8 9 class CThreadPoolExecutor 10 { 11 public: 12 CThreadPoolExecutor(void); 13 ~CThreadPoolExecutor(void); 14 15 /** 16 初始化线程池,创建minThreads个线程 17 **/ 18 bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse); 19 20 /** 21 执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 22 若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true 23 若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false 24 **/ 25 bool Execute(Runnable * pRunnable); 26 27 /** 28 终止线程池,先制止塞入任务, 29 然后等待直到任务列表为空, 30 然后设置最小线程数量为0, 31 等待直到线程数量为空, 32 清空垃圾堆中的任务 33 **/ 34 void Terminate(); 35 36 /** 37 返回线程池中当前的线程数量 38 **/ 39 unsigned int GetThreadPoolSize(); 40 41 private: 42 /** 43 获取任务列表中的任务,若任务列表为空,返回NULL 44 **/ 45 Runnable * GetTask(); 46 47 static unsigned int WINAPI StaticThreadFunc(void * arg); 48 49 private: 50 class CWorker : public CThread 51 { 52 public: 53 CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL); 54 ~CWorker(); 55 void Run(); 56 57 private: 58 CThreadPoolExecutor * m_pThreadPool; 59 Runnable * m_pFirstTask; 60 volatile bool m_bRun; 61 }; 62 63 typedef std::set<CWorker *> ThreadPool; 64 typedef std::list<Runnable *> Tasks; 65 typedef Tasks::iterator TasksItr; 66 typedef ThreadPool::iterator ThreadPoolItr; 67 68 ThreadPool m_ThreadPool; 69 ThreadPool m_TrashThread; 70 Tasks m_Tasks; 71 72 CRITICAL_SECTION m_csTasksLock; 73 CRITICAL_SECTION m_csThreadPoolLock; 74 75 volatile bool m_bRun; 76 volatile bool m_bEnableInsertTask; 77 volatile unsigned int m_minThreads; 78 volatile unsigned int m_maxThreads; 79 volatile unsigned int m_maxPendingTasks; 80 }; 81 82 #endif
ThreadPoolExecutor .cpp
1 #include "stdafx.h" 2 3 #include "ThreadPoolExecutor.h" 4 5 CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) : 6 m_pThreadPool(pThreadPool), 7 m_pFirstTask(pFirstTask), 8 m_bRun(true) 9 { 10 11 } 12 13 CThreadPoolExecutor::CWorker::~CWorker() 14 { 15 } 16 17 /** 18 执行任务的工作线程。 19 当前没有任务时, 20 如果当前线程数量大于最小线程数量,减少线程, 21 否则,执行清理程序,将线程类给释放掉 22 **/ 23 void CThreadPoolExecutor::CWorker::Run() 24 { 25 Runnable * pTask = NULL; 26 while(m_bRun) 27 { 28 if(NULL == m_pFirstTask) 29 { 30 pTask = m_pThreadPool->GetTask(); 31 } 32 else 33 { 34 pTask = m_pFirstTask; 35 m_pFirstTask = NULL; 36 } 37 38 if(NULL == pTask) 39 { 40 EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock)); 41 if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads) 42 { 43 ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this); 44 if(itr != m_pThreadPool->m_ThreadPool.end()) 45 { 46 m_pThreadPool->m_ThreadPool.erase(itr); 47 m_pThreadPool->m_TrashThread.insert(this); 48 } 49 m_bRun = false; 50 } 51 else 52 { 53 ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin(); 54 while(itr != m_pThreadPool->m_TrashThread.end()) 55 { 56 (*itr)->Join(); 57 delete (*itr); 58 m_pThreadPool->m_TrashThread.erase(itr); 59 itr = m_pThreadPool->m_TrashThread.begin(); 60 } 61 } 62 LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock)); 63 continue; 64 } 65 else 66 { 67 pTask->Run(); 68 pTask = NULL; 69 } 70 } 71 } 72 73 ///////////////////////////////////////////////////////////////////////////////////////////// 74 75 CThreadPoolExecutor::CThreadPoolExecutor(void) : 76 m_bRun(false), 77 m_bEnableInsertTask(false) 78 { 79 InitializeCriticalSection(&m_csTasksLock); 80 InitializeCriticalSection(&m_csThreadPoolLock); 81 } 82 83 CThreadPoolExecutor::~CThreadPoolExecutor(void) 84 { 85 Terminate(); 86 DeleteCriticalSection(&m_csTasksLock); 87 DeleteCriticalSection(&m_csThreadPoolLock); 88 } 89 90 bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks) 91 { 92 if(minThreads == 0) 93 { 94 return false; 95 } 96 if(maxThreads < minThreads) 97 { 98 return false; 99 } 100 m_minThreads = minThreads; 101 m_maxThreads = maxThreads; 102 m_maxPendingTasks = maxPendingTasks; 103 unsigned int i = m_ThreadPool.size(); 104 for(; i<minThreads; i++) 105 { 106 //创建线程 107 CWorker * pWorker = new CWorker(this); 108 if(NULL == pWorker) 109 { 110 return false; 111 } 112 EnterCriticalSection(&m_csThreadPoolLock); 113 m_ThreadPool.insert(pWorker); 114 LeaveCriticalSection(&m_csThreadPoolLock); 115 pWorker->Start(); 116 } 117 m_bRun = true; 118 m_bEnableInsertTask = true; 119 return true; 120 } 121 122 bool CThreadPoolExecutor::Execute(Runnable * pRunnable) 123 { 124 if(!m_bEnableInsertTask) 125 { 126 return false; 127 } 128 if(NULL == pRunnable) 129 { 130 return false; 131 } 132 if(m_Tasks.size() >= m_maxPendingTasks) 133 { 134 if(m_ThreadPool.size() < m_maxThreads) 135 { 136 CWorker * pWorker = new CWorker(this, pRunnable); 137 if(NULL == pWorker) 138 { 139 return false; 140 } 141 EnterCriticalSection(&m_csThreadPoolLock); 142 m_ThreadPool.insert(pWorker); 143 LeaveCriticalSection(&m_csThreadPoolLock); 144 pWorker->Start(); 145 } 146 else 147 { 148 return false; 149 } 150 } 151 else 152 { 153 EnterCriticalSection(&m_csTasksLock); 154 m_Tasks.push_back(pRunnable); 155 LeaveCriticalSection(&m_csTasksLock); 156 } 157 return true; 158 } 159 160 Runnable * CThreadPoolExecutor::GetTask() 161 { 162 Runnable * Task = NULL; 163 EnterCriticalSection(&m_csTasksLock); 164 if(!m_Tasks.empty()) 165 { 166 Task = m_Tasks.front(); 167 m_Tasks.pop_front(); 168 } 169 LeaveCriticalSection(&m_csTasksLock); 170 return Task; 171 } 172 173 unsigned int CThreadPoolExecutor::GetThreadPoolSize() 174 { 175 return m_ThreadPool.size(); 176 } 177 178 void CThreadPoolExecutor::Terminate() 179 { 180 m_bEnableInsertTask = false; 181 while(m_Tasks.size() > 0) 182 { 183 Sleep(1); 184 } 185 m_bRun = false; 186 m_minThreads = 0; 187 m_maxThreads = 0; 188 m_maxPendingTasks = 0; 189 while(m_ThreadPool.size() > 0) 190 { 191 Sleep(1); 192 } 193 EnterCriticalSection(&m_csThreadPoolLock); 194 ThreadPoolItr itr = m_TrashThread.begin(); 195 while(itr != m_TrashThread.end()) 196 { 197 (*itr)->Join(); 198 delete (*itr); 199 m_TrashThread.erase(itr); 200 itr = m_TrashThread.begin(); 201 } 202 LeaveCriticalSection(&m_csThreadPoolLock); 203 }
调用:
1 #include "stdafx.h" 2 #include "Thread.h" 3 #include "ThreadPoolExecutor.h" 4 5 class R : public Runnable 6 { 7 public: 8 ~R() 9 { 10 printf("~R/n"); 11 } 12 void Run() 13 { 14 printf("Hello World\n"); 15 } 16 }; 17 18 19 int _tmain(int argc, _TCHAR* argv[]) 20 { 21 /*R r; 22 CThread * t = NULL; 23 t = new CThread(&r); 24 t->Start(); 25 t->Join(); 26 getchar();*/ 27 CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor(); 28 pExecutor->Init(1, 10, 50); 29 R r; 30 for(int i=0;i<100;i++) 31 { 32 while(!pExecutor->Execute(&r)) 33 { 34 } 35 } 36 pExecutor->Terminate(); 37 delete pExecutor; 38 getchar(); 39 return 0; 40 return 0; 41 }
from:http://blog.csdn.net/huyiyang2010/article/details/5809919
时间: 2024-10-10 00:39:23