http线程池的设计与实现(c++)

http线程池的主要用途是异步处理使用无状态短连接的http请求,在传输层通信基于tcp协议和应用层基于http协议的基础上,达到c++服务器与web服务器通信的目的。

设计上:

(1)服务器启动时,初始化配置数量的线程(形成被动连接线程池)。每个线程会生成epoll描述符。

(2)主线程生成监听socket,绑定端口。生成epoll描述符,注册监听socket,非阻塞接收(限定最大时间,如2s)新连接到连接队列。

(2)投放主线程连接队列中的新连接到被动连接线程池。根据硬哈希选择需求的线程来投放。加入后需要注册连接socket(注册时连接对象作为epoll事件的携带数据)到线程的epoll描述符。

(3)在每个线程的例程里会非阻塞监听epoll描述符上发生的读事件,并解析和处理获取的http请求。

这样每个业务线程可以相对独立的处理无状态的http请求。跟单业务线程的场景不同的是,http线程池的线程之间尽量减少数据共享(实在需要缓存在内存则加锁),每个线程又可以作为客户端短时间阻塞向其他服务器请求数据。

http线程池代码如下:(大致上http线程池的思路可以看得出来。主线程接收连接对象和连接对象接收数据并没有在这里展现实现过程。注意接收时需要忽略EINTR和SIGPIPE信号,如果接收返回-1且错误号为EAGAIN或EWOULDBLOCK,说明接收缓冲区满了,需要继续尝试接收,直到超时。接收返回0则表示对方断开连接,则接收失败。接收成功、失败、超时都需要移除连接对象(epoll描述符注销连接socket、关闭socket、移出和销毁连接对象),因为是短连接)

线程池头文件

/**
 * \brief 定义实现轻量级(lightweight)的http服务框架类
 */
class zHttpTaskPool : private zNoncopyable
{
public:
	/**
	 * \brief 构造函数
	 */
	zHttpTaskPool()
	{
	}
	/**
	 * \brief 析构函数,销毁一个线程池对象
	 *
	 */
	~zHttpTaskPool()
	{
		final();
	}
	bool addHttp(zHttpTask *task);
	bool init();
	void final();
private:
	static const int maxHttpThreads = 16;					/**< 最大验证线程数量 */
	zThreadGroup httpThreads;						/**< http服务处理线程组 */
};

线程池源文件

/**
* \brief 轻量级http服务的主处理线程
*/
class zHttpThread : public zThread
{
private:
	/**
	 * \brief http连接任务链表类型
	 */
	typedef std::list<zHttpTask * > zHttpTaskContainer;
	/**
	 * \brief epoll事件结构向量类型
	 */
	typedef std::vector<struct epoll_event> epollfdContainer;
	zHttpTaskPool *pool;		/**< 所属的池 */
	zRTime currentTime;			/**< 当前时间 */
	zMutex mutex;				/**< 互斥变量 */
	zHttpTaskContainer tasks;	/**< 任务列表 */
	int kdpfd;
	epollfdContainer epfds;
	epollfdContainer::size_type fds_count;
public:
	/**
	 * \brief 构造函数
	 * \param pool 所属的连接池
	 * \param name 线程名称
	 */
	zHttpThread(
			zHttpTaskPool *pool,
			const std::string &name = std::string("zHttpThread"))
		: zThread(name), pool(pool), currentTime()
		{
			kdpfd = epoll_create(256);
			assert(-1 != kdpfd);
			epfds.resize(256);
			fds_count = 0;
		}
	/**
	 * \brief 析构函数
	 */
	~zHttpThread()
	{
		TEMP_FAILURE_RETRY(::close(kdpfd));
	}

	void run();

