一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)

三、流水线结构线程池设计

为了无阻塞地实现并发通信及处理,传统的小规模服务器采用每用户一线程的多线程技术,称为“任务伴随者”模式。该模式示意图如下:

然而,当客户端很多时,开启上百组线程,远远超过计算机的物理线程规模,导致大量计算资源浪费在线程上下文切换和环境恢复等维护工作中,有效计算能力显著降低。

在多线程并行计算技术中,能够有效利用CPU物理核心,避免上下文频繁切换的经典模式是线程池模式。系统仅开启与CPU核心数相等的工作线程,形成线程池(ThreadPool)。各个任务在队列中排队,按照先进先出次序(FIFO)送入线程池中处理。该模式的示意图如下:

该模式尽管避免了任务量较大时,实际计算能力降低的问题,但单位时间内仅能处理一定数量的任务,存在阻塞的可能。万一现在处理的几个任务均很耗时,则其他客户的简单任务也会被耽误。

在非阻塞的情况下利用线程池模式有效处理大量用户数据,要靠基于线程池的流水线技术,实现最优线程配置条件下低阻塞处理。

该模式的关键是对每个客户的处理任务进行细化,比如每K个指令为一个粒度单位,无论该客户的指令队列缓存了多少指令,一次仅处理K个,随后让出计算资源分配给其他任务使用。示意图如下:

采用该方法,流水线结构保证了各个客户工作在大粒度上并行化,线程池技术保证了处理器资源的最大利用,可以显著提高系统的吞吐能力。另一个附加好处,是可以让VIP获得高级优先级。

3.1 模块结构

在范例代码中,本模块的所有文件位于 pipeline 文件夹下。

命名空间:ZPTaskEngine

主要有三个类组成。

1、zp_pipeline类

该类是流水线线程池的接口类。其管理了各个执行者线程,以及任务队列。执行者线程存储在本类的成员变量中。

		//working threads
		QVector<zp_plWorkingThread *> m_vec_workingThreads;
		QVector<QThread *> m_vec_InternalworkingThreads;

通过方法addThreads可以控制线程池的规模。

2、zp_plWorkingThread类

这个类是用于执行任务的线程对象。在其被创建时,绑定在一个QThread的线程事件循环中运行。创建的方法位于 zp_pipeline::addThreads中,

	/**
	 * @brief Add nThreads to the thread pool
	 *
	 * @fn zp_pipeline::addThreads
	 * @param nThreads how many threads you want to add.
	 * @return int current threads count after add.
	 */
	int zp_pipeline::addThreads(int nThreads)
	{
		if (nThreads>=1 && nThreads <=128)
		{
			for (int i=0;i<nThreads;i++)
			{
				zp_plWorkingThread * thread = new zp_plWorkingThread(this);
				m_vec_workingThreads.push_back(thread);
				QThread * pTh = new QThread(this);
				m_vec_InternalworkingThreads.push_back(pTh);
				thread->moveToThread(pTh);
				connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask,Qt::QueuedConnection);
				connect (this,&zp_pipeline::evt_stop_work,thread,&zp_plWorkingThread::setStopMark,Qt::QueuedConnection);
				connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task,Qt::QueuedConnection);
				pTh->start();
				m_mutex_protect.lock();
				m_nExistingThreads++;
				m_mutex_protect.unlock();

			}

		}
		return m_vec_workingThreads.size();
	}

3、zp_plTaskbase类

本类是一个纯虚基类,用于给应用者重载具体的执行任务。该类的核心方法是 run(),用于在线程池的某个线程中运行。

3.2 工作原理

1、  当外部需要执行任务时,调用 zp_pipeline::pushTask方法,向任务队列m_list_tasks中传入zp_plTaskbase类型的指针。一旦队列中被插入了新任务,会立刻判断是否有空闲的线程可以执行这个任务。如果有,立刻触发执行。核心代码:

	void zp_pipeline::pushTask(zp_plTaskBase * task,bool bFire )
	{
		m_mutex_protect.lock();
		m_list_tasks.push_back(task);
		task->addRef();
		m_mutex_protect.unlock();

		int nsz =  m_vec_workingThreads.size();
		if (bFire==true)
			for (int i=0;i<nsz;i++ )
			{
				if (m_vec_workingThreads[i]->m_bBusy==false)
				{
					on_finished_task (m_vec_workingThreads[i]);
					break;
				}
			}

	}

