// 线程任务基类和线程池基类文件 #ifndef _ITHREADPOOL_H #define _ITHREADPOOL_H class IHandle { public: IHandle(){} virtual ~IHandle(){} public: virtual void Run() = 0; }; class IThreadPool { public: IThreadPool(){} virtual ~IThreadPool(){} public: virtual void SetMaxNum(int iMaxNum) = 0; public: virtual int Start() = 0; virtual void Stop() = 0; public: virtual int Dispatch(IHandle *pHandle) = 0; }; #endif // 线程池实现类头文件 #ifndef _THREADPOOL_H #define _THREADPOOL_H #include <Windows.h> #include "CircleQueue.h" #include "IThreadPool.h" class CThreadPool : public IThreadPool { public: CThreadPool(); virtual ~CThreadPool(); public: void SetMaxNum(int iMaxNum); public: int Start(); void Stop(); public: int Dispatch(IHandle *pHandle); private: static DWORD WINAPI ThreadFun(LPVOID lParam); private: bool m_bIsRun; int m_iMaxNum; HANDLE m_hStopEvent; CircleQueue m_Queue; CRITICAL_SECTION m_CS; CONDITION_VARIABLE m_ConVar; }; #endif // 线程池 实现类源文件 #include "ThreadPool.h" #include <iostream> using namespace std; const int OK = 0; const int ERR = -1; CThreadPool::CThreadPool() : m_bIsRun(false), m_iMaxNum(0) { } CThreadPool::~CThreadPool() { } void CThreadPool::SetMaxNum(int iMaxNum) { m_iMaxNum = iMaxNum; } int CThreadPool::Start() { int iRev = ERR; // 初始化 m_bIsRun = true; InitializeConditionVariable(&m_ConVar); InitializeCriticalSection(&m_CS); // 创建线程 HANDLE hThread = NULL; for (int i = 0; i < m_iMaxNum; i++) { hThread = CreateThread(NULL, 0, ThreadFun, this, 0, NULL); CloseHandle(hThread); } // 创建停止事件 m_hStopEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if (m_hStopEvent != NULL) { iRev = OK; } return iRev; } void CThreadPool::Stop() { m_bIsRun = false; WakeAllConditionVariable(&m_ConVar); WaitForSingleObject(m_hStopEvent, INFINITE); CloseHandle(m_hStopEvent); m_hStopEvent = NULL; DeleteCriticalSection(&m_CS); } int CThreadPool::Dispatch(IHandle *pHandle) { EnterCriticalSection(&m_CS); m_Queue.push(pHandle); LeaveCriticalSection(&m_CS); WakeConditionVariable(&m_ConVar); return 0; } DWORD WINAPI CThreadPool::ThreadFun(LPVOID lParam) { IHandle *pIHandle = NULL; CThreadPool *pThreadPool = (CThreadPool *)lParam; while(pThreadPool->m_bIsRun) { EnterCriticalSection(&pThreadPool->m_CS); while(pThreadPool->m_Queue.getLength() == 0) { SleepConditionVariableCS(&pThreadPool->m_ConVar, &pThreadPool->m_CS, INFINITE); if (!pThreadPool->m_bIsRun) break; } pIHandle = (IHandle *)pThreadPool->m_Queue.pop(); LeaveCriticalSection(&pThreadPool->m_CS); if (!pThreadPool->m_bIsRun) { delete pIHandle; break; } if (pIHandle != NULL) { pIHandle->Run(); delete pIHandle; } } EnterCriticalSection(&pThreadPool->m_CS); if (--pThreadPool->m_iMaxNum == 0) { SetEvent(pThreadPool->m_hStopEvent); cout << pThreadPool->m_iMaxNum << endl; } LeaveCriticalSection(&pThreadPool->m_CS); return 0; } // 这里还使用到一个环形队列 // 头文件 #ifndef _CIRCLEQUEUE_H #define _CIRCLEQUEUE_H class CircleQueue { public: CircleQueue(); virtual ~CircleQueue(); public: void push(void * item); void * pop(); void * top(); int getLength(); private: void ** mEntries; unsigned int mHead; // 可以读的位置 unsigned int mTail; // 可以写的位置 unsigned int mCount; // 元素的个数 unsigned int mMaxCount; // 总的数量 }; #endif // 环形队列源文件 #include "CircleQueue.h" #include "stdlib.h" #include "memory.h" CircleQueue :: CircleQueue() { mMaxCount = 8; mEntries = (void**)malloc( sizeof( void * ) * mMaxCount ); mHead = mTail = mCount = 0; } CircleQueue :: ~CircleQueue() { free( mEntries ); mEntries = NULL; } void CircleQueue :: push( void * item ) { if( mCount >= mMaxCount ) { mMaxCount = ( mMaxCount * 3 ) / 2 + 1; void ** newEntries = (void**)malloc( sizeof( void * ) * mMaxCount ); unsigned int headLen = 0, tailLen = 0; if( mHead < mTail ) { headLen = mTail - mHead; } else { headLen = mCount - mTail; tailLen = mTail; } memcpy( newEntries, &( mEntries[ mHead ] ), sizeof( void * ) * headLen ); if( tailLen ) { memcpy( &( newEntries[ headLen ] ), mEntries, sizeof( void * ) * tailLen ); } mHead = 0; mTail = headLen + tailLen; free( mEntries ); mEntries = newEntries; } mEntries[ mTail++ ] = item; mTail = mTail % mMaxCount; mCount++; } void * CircleQueue :: pop() { void * ret = NULL; if( mCount > 0 ) { ret = mEntries[ mHead++ ]; mHead = mHead % mMaxCount; mCount--; } return ret; } void * CircleQueue :: top() { return mCount > 0 ? mEntries[ mHead ] : NULL; } int CircleQueue :: getLength() { return mCount; } // main 函数实现 这里没有删除对象 #include <iostream> #include "ThreadPool.h" using namespace std; class TestHandle : public IHandle { public: virtual void Run() { cout << GetCurrentThreadId() << "---------"; cout << "Hello" << endl; } }; int main() { CThreadPool threadPool; threadPool.SetMaxNum(10); threadPool.Start(); for (int i = 0; i < 100; i++) { threadPool.Dispatch(new TestHandle()); } Sleep(1000*5); threadPool.Stop(); return 0; }
时间: 2024-10-13 21:34:35