C++线程池2013年的博客迁移

线程基类头文件:

#ifndef YTHREAD_H
#define YTHREAD_H

#include <iostream>
using namespace std;
#define  CERR(str) cout<<"error:"<< str<<std::endl
#define  CINFO(str) cout<<"info:"<< str<<std::endl
enum ResCode
{
    RES_OK = 0,
    RES_ERR = -1,
};

class TAutoLock
{
public:
    TAutoLock(pthread_mutex_t & lock) : m_lock(&lock) { pthread_mutex_lock(m_lock); }
    ~TAutoLock(void) { release(); }

    void release(void)
    {
        if (m_lock)
        {
            pthread_mutex_unlock(m_lock);
            m_lock = NULL;
        }
    }
protected:
    mutable pthread_mutex_t	*	m_lock;
};

class YThreadState
{
public:
    enum typeState
    {
        TS_NONE,
        TS_BUILD,
        TS_WAITTING,
        TS_RUNNING,
        TS_TOEND,
        TS_ENDING,
        TS_DEF_END
    };

    //! return the state information
    static const char * getStateString(typeState nState);

    //! Constructor
    YThreadState(void) : m_nState(TS_NONE),m_nResult(RES_OK)
    {
        pthread_mutex_init(&m_LockState, NULL);
         pthread_cond_init(&m_ConditionState,NULL);
    }

    virtual ~YThreadState(void)
    {
        pthread_mutex_destroy(&m_LockState);
        pthread_cond_destroy(&m_ConditionState);
    //! Destructor
    }

    //! return the current status
    typeState		getState(void) const { return m_nState; }

protected:
    //! sub-class may decide whether to exit in function Run();
    bool			mustExit(void) const { return m_nState == TS_TOEND; }
    //! reset the object‘s state
    void			setState(typeState state) { m_nState = state; }

    volatile typeState	m_nState;

    int			m_nResult;

    pthread_mutex_t						m_LockState;
    pthread_cond_t					m_ConditionState;
};

class YThreadBase
{
public:
    YThreadBase(void);
    virtual ~YThreadBase(void) {}

    enum	THREAD_PRIORITY
    {
        PRIORITY_HIGHEST		=	7,
        PRIORITY_ABOVE_NORMAL	=	1,
        PRIORITY_NORMAL			=	0,
        PRIORITY_BELOW_NORMAL	=	-1,
        PRIORITY_LOWEST			=	-7,
    };
protected:

    #define ThreadFuncReturnCode void*
    typedef void * (* ThreadFunc)(void * arg);
    typedef pthread_t		typeThreaID;

    typeThreaID	m_nPthreadID;
    static int	threadExit(int nValue);
    static int threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPrio,typeThreaID * pThreadID,bool boDetach);
     virtual int	run(void) = 0;
protected:
    int threadStart(ThreadFunc func,void * arg,int nStackSize,int nPriority, bool boDetach)
        { return threadStartImp(func,arg,nStackSize,nPriority,&m_nPthreadID, boDetach); }
    bool	threadJoin(void ** status = NULL);
    bool	threadDetach(void);
    void threadYield(void);
};

class YThread:public YThreadBase, public YThreadState
{
public:
    YThread(void):YThreadBase(), YThreadState()  {}

    virtual ~YThread(void) {}

    virtual void	start(void);
    void	startEx(int nStackSize = 64 * 1024,int nPriority = YThreadBase::PRIORITY_NORMAL);

    virtual void stop(void);
    virtual int	stopEx(void);
    virtual void	beforeStop(void) =0;
    unsigned int	getThreadID(void) const { return (unsigned int)(m_nPthreadID); }

protected:
    //bool			suspend(int nSec = INFINITE_VALUE,int nNSec = INFINITE_VALUE);
private:
    static ThreadFuncReturnCode	threadProxy(void * arg);
};

#endif // YTHREAD_H

线程基类源文件

#include "ythread.h"
//#include "ttmutex.h"

#include <assert.h>

#include <pthread.h>

const char * g_lpszThreadState[] = {
    "None, runnable object not build",	//TS_NONE,			线程还没有建立
    "Just Build the runnable object",	//TS_BUILD,			线程刚刚建立完
    "Waiting for task",					//TS_WAITTING,		线程正在等待任务
    "Running",							//TS_RUNNING,		线程正在执行
    "Will be end",						//TS_TOEND,			线程准备要结束
    "Closing the runnable object",		//TS_ENDING,		线程正在结束
    "Runnable object is Closed",			//TS_END,			线程已经结束
    ""									//TS_DEF_END
};

//-------------------------------------------------------------------

const char * YThreadState::getStateString(typeState nState)
{
    return g_lpszThreadState[static_cast<int>(nState)];
}

YThreadBase::YThreadBase(void) : m_nPthreadID(0) {}