	/**
	 * \brief 添加一个连接任务
	 * \param task 连接任务
	 */
	void add(zHttpTask *task)
	{
		mutex.lock();
		task->addEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI, (void *)task);
		tasks.push_back(task);
		++fds_count;
		if (fds_count > epfds.size())
		{
			epfds.resize(fds_count + 16);
		}
		mutex.unlock();
	}

	typedef zHttpTask* zHttpTaskP;
	void remove(zHttpTaskP &task)
	{
		task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
		tasks.remove(task);
		SAFE_DELETE(task);
		fds_count--;
	}
	void remove(zHttpTaskContainer::iterator &it)
	{
		zHttpTask *task = *it;
		task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI);
		tasks.erase(it);
		SAFE_DELETE(task);
		fds_count--;
	}

};

/**
* \brief http线程例程
*/
void zHttpThread::run()
{
	zHttpTaskContainer::iterator it, next;
	while(!isFinal())
	{
		mutex.lock();
		if (!tasks.empty())
		{
			int retcode = epoll_wait(kdpfd, &epfds[0], fds_count, 0);
			if (retcode > 0)
			{
				for(int i = 0; i < retcode; ++i)
				{
					zHttpTask *task = (zHttpTask *)epfds[i].data.ptr;//获取epoll事件的数据,即连接对象
					if (epfds[i].events & (EPOLLERR | EPOLLPRI))//检查epoll事件是否有错
					{
						//套接口出现错误
						remove(task);
					}
					else if (epfds[i].events & EPOLLIN)//检查epoll事件是否是读事件
					{
						switch(task->httpCore())//阻塞recv连接对象缓冲区数据
						{
							case 1:		//接收成功
							case -1:	//接收失败
								remove(task);
								break;
							case 0:		//接收超时,
								break;
						}
					}
				}
			}

			currentTime.now();
			for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)//检查短连接的连接对象,移除超时的连接对象
			{
				zHttpTask *task = *it;
				if (task->checkHttpTimeout(currentTime))
				{
					//超过指定时间验证还没有通过,需要回收连接
					remove(it);
				}
			}
		}
		mutex.unlock();

		zThread::msleep(50);
	}

	//把所有等待验证队列中的连接加入到回收队列中,回收这些连接
	for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)
	{
		remove(it);
	}
}

/**
* \brief 把一个TCP连接添加到验证队列中,因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中
* \param task 一个连接任务
*/
bool zHttpTaskPool::addHttp(zHttpTask *task)
{
	//因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中
	static unsigned int hashcode = 0;
	zHttpThread *pHttpThread = (zHttpThread *)httpThreads.getByIndex(hashcode++ % maxHttpThreads);
	if (pHttpThread)
		pHttpThread->add(task);
	return true;
}

/**
* \brief 初始化线程池,预先创建各种线程
* \return 初始化是否成功
*/
bool zHttpTaskPool::init()
{
	//创建初始化验证线程
	for(int i = 0; i < maxHttpThreads; ++i)
	{
		std::ostringstream name;
		name << "zHttpThread[" << i << "]";
		zHttpThread *pHttpThread = new zHttpThread(this, name.str());
		if (NULL == pHttpThread)
			return false;
		if (!pHttpThread->start())
			return false;
		httpThreads.add(pHttpThread);
	}
	return true;
}

/**
* \brief 释放线程池,释放各种资源,等待各种线程退出
*/
void zHttpTaskPool::final()
{
	httpThreads.joinAll();
}
时间: 2024-10-27 07:47:41

http线程池的设计与实现(c++)的相关文章

C++ 线程池的设计问题

1. 给用户添加任务的接口是 schedule(arg), arg应该如何设置 a) 创建 Work class, 将arg设置为 Work*, Work由用户创建,用户删除,线程池内仅保留对Work对象的引用 问题:我写完thread_pool后才发现,用户不再知道如何去删除他创建的 Work了,因为线程池内保存着指向Work的指针,线程池是自动析构的,因此,这种设计方法不可行 b) 设计方法和 a 一致,唯一的不同是线程池内复制了Work, 在类内,直接使用Work对象,不再使用指针了 问题

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

