三、流水线结构线程池设计
为了无阻塞地实现并发通信及处理,传统的小规模服务器采用每用户一线程的多线程技术,称为“任务伴随者”模式。该模式示意图如下:
然而,当客户端很多时,开启上百组线程,远远超过计算机的物理线程规模,导致大量计算资源浪费在线程上下文切换和环境恢复等维护工作中,有效计算能力显著降低。
在多线程并行计算技术中,能够有效利用CPU物理核心,避免上下文频繁切换的经典模式是线程池模式。系统仅开启与CPU核心数相等的工作线程,形成线程池(ThreadPool)。各个任务在队列中排队,按照先进先出次序(FIFO)送入线程池中处理。该模式的示意图如下:
该模式尽管避免了任务量较大时,实际计算能力降低的问题,但单位时间内仅能处理一定数量的任务,存在阻塞的可能。万一现在处理的几个任务均很耗时,则其他客户的简单任务也会被耽误。
在非阻塞的情况下利用线程池模式有效处理大量用户数据,要靠基于线程池的流水线技术,实现最优线程配置条件下低阻塞处理。
该模式的关键是对每个客户的处理任务进行细化,比如每K个指令为一个粒度单位,无论该客户的指令队列缓存了多少指令,一次仅处理K个,随后让出计算资源分配给其他任务使用。示意图如下:
采用该方法,流水线结构保证了各个客户工作在大粒度上并行化,线程池技术保证了处理器资源的最大利用,可以显著提高系统的吞吐能力。另一个附加好处,是可以让VIP获得高级优先级。
3.1 模块结构
在范例代码中,本模块的所有文件位于 pipeline 文件夹下。
命名空间:ZPTaskEngine
主要有三个类组成。
1、zp_pipeline类
该类是流水线线程池的接口类。其管理了各个执行者线程,以及任务队列。执行者线程存储在本类的成员变量中。
//working threads QVector<zp_plWorkingThread *> m_vec_workingThreads; QVector<QThread *> m_vec_InternalworkingThreads;
通过方法addThreads可以控制线程池的规模。
2、zp_plWorkingThread类
这个类是用于执行任务的线程对象。在其被创建时,绑定在一个QThread的线程事件循环中运行。创建的方法位于 zp_pipeline::addThreads中,
/** * @brief Add nThreads to the thread pool * * @fn zp_pipeline::addThreads * @param nThreads how many threads you want to add. * @return int current threads count after add. */ int zp_pipeline::addThreads(int nThreads) { if (nThreads>=1 && nThreads <=128) { for (int i=0;i<nThreads;i++) { zp_plWorkingThread * thread = new zp_plWorkingThread(this); m_vec_workingThreads.push_back(thread); QThread * pTh = new QThread(this); m_vec_InternalworkingThreads.push_back(pTh); thread->moveToThread(pTh); connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask,Qt::QueuedConnection); connect (this,&zp_pipeline::evt_stop_work,thread,&zp_plWorkingThread::setStopMark,Qt::QueuedConnection); connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task,Qt::QueuedConnection); pTh->start(); m_mutex_protect.lock(); m_nExistingThreads++; m_mutex_protect.unlock(); } } return m_vec_workingThreads.size(); }
3、zp_plTaskbase类
本类是一个纯虚基类,用于给应用者重载具体的执行任务。该类的核心方法是 run(),用于在线程池的某个线程中运行。
3.2 工作原理
1、 当外部需要执行任务时,调用 zp_pipeline::pushTask方法,向任务队列m_list_tasks中传入zp_plTaskbase类型的指针。一旦队列中被插入了新任务,会立刻判断是否有空闲的线程可以执行这个任务。如果有,立刻触发执行。核心代码:
void zp_pipeline::pushTask(zp_plTaskBase * task,bool bFire ) { m_mutex_protect.lock(); m_list_tasks.push_back(task); task->addRef(); m_mutex_protect.unlock(); int nsz = m_vec_workingThreads.size(); if (bFire==true) for (int i=0;i<nsz;i++ ) { if (m_vec_workingThreads[i]->m_bBusy==false) { on_finished_task (m_vec_workingThreads[i]); break; } } }
2、 zp_pipeline:: on_finished_task
槽既是任务的起始,也是任务的结束。当某个zp_plWorkingThread对象执行完了一次任务,便会触发本方法。在本方法中,zp_pipeline对象检查自己的队列,看看是否还有任务需要执行。如果有,则读入一个任务继续执行
void zp_pipeline::on_finished_task (zp_plWorkingThread * task) { int res = 0; m_mutex_protect.lock(); res = m_list_tasks.size(); m_mutex_protect.unlock(); if (res) emit evt_start_work(task ); }
触发执行任务是使用事件 evt_start_work 触发的,这个信号发给 task线程,使得它在自己的槽函数中获取新的任务。
/** * @brief Call zp_plTaskBase::popTask to fetch new tasks. * * @fn zp_plWorkingThread::FetchNewTask * @param obj the zp_plWorkingThread object recieved by signal-slot system. * this method will omit zp_plWorkingThread objs except for it self. */ void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj) { if (obj != this) return; if (m_bRuning) { bool bValid = false; zp_plTaskBase * ptr = this->m_pipeline->popTask(&bValid); if (bValid==true && ptr!=NULL) { m_bBusy = true; int res = ptr->run(); ptr->delRef(); m_bBusy = false; if (res!=0 ) this->m_pipeline->pushTask(ptr,false); } emit taskFinished(this); } }
在FetchNewTask槽中,会调用 zp_pipeline::popTask方法,弹出一个任务。一旦任务弹出,则会调用虚函数 run() 运行。run()的返回结果为0, 表示本任务彻底完成了,不再塞入队列的尾部;如果返回值非0, 说明任务还没有完成,只是执行了一部分,任务自己就释放了资源,防止阻塞整个流程。在这个情况下,任务被重新 push回队列。
3.3后续预告
下一章,将介绍数据库的简单封装。
一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)