int YThreadBase::threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPriority,typeThreaID * pThreadID,bool boDetach)
{
    pthread_attr_t attr;
    pthread_attr_init(&attr); // initialize attr with default attributes
    pthread_attr_setstacksize (&attr,nStackSize);

    if (nPriority != 0)
    {
        pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
    }
    int nErrno = pthread_create(pThreadID, &attr, func, arg);
    pthread_attr_destroy(&attr);
    if (nErrno == 0 && boDetach)
        pthread_detach( *pThreadID );
    return nErrno;
}

bool YThreadBase::threadDetach(void)
{
    return pthread_detach(m_nPthreadID) == 0;
}

void YThreadBase::threadYield(void)
{
    pthread_yield();
}

bool YThreadBase::threadJoin(void ** pStatus)
{
    return pthread_join(m_nPthreadID, pStatus) == 0;
}

int YThreadBase::threadExit(int nValue)
{
    pthread_exit(reinterpret_cast<void *>(nValue));
    return nValue;
}

//-------------------------------------------------------------------

void YThread::start(void)
{
    startEx();
}

void YThread::startEx(int nStackSize,int nPriority)
{
    TAutoLock _au(m_LockState);
    int res = YThreadBase::threadStart(threadProxy,this,nStackSize,nPriority,true);
    if (res<0)
       CERR("Start Thread Error");
    pthread_cond_wait(&m_ConditionState, &m_LockState);
}

int YThread::stopEx(void)
{
    TAutoLock _au(m_LockState);
    if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND)
        return m_nResult;
    m_nState = TS_TOEND;

    beforeStop();

    while (m_nState != TS_NONE)
        pthread_cond_wait(&m_ConditionState, &m_LockState);
    return m_nResult;
}

void YThread::stop(void)
{
    TAutoLock _au(m_LockState);
    if (m_nState == TS_NONE || m_nState == TS_ENDING || m_nState == TS_TOEND)
        return;
    m_nState = TS_TOEND;

//        beforeStop();
}

ThreadFuncReturnCode YThread::threadProxy(void * arg)
{
    YThread * pThread = reinterpret_cast<YThread *>(arg);

   pthread_mutex_lock(&pThread->m_LockState);
    pThread->m_nState = TS_BUILD;
    pthread_cond_signal(&pThread->m_ConditionState);
   pthread_mutex_unlock(&pThread->m_LockState);

    pThread->m_nResult= 0;

    try
    {
        pThread->m_nResult = pThread->run();
    }

    catch(std::exception & se)
    {
        CERR(se.what());
    }
    catch(...)
    {
        CERR("unkown exp in thread run");
    }

     TAutoLock _au(pThread->m_LockState);
     pThread->m_nState = TS_NONE;
     pthread_cond_signal(&pThread->m_ConditionState);
     threadExit(pThread->m_nResult);
     return 0;
}

线程池类头文件(包括了工作线程)

#ifndef YTHREADPOOL_H
#define YTHREADPOOL_H

#include "ythread.h"
#include <list>

struct MYData
{
    std::string strData;
    int  nNum;
};

class YMYThreadPool;
class YMYThread : public YThread
{
public:
    YMYThread(size_t nThreadNum, YMYThreadPool *pParent) :YThread(),m_nThreadNum(nThreadNum) ,m_pParent(pParent)
    {
        pthread_mutex_init(&m_lockForDataListAndCond, NULL);
         pthread_cond_init(&m_condForDataList,NULL);
    }

    ~YMYThread()
    {
        pthread_mutex_destroy(&m_lockForDataListAndCond);
        pthread_cond_destroy(&m_condForDataList);
    }
    static void initGobal(size_t nParentThreadCount, size_t nDataListLimits);

    virtual void	beforeStop(void) ;
    virtual int run(void);

    bool	addDataWithoutSigna(const MYData & mydata);
    bool	addData(const MYData & mydata);

protected:

    void setCurrentState(const std::string & strState)
    {
        m_tBeginTime = time(NULL);
        m_strCurrentState = strState;
    }

    virtual int handleDataList(void);
    void dealWithEachData(const MYData & itemdata);

    typedef std::list<MYData> typeDataList;
    typeDataList m_datalist;
    pthread_mutex_t					m_lockForDataListAndCond;
    pthread_cond_t				m_condForDataList;

    YMYThreadPool * m_pParent;
    size_t					m_nThreadNum;		//Thread number

    time_t m_tBeginTime;
    std::string m_strCurrentState;

    static size_t			g_nDataListSizeLimits;
    static size_t          g_nParentPoolThreadCount;
};

class YMYThreadPool
{
public:
    YMYThreadPool(void);
    ~YMYThreadPool(void);

