线程基类头文件:
#ifndef YTHREAD_H #define YTHREAD_H #include <iostream> using namespace std; #define CERR(str) cout<<"error:"<< str<<std::endl #define CINFO(str) cout<<"info:"<< str<<std::endl enum ResCode { RES_OK = 0, RES_ERR = -1, }; class TAutoLock { public: TAutoLock(pthread_mutex_t & lock) : m_lock(&lock) { pthread_mutex_lock(m_lock); } ~TAutoLock(void) { release(); } void release(void) { if (m_lock) { pthread_mutex_unlock(m_lock); m_lock = NULL; } } protected: mutable pthread_mutex_t * m_lock; }; class YThreadState { public: enum typeState { TS_NONE, TS_BUILD, TS_WAITTING, TS_RUNNING, TS_TOEND, TS_ENDING, TS_DEF_END }; //! return the state information static const char * getStateString(typeState nState); //! Constructor YThreadState(void) : m_nState(TS_NONE),m_nResult(RES_OK) { pthread_mutex_init(&m_LockState, NULL); pthread_cond_init(&m_ConditionState,NULL); } virtual ~YThreadState(void) { pthread_mutex_destroy(&m_LockState); pthread_cond_destroy(&m_ConditionState); //! Destructor } //! return the current status typeState getState(void) const { return m_nState; } protected: //! sub-class may decide whether to exit in function Run(); bool mustExit(void) const { return m_nState == TS_TOEND; } //! reset the object‘s state void setState(typeState state) { m_nState = state; } volatile typeState m_nState; int m_nResult; pthread_mutex_t m_LockState; pthread_cond_t m_ConditionState; }; class YThreadBase { public: YThreadBase(void); virtual ~YThreadBase(void) {} enum THREAD_PRIORITY { PRIORITY_HIGHEST = 7, PRIORITY_ABOVE_NORMAL = 1, PRIORITY_NORMAL = 0, PRIORITY_BELOW_NORMAL = -1, PRIORITY_LOWEST = -7, }; protected: #define ThreadFuncReturnCode void* typedef void * (* ThreadFunc)(void * arg); typedef pthread_t typeThreaID; typeThreaID m_nPthreadID; static int threadExit(int nValue); static int threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPrio,typeThreaID * pThreadID,bool boDetach); virtual int run(void) = 0; protected: int threadStart(ThreadFunc func,void * arg,int nStackSize,int nPriority, bool boDetach) { return threadStartImp(func,arg,nStackSize,nPriority,&m_nPthreadID, boDetach); } bool threadJoin(void ** status = NULL); bool threadDetach(void); void threadYield(void); }; class YThread:public YThreadBase, public YThreadState { public: YThread(void):YThreadBase(), YThreadState() {} virtual ~YThread(void) {} virtual void start(void); void startEx(int nStackSize = 64 * 1024,int nPriority = YThreadBase::PRIORITY_NORMAL); virtual void stop(void); virtual int stopEx(void); virtual void beforeStop(void) =0; unsigned int getThreadID(void) const { return (unsigned int)(m_nPthreadID); } protected: //bool suspend(int nSec = INFINITE_VALUE,int nNSec = INFINITE_VALUE); private: static ThreadFuncReturnCode threadProxy(void * arg); }; #endif // YTHREAD_H
线程基类源文件
#include "ythread.h" //#include "ttmutex.h" #include <assert.h> #include <pthread.h> const char * g_lpszThreadState[] = { "None, runnable object not build", //TS_NONE, 线程还没有建立 "Just Build the runnable object", //TS_BUILD, 线程刚刚建立完 "Waiting for task", //TS_WAITTING, 线程正在等待任务 "Running", //TS_RUNNING, 线程正在执行 "Will be end", //TS_TOEND, 线程准备要结束 "Closing the runnable object", //TS_ENDING, 线程正在结束 "Runnable object is Closed", //TS_END, 线程已经结束 "" //TS_DEF_END }; //------------------------------------------------------------------- const char * YThreadState::getStateString(typeState nState) { return g_lpszThreadState[static_cast<int>(nState)]; } YThreadBase::YThreadBase(void) : m_nPthreadID(0) {} int YThreadBase::threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPriority,typeThreaID * pThreadID,bool boDetach) { pthread_attr_t attr; pthread_attr_init(&attr); // initialize attr with default attributes pthread_attr_setstacksize (&attr,nStackSize); if (nPriority != 0) { pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); } int nErrno = pthread_create(pThreadID, &attr, func, arg); pthread_attr_destroy(&attr); if (nErrno == 0 && boDetach) pthread_detach( *pThreadID ); return nErrno; } bool YThreadBase::threadDetach(void) { return pthread_detach(m_nPthreadID) == 0; } void YThreadBase::threadYield(void) { pthread_yield(); } bool YThreadBase::threadJoin(void ** pStatus) { return pthread_join(m_nPthreadID, pStatus) == 0; } int YThreadBase::threadExit(int nValue) { pthread_exit(reinterpret_cast<void *>(nValue)); return nValue; } //------------------------------------------------------------------- void YThread::start(void) { startEx(); } void YThread::startEx(int nStackSize,int nPriority) { TAutoLock _au(m_LockState); int res = YThreadBase::threadStart(threadProxy,this,nStackSize,nPriority,true); if (res<0) CERR("Start Thread Error"); pthread_cond_wait(&m_ConditionState, &m_LockState); } int YThread::stopEx(void) { TAutoLock _au(m_LockState); if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND) return m_nResult; m_nState = TS_TOEND; beforeStop(); while (m_nState != TS_NONE) pthread_cond_wait(&m_ConditionState, &m_LockState); return m_nResult; } void YThread::stop(void) { TAutoLock _au(m_LockState); if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND) return; m_nState = TS_TOEND; // beforeStop(); } ThreadFuncReturnCode YThread::threadProxy(void * arg) { YThread * pThread = reinterpret_cast<YThread *>(arg); pthread_mutex_lock(&pThread->m_LockState); pThread->m_nState = TS_BUILD; pthread_cond_signal(&pThread->m_ConditionState); pthread_mutex_unlock(&pThread->m_LockState); pThread->m_nResult= 0; try { pThread->m_nResult = pThread->run(); } catch(std::exception & se) { CERR(se.what()); } catch(...) { CERR("unkown exp in thread run"); } TAutoLock _au(pThread->m_LockState); pThread->m_nState = TS_NONE; pthread_cond_signal(&pThread->m_ConditionState); threadExit(pThread->m_nResult); return 0; }
线程池类头文件(包括了工作线程)
#ifndef YTHREADPOOL_H #define YTHREADPOOL_H #include "ythread.h" #include <list> struct MYData { std::string strData; int nNum; }; class YMYThreadPool; class YMYThread : public YThread { public: YMYThread(size_t nThreadNum, YMYThreadPool *pParent) :YThread(),m_nThreadNum(nThreadNum) ,m_pParent(pParent) { pthread_mutex_init(&m_lockForDataListAndCond, NULL); pthread_cond_init(&m_condForDataList,NULL); } ~YMYThread() { pthread_mutex_destroy(&m_lockForDataListAndCond); pthread_cond_destroy(&m_condForDataList); } static void initGobal(size_t nParentThreadCount, size_t nDataListLimits); virtual void beforeStop(void) ; virtual int run(void); bool addDataWithoutSigna(const MYData & mydata); bool addData(const MYData & mydata); protected: void setCurrentState(const std::string & strState) { m_tBeginTime = time(NULL); m_strCurrentState = strState; } virtual int handleDataList(void); void dealWithEachData(const MYData & itemdata); typedef std::list<MYData> typeDataList; typeDataList m_datalist; pthread_mutex_t m_lockForDataListAndCond; pthread_cond_t m_condForDataList; YMYThreadPool * m_pParent; size_t m_nThreadNum; //Thread number time_t m_tBeginTime; std::string m_strCurrentState; static size_t g_nDataListSizeLimits; static size_t g_nParentPoolThreadCount; }; class YMYThreadPool { public: YMYThreadPool(void); ~YMYThreadPool(void); void init(size_t nThreadCount,size_t nSunDataLimits); void start(void); void stop(void); YMYThread & getThread(const std::string & strSid); bool pushMission(const MYData & data); bool m_boToExit; private: YMYThread ** m_ppThreads; size_t m_nThreadCount; // TMutexEvent m_AllReadyEvent; bool m_boInit; }; #endif // YTHREADPOOL_H
线程池类源文件(包括工作线程的具体实现)
#include "ythreadpool.h" const int nLimitsDefault = 1000; const int nThreadCountDefault = 1; size_t YMYThread::g_nDataListSizeLimits = static_cast<size_t>(nLimitsDefault); size_t YMYThread::g_nParentPoolThreadCount = static_cast<size_t>(nThreadCountDefault); void YMYThread::initGobal(size_t nPoolThreadCount, size_t nDataListLimits) { g_nParentPoolThreadCount = nPoolThreadCount; g_nDataListSizeLimits = nDataListLimits; } void YMYThread::beforeStop(void) { TAutoLock au(m_lockForDataListAndCond); pthread_cond_signal(&m_condForDataList); } int YMYThread::run(void) { while(!m_pParent->m_boToExit) { try { setCurrentState( "Wait for mission" ); pthread_mutex_lock(&m_lockForDataListAndCond); if (m_datalist.empty()) pthread_cond_wait(&m_condForDataList, &m_lockForDataListAndCond); pthread_mutex_unlock(&m_lockForDataListAndCond); if ( m_pParent->m_boToExit ) break; setCurrentState( "Begin handle search missions" ); handleDataList(); } catch(...) { } } return int(); } bool YMYThread::addDataWithoutSigna(const MYData & data) { if (m_datalist.size() >= g_nDataListSizeLimits) return false; m_datalist.push_back(data); return true; } bool YMYThread::addData(const MYData & data) { TAutoLock au(m_lockForDataListAndCond); if (!addDataWithoutSigna(data)) return false; pthread_cond_signal(&m_condForDataList); return true; } void YMYThread::dealWithEachData(const MYData & data) { std::cout <<"deal each data;"<<std::endl; std::cout<<"thread num:"<<m_nThreadNum<<", data str:"<<data.strData<<", data num:"<<data.nNum <<std::endl; } int YMYThread::handleDataList(void) { if ( m_pParent->m_boToExit ) return int(); typeDataList datalist; //list of mission must to do pthread_mutex_lock(&m_lockForDataListAndCond); //Condition for the mission datalist.swap( m_datalist ); pthread_mutex_unlock(&m_lockForDataListAndCond); setCurrentState( "Handling mission." ); typeDataList::iterator it = datalist.begin(); while (it != datalist.end()) { dealWithEachData(*it); it++; } } /////////////////////////////////////////////////////////////////////// inline size_t getHashIndex(const std::string & strSID,size_t nThreadCount) { std::hash<std::string> hash_fn; //c++11的新特性 size_t nHashValue = hash_fn(strSID); return nHashValue % nThreadCount; } YMYThreadPool::YMYThreadPool():m_ppThreads(NULL),m_nThreadCount(0), m_boInit(false),m_boToExit(false) { } YMYThreadPool::~YMYThreadPool(void) { stop(); } void YMYThreadPool::start(void) { if (!m_boInit) return; for (size_t i = 0; i < m_nThreadCount; ++i) if ( m_ppThreads[i] != NULL ) m_ppThreads[i]->startEx( 2 * 1024 * 1024 ); } void YMYThreadPool::stop(void) { if (m_ppThreads == NULL) return; m_boToExit = true; for (size_t i = 0; i < m_nThreadCount; ++i) { if ( m_ppThreads[i] != NULL ) { m_ppThreads[i]->stopEx(); delete m_ppThreads[i]; } } delete []m_ppThreads; m_ppThreads = NULL; m_nThreadCount = 0; } void YMYThreadPool::init(size_t nThreadCount,size_t nSunDataLimits) { YMYThread::initGobal(nThreadCount, nSunDataLimits); m_nThreadCount = nThreadCount; m_ppThreads = new YMYThread *[nThreadCount]; for (size_t i = 0 ; i < m_nThreadCount; ++i) { m_ppThreads[i] = NULL; m_ppThreads[i] = new YMYThread(i,this/*,&m_AllReadyEvent*/); if (m_ppThreads[i] == NULL) CERR( "Memory not enough for Sub-Thread" ); } m_boInit = true; } bool YMYThreadPool::pushMission(const MYData & data) { if (!m_boInit) return false; unsigned nIndex = getHashIndex(data.strData, m_nThreadCount); return m_ppThreads[nIndex]->addData(data); } YMYThread & YMYThreadPool::getThread(const std::string & strSid) { std::hash<std::string> hash_fn; size_t nHash = hash_fn(strSid); return *(m_ppThreads[ nHash % m_nThreadCount ]); }
main函数,注意请用单例模式去创建线程池
#include "ythreadpool.h" #include <unistd.h> int main() { YMYThreadPool p; p.init(6, 100); p.start(); MYData da1, da2; da1.strData = "ds1"; da1.nNum = 11; da2.strData = "ds2"; da2.nNum = 22; sleep(2); p.pushMission(da1); p.pushMission(da2); sleep(2); p.stop(); return 0; }
时间: 2024-10-12 20:14:21