两种unix网络编程线程池的设计方法

unp27章节中的27.12中,我们的子线程是通过操作共享任务缓冲区,得到task的,也就是通过线程间共享的clifd[]数组,这个数组其实就是我们的任务数组,得到其中的connfd资源. 我们对这个任务数组的操作,需要互斥量+条件变量达到同步的目的..每个线程是无规律的从clifd得到任务,然后执行的.任务和线程之间没有对应关系.线程完成本次任务之后,如果任务数组中任然有任务,则再次运行下一个任务. 而另外的一个线程池模型中,pthread_create (&temp[i].tid, NULL

线程池的设计

http://blog.csdn.net/liangjingbo/article/details/2777112 目前线程池主要包含下列部分: 1.        busy_list 当前在处理客户请求的线程队列,即正在进行智能网页分析的线程 2.        idle_list 当前线程池中空闲的线程队列 3.        request_list 当用户请求到达后,发现线程池中已经没有可用线程,同时线程的个数已经达到了最大限制数(可设定的),此时只能将此次请求放入请求队列中,等待管理线程

线程池的设计思想

单例模式之懒汉式 饿汉式在多线程的情况小可能会出现多实例的情况 解决1:加锁 解决2:双层判断方式,减少枷锁的次数doubleCheck 3.保证程序的有序性,禁止程序内部优化,从而保证不会出现空指针异常 4.利用jvm只产生一份static,并且是主动加载,实现单例懒汉式 使用枚举模式 原文地址:https://www.cnblogs.com/zj-xu/p/11863527.html

使用线程池优化多线程编程

Java中的对象是使用new操作符创建的,如果创建大量短生命周期的对象,这种方式性能非常低下.为了解决这个问题,而发明了池技术. 对于数据库连接有连接池,对于线程则有线程池. 本实例介绍两种方式创建1000个短生命周期的线程,第一种是普通方式,第二种是线程池的方式.通过时间和内存消耗的对比,就可以很明显地看出线程池的优势. 实例结果如下: 说明:使用线程池创建对象的时间是15毫秒,说明线程池是非常高效的. 关键技术: Executors类为java.util.concurrent包中所定义的Ex

Java并发编程系列之二十五:线程池

线程池简介 在之前介绍Executor框架的文章中对线程池有一个初步的认识,实际上线程池这种设计思想在Java中很普遍,比如JVM中常量池,以及Web开发使用到的数据库连接池.这些池本质上还是Java中的对象池,因为池中存放的都是Java对象.回到线程池,几乎所有需要异步或者执行并发任务的程序都可以使用到线程池.使用线程池带来的好处主要包括以下几个方面: 一,提高资源利用率.由于线程池中的线程使可以重复利用的,所以达到了循环利用的目的 二,提高响应速度.由于线程的创建也是需要开销的,如果请求到来

线程池的原理和实现

一. 线程池的简介 通常我们使用多线程的方式是,需要时创建一个新的线程,在这个线程里执行特定的任务,然后在任务完成后退出.这在一般的应用里已经能够满足我们应用的需求,毕竟我们并不是什么时候都需要创建大量的线程,并在它们执行一个简单的任务后销毁. 但是在一些web.email.database等应用里,比如彩铃,我们的应用在任何时候都要准备应对数目巨大的连接请求,同时,这些请求所要完成的任务却又可能非常的简单,即只占用很少的处理时间.这时,我们的应用有可能处于不停的创建线程并销毁线程的状态.虽说比

python(13)线程池:threading

先上代码: 1 pool = threadpool.ThreadPool(10) #建立线程池,控制线程数量为10 2 reqs = threadpool.makeRequests(get_title, data, print_result) #构建请求,get_title为要运行的函数,data为要多线程执行函数的参数 3 #最后这个print_result是可选的,是对前两个函数运行结果的操作 4 [pool.putRequest(req) for req in reqs] #多线程一块执行