    void			init(size_t nThreadCount,size_t nSunDataLimits);
    void			start(void);
    void			stop(void);
    YMYThread &	getThread(const std::string & strSid);

    bool pushMission(const MYData & data);

    bool				m_boToExit;

private:
    YMYThread	**	m_ppThreads;
    size_t				m_nThreadCount;
//    TMutexEvent 		m_AllReadyEvent;

    bool				m_boInit;

};

#endif // YTHREADPOOL_H

线程池类源文件(包括工作线程的具体实现)

#include "ythreadpool.h"

const int nLimitsDefault = 1000;
const int nThreadCountDefault = 1;
size_t YMYThread::g_nDataListSizeLimits = static_cast<size_t>(nLimitsDefault);
size_t YMYThread::g_nParentPoolThreadCount = static_cast<size_t>(nThreadCountDefault);

void YMYThread::initGobal(size_t nPoolThreadCount, size_t nDataListLimits)
{
    g_nParentPoolThreadCount = nPoolThreadCount;
    g_nDataListSizeLimits = nDataListLimits;
}

void YMYThread::beforeStop(void)
{
    TAutoLock au(m_lockForDataListAndCond);
    pthread_cond_signal(&m_condForDataList);
}

int YMYThread::run(void)
{
    while(!m_pParent->m_boToExit)
    {
        try
        {
            setCurrentState( "Wait for mission" );
            pthread_mutex_lock(&m_lockForDataListAndCond);
            if (m_datalist.empty())
                pthread_cond_wait(&m_condForDataList, &m_lockForDataListAndCond);
            pthread_mutex_unlock(&m_lockForDataListAndCond);

            if ( m_pParent->m_boToExit )
                break;

            setCurrentState( "Begin handle search missions" );
            handleDataList();
        }
        catch(...)
        {
        }
    }
    return int();
}
bool	YMYThread::addDataWithoutSigna(const MYData & data)
{
    if (m_datalist.size() >= g_nDataListSizeLimits)
        return false;
    m_datalist.push_back(data);
    return true;
}

bool	YMYThread::addData(const MYData & data)
{
    TAutoLock au(m_lockForDataListAndCond);
    if (!addDataWithoutSigna(data))
        return false;
    pthread_cond_signal(&m_condForDataList);
    return true;
}

void YMYThread::dealWithEachData(const MYData & data)
{
    std::cout <<"deal each data;"<<std::endl;
    std::cout<<"thread num:"<<m_nThreadNum<<", data str:"<<data.strData<<", data num:"<<data.nNum <<std::endl;

}

int YMYThread::handleDataList(void)
{
    if ( m_pParent->m_boToExit )
        return int();

    typeDataList		datalist;		//list of mission must to do
    pthread_mutex_lock(&m_lockForDataListAndCond);		//Condition for the mission
    datalist.swap( m_datalist );
    pthread_mutex_unlock(&m_lockForDataListAndCond);

    setCurrentState( "Handling mission." );

    typeDataList::iterator it = datalist.begin();
    while (it != datalist.end())
    {
        dealWithEachData(*it);
        it++;
    }
}
///////////////////////////////////////////////////////////////////////

inline size_t	getHashIndex(const std::string & strSID,size_t nThreadCount)
{
    std::hash<std::string> hash_fn;  //c++11的新特性
    size_t nHashValue = hash_fn(strSID);
    return nHashValue % nThreadCount;
}

YMYThreadPool::YMYThreadPool():m_ppThreads(NULL),m_nThreadCount(0),
    m_boInit(false),m_boToExit(false)
{
}

YMYThreadPool::~YMYThreadPool(void)
{
    stop();
}

void YMYThreadPool::start(void)
{
    if (!m_boInit)
        return;
    for (size_t i = 0; i < m_nThreadCount; ++i)
        if ( m_ppThreads[i] != NULL )
            m_ppThreads[i]->startEx( 2 * 1024 * 1024 );
}

void YMYThreadPool::stop(void)
{
    if (m_ppThreads == NULL)
        return;

    m_boToExit = true;

    for (size_t i = 0; i < m_nThreadCount; ++i)
    {
        if ( m_ppThreads[i] != NULL )
        {
            m_ppThreads[i]->stopEx();
            delete m_ppThreads[i];
        }
    }
    delete []m_ppThreads;
    m_ppThreads = NULL;
    m_nThreadCount = 0;
}
void YMYThreadPool::init(size_t nThreadCount,size_t nSunDataLimits)
{
    YMYThread::initGobal(nThreadCount, nSunDataLimits);

    m_nThreadCount = nThreadCount;
    m_ppThreads = new YMYThread *[nThreadCount];

    for (size_t i = 0 ; i < m_nThreadCount; ++i)
    {
        m_ppThreads[i] = NULL;
        m_ppThreads[i] = new YMYThread(i,this/*,&m_AllReadyEvent*/);
        if (m_ppThreads[i] == NULL)
            CERR( "Memory not enough for Sub-Thread" );
    }
    m_boInit = true;
}

