1 背景
一个后台实时处理的业务平台,通常我们会根据数据的输入与输出,依据时间轴进行分解成不同阶段或不同粒度的逻辑任务,而每一个待处理的数据我们称为任务或者消息。任务之间的关系可以分为两类:a 上下游父子关系,b 可以并行运行的兄弟关系。具有上下游关系的任务集合具有逻辑或数据依赖关系,即上游任务执行完后,才能执行下游任务;具有兄弟关系的任务间逻辑上互不影响,可以并行运行。
无论是上面任一情况的业务场景,我们需要一种管理类,其职责:管理着一堆线程及其待执行的同类型任务集合。在jdk里面有现成的管理类ThreadPoolExecutor,那么在c++里面看看类似的实现吧:
2 任务与任务池
2.1任务
无论是消息或业务数据,可以抽象地表达为:
struct data_pair { char *data; int len; }
2.2 任务池
任务的缓存用队列表达:
std::queue<data_pair*> _queue;
2.3 任务提交入口
int CQueueThread::writeData(void *data, int len) { if (data == NULL || len <= 0) { return EXIT_FAILURE; } data_pair *item = new data_pair(); item->data = (char*) malloc(len); assert(item->data != NULL); memcpy(item->data, data, len); item->len = len; _mutex.lock(); _queue.push(item); _mutex.signal(); _mutex.unlock(); return EXIT_SUCCESS; }
3线程池
3.1 线程封装
c++里面类似jdk里面Thread类的封装CThread
{ class CThread { public: /** * 构造函数 */ CThread() { tid = 0; pid = 0; } /** * 起一个线程,开始运行 */ bool start(Runnable *r, void *a) { runnable = r; args = a; return 0 == pthread_create(&tid, NULL, CThread::hook, this); } /** * 等待线程退出 */ void join() { if (tid) { pthread_join(tid, NULL); tid = 0; pid = 0; } } /** * 得到Runnable对象 * * @return Runnable */ Runnable *getRunnable() { return runnable; } /** * 得到回调参数 * * @return args */ void *getArgs() { return args; } /*** * 得到线程的进程ID */ int getpid() { return pid; } /** * 线程的回调函数 * */ static void *hook(void *arg) { CThread *thread = (CThread*) arg; thread->pid = gettid(); if (thread->getRunnable()) { thread->getRunnable()->run(thread, thread->getArgs()); } return (void*) NULL; } private: /** * 得到tid号 */ #ifdef _syscall0 static _syscall0(pid_t,gettid) #else static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));} #endif private: pthread_t tid; // pthread_self() id int pid; // 线程的进程ID Runnable *runnable; void *args; }; }
3.2 线程池
并行处理的能力有线程池的个数决定,定义如下:
CThread *_thread; int _threadCount; |
4 执行器
4.1 执行启动
int CDefaultRunnable::start() { if (_thread != NULL || _threadCount < 1) { TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount); return 0; } _thread = new CThread[_threadCount]; if (NULL == _thread) { TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount); return 0; } int i = 0; for (; i<_threadCount; i++) { if (!_thread[i].start(this, (void*)((long)i))) { return i; } } return i; }
4.2 执行
执行器包含了具体业务的执行:
void CQueueThread::run(CThread *thread, void *args) { int threadIndex = (int)((long)(args)); _mutex.lock(); while(!_stop) { while(_stop == 0 && _queue.empty()) { _mutex.wait(); } if (_stop) { break; } data_pair *item = _queue.front(); _queue.pop(); _mutex.unlock(); if (item != NULL) { if (_handler) { _handler->handleQueue(item->data, item->len, threadIndex, _args); } if (item->data) { free(item->data); } free(item); } _mutex.lock(); } _mutex.unlock();
5 样例代码
CMyHandler handler; CQueueThread queueThread(3, &handler, NULL); queueThread.start(); char data[1024]; for(int i=1; i<=mWriteCount; i++) { int len = sprintf(data, "data_%05d", i); queueThread.writeData(data, len+1); } queueThread.wait();
参考:
http://code.taobao.org/p/tfs/src/
时间: 2024-10-11 05:34:15