2、 zp_pipeline:: on_finished_task
槽既是任务的起始,也是任务的结束。当某个zp_plWorkingThread对象执行完了一次任务,便会触发本方法。在本方法中,zp_pipeline对象检查自己的队列,看看是否还有任务需要执行。如果有,则读入一个任务继续执行

	void  zp_pipeline::on_finished_task (zp_plWorkingThread * task)
	{
		int res = 0;
		m_mutex_protect.lock();
		res = m_list_tasks.size();
		m_mutex_protect.unlock();
		if (res)
			emit evt_start_work(task );
	}

触发执行任务是使用事件 evt_start_work 触发的,这个信号发给 task线程,使得它在自己的槽函数中获取新的任务。

	/**
	 * @brief Call zp_plTaskBase::popTask to fetch new tasks.
	 *
	 * @fn zp_plWorkingThread::FetchNewTask
	 * @param obj the zp_plWorkingThread object recieved by signal-slot system.
	 * this method will omit zp_plWorkingThread objs except for it self.
	 */
	void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj)
	{

		if (obj != this)
			return;
		if (m_bRuning)
		{

			bool bValid = false;
			zp_plTaskBase * ptr = this->m_pipeline->popTask(&bValid);

			if (bValid==true && ptr!=NULL)
			{
				m_bBusy = true;
				int res = ptr->run();
				ptr->delRef();
				m_bBusy = false;
				if (res!=0 )
					this->m_pipeline->pushTask(ptr,false);
			}

			emit taskFinished(this);

		}

	}

在FetchNewTask槽中,会调用 zp_pipeline::popTask方法,弹出一个任务。一旦任务弹出,则会调用虚函数  run() 运行。run()的返回结果为0, 表示本任务彻底完成了,不再塞入队列的尾部;如果返回值非0, 说明任务还没有完成,只是执行了一部分,任务自己就释放了资源,防止阻塞整个流程。在这个情况下,任务被重新 push回队列。

3.3后续预告

下一章,将介绍数据库的简单封装。

一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)

时间: 2024-10-08 05:42:18

一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)的相关文章

一种基于Qt的可伸缩的全异步C/S架构服务器实现(六) 整合各个模块实现功能

在前面的章节中,介绍了网络传输.任务线程池.数据库和集群四个主要功能模块.到现在为止,这些模块都还只是一种资源,没有产生实际的运行效果.对一个具备真实功能的应用来说,需要有一个整合的过程.整合方法很多,这里以典型的客户 -客户通信来举例说明. (一)类结构 1."客户端" 这个概念被抽象为一个节点类st_clientNode,每个客户端连接对应了该类的一个实例.这个类不但存储了有关该连接的所有背景信息(比如聊天程序中的用户名等),还提供了正确解释数据流的代码实现.由于想分开传输层和应用

一种基于Qt的可伸缩的全异步C/S架构服务器实现(流浪小狗,六篇,附下载地址)

本文向大家介绍一种基于Qt的伸缩TCP服务实现.该实现针对C/S客户端-服务集群应用需求而搭建.连接监听.数据传输.数据处理均在独立的线程池中进行,根据特定任务不同,可安排负责监听.传输.处理的线程数目,从而在高传输负荷.高计算符合上达成取舍.数据处理采用流水线结构,以避免少量客户的密集计算请求影响其他客户端的处理.本文对应的代码符合LGPL协议,可直接从https://github.com/goldenhawking/zpserver下载. 也可从http://download.csdn.ne

一种基于Qt的可伸缩的全异步C/S架构服务器实现(一) 综述

本文向大家介绍一种基于Qt的伸缩TCP服务实现.该实现针对C/S客户端-服务集群应用需求而搭建.连接监听.数据传输.数据处理均在独立的线程池中进行,根据特定任务不同,可安排负责监听.传输.处理的线程数目,从而在高传输负荷.高计算符合上达成取舍.数据处理采用流水线结构,以避免少量客户的密集计算请求影响其他客户端的处理.本文对应的代码符合LGPL协议,可直接从https://github.com/goldenhawking/zpserver下载. 也可从http://download.csdn.ne