bool YMYThreadPool::pushMission(const MYData & data)
{
    if (!m_boInit)
        return false;
    unsigned nIndex = getHashIndex(data.strData, m_nThreadCount);
    return m_ppThreads[nIndex]->addData(data);
}

YMYThread &	YMYThreadPool::getThread(const std::string & strSid)
{
    std::hash<std::string> hash_fn;

    size_t nHash = hash_fn(strSid);
    return *(m_ppThreads[ nHash % m_nThreadCount ]);
}

main函数,注意请用单例模式去创建线程池

#include "ythreadpool.h"
#include <unistd.h>
int main()
{
    YMYThreadPool p;
    p.init(6, 100);
    p.start();
    MYData da1, da2;
    da1.strData = "ds1";
    da1.nNum = 11;
    da2.strData = "ds2";
    da2.nNum = 22;

    sleep(2);
    p.pushMission(da1);
    p.pushMission(da2);

     sleep(2);

    p.stop();

    return 0;
}
时间: 2024-10-12 20:14:21

C++线程池2013年的博客迁移的相关文章

如何用 Windows Live Writer 和 Word 2013 分别发表博客到Cnblog 和CSDN

为什么会写这篇 最近写博客在 Cnblog 上面写博客, 发现图片不能复制了直接粘贴上,这对于把博客当随手笔记的人来说无疑非常痛苦.求助于博客园,他们让我用 Windows Live Writer 试试.我查了下大家推荐的除了 WLW 还有直接用 Word 2013 发布的.这么高端,我果然 low 了.于是尝试着根据大神们的教程实验了一把,果然好用的不是一点点,聪明好学的我还拓展到了 CSDN 那边,开始都是大家说的 505 错误,后来经过修正也成功发表了. 谦虚地强调下,这里不是原创,而是总

博客迁移-爱T-blog

从2012.8月后 博客迁移到http://blog.itiwin.cn  爱T-blog php技术博客 magento博客 版权声明:本文为博主原创文章,未经博主允许不得转载.

Hexo博客迁移到Coding

Coding是一个面向开发者的云端开发平台,目前提供代码托管,运行空间,质量控制,项目管理等功能. Coding提供社会化协作功能,包含了社交元素,为开发者提供技术讨论和协作平台. 一.创建项目 注:选择公开 点击创建之后 获取页面HTTPS或SSH地址 二.Clone项目到本地 $ git clone https://coding.net/itmyhome/blog.git blog 三.推送代码 如果已有hexo博客代码 放在blog目录下(.deploy .git除外),其他不变 修改根目

博客迁移至个人技术博客

个人博客地址 个人博客地址: http://mrljdx.com 迁移原因: 为由于维护个人博客和51cto博客比较麻烦,故将博客迁移至个人博客.今后新的博客内容仅在个人博客中更新.感谢51cto平台多年的陪伴! ~ Mrljdx

Word 2013 发布51CTO博客

复制个人博客的管理地址: http://489887.blog.51cto.com

博客迁移通知

博客已经迁移到“惊鸿哥的港湾(http://jhonge.net/)”以后最新的文章会在那里发出.至于博客园这边,会在个人博客发布后几天内通过“手工”的方式同步过来(具体延迟还要看具体情况).希望各位朋友能继续支持.谢谢~!

拥抱互联网写作方式之博客迁移至GitBook

转眼间在这里度过了5年时间,随着GitHub的深度使用和Markdown的写作方式的青睐,传统的博文写作方式已经不能满足需要了,其主要体现在这些方面: 1.格式太依赖编辑器 2.文章的修改,更新比较困难 3.文章数量较多时查找不方便 4.本地化编辑比较弱 5.协同编辑能力无 GitHub和GitBook的诞生基本解决了所有问题,于是想出一招将博客以写书的方式进行,这样既不需要单独的托管,也能满足本地管理.省时省力,专心为文. 野马红尘新博文地址:https://www.gitbook.com/b

博客迁移

迁移至个人域名博客:http://www.desgard.com/ 欢迎来访! 如想添加友链,联系我: E-mail: [email protected]

VMCloud见面礼&mdash;&mdash;VMCloud博客迁移全记录

微信号 VMCloud 大家好,为了避免在愚人节再公开本订阅号(其实为了避免被误会成是愚人节玩笑),所以选在今天,三月份的最后一天,VMCloud订阅号正式开张.如果说再说多的话也没办法去解释VMCloud订阅号的作用,那么就让我用实际行动来为各位了解关注本VMCloud订阅号的意义所在吧:P --------------------------------------------------------------------------------------------------- 正文