一种基于Qt的可伸缩的全异步C/S架构server实现(一) 综述

本文向大家介绍一种基于Qt的伸缩TCP服务实现.该实现针对C/Sclient-服务集群应用需求而搭建. 连接监听.传输数据.数据处理均在独立的线程池中进行,依据特定任务不同,可安排负责监听.传输.处理的线程数目,从而在高传输负荷.高计算符合上达成取舍.数据处理採用流水线结构.以避免少量客户的密集计算请求影响其它client的处理. 本文相应的代码符合LGPL协议,可直接从https://github.com/goldenhawking/zpserver下载. 也可从http://download

一种基于Qt的可伸缩的全异步C/S架构server实现(二) 网络传输

二.网络传输模块 模块相应代码命名空间    (namespace ZPNetwork) 模块相应代码存储目录    (\ZoomPipeline_FuncSvr\network) 2.1 模块结构 网络传输模块负责管理监听器,并依据各个传输线程眼下的负荷,把新申请接入的客户套接字描写叙述符引导到最空暇的传输线程中运行"接受连接(Accept)"操作.该模块由例如以下几个类组成. 1.zp_net_Engine类,派生自Qobject.模块的外部接口类.同一时候也是功能管理者.提供了设

一种可伸缩的全异步C/S架构服务器实现(二)

二.网络传输模块 模块对应代码命名空间    (namespace ZPNetwork) 模块对应代码存储文件夹    (\ZoomPipeline_FuncSvr\network) 2.1 模块结构 网络传输模块负责管理监听器,并根据各个传输线程目前的负荷,把新申请接入的客户套接字描述符引导到最空闲的传输线程中执行"接受连接(Accept)"操作.该模块由如下几个类组成. 1.zp_net_Engine类,派生自Qobject.模块的外部接口类,同时也是功能管理者.提供了设置监听器.

基于Qt有限状态机人工智能的一种实现及改进方法

基于Qt有限状态机人工智能的一种实现及改进方法 人工智能在今年是一个非常火的方向,当然了,不仅仅是今年,它一直火了很多年,有关人工智能的一些算法层出不穷.人工智能在很多领域都有应用,就拿我熟悉的游戏领域来说吧,一些寻路算法,比如说A*算法(我的<十日驱鬼记>就曾经使用了A*算法进行寻路),还有一些高级的算法,比如说决策树等,都在游戏中得以了广泛的应用.我目前想制作的项目和人工智能也有一定的关系,因此,我这个月开始学习搭建一些简单的人工智能框架. 蒋彩阳原创文章,首发地址:http://blog

基于Qt有限状态机的一种实现方式和完善的人工智能方法

基于Qt有限状态机的一种实现方式和完善的人工智能方法 人工智能在今年是一个非常火的方向,当然了.不不过今年,它一直火了非常多年,有关人工智能的一些算法层出不穷.人工智能在非常多领域都有应用,就拿我熟悉的游戏领域来说吧,一些寻路算法,比方说A*算法(我的<十日驱鬼记>就以前使用了A*算法进行寻路).另一些高级的算法,比方说决策树等.都在游戏中得以了广泛的应用.我眼下想制作的项目和人工智能也有一定的关系,因此.我这个月開始学习搭建一些简单的人工智能框架. 蒋彩阳原创文章,首发地址:http://b

【Qt编程】基于Qt的词典开发系列--后序

从去年八月份到现在,总算完成了词典的编写以及相关技术文档的编辑工作.从整个过程来说,文档的编写比程序的实现耗费的时间更多.基于Qt的词典开发系列文章,大致包含了在编写词典软件过程中遇到的技术重点与难点.每篇文章都完成了一个小的功能,所给的代码都基本上是可以独立运行的.本系列文章对于想要自己动手完成词典软件的程序员来说具有很好的参考价值,对于想要编写其它软件的人来说也具有参考意义. 词典软件制作的初衷 在2013的年终总结中,我提过想要学习一门界面编程语言,后来就选中了Qt.于是在2014年上半年