libevent多线程使用事项

转 http://www.cnblogs.com/Seapeak/archive/2010/04/08/1707807.html

在linux平台上使用c开发网络程序的同志们一般情况下都对鼎鼎大名的libevent非常的熟悉了。但是一些新进入此领域的new new people们对此都是一头雾水。原本的迷茫再加上开源软件一贯的“帮助文件”缺失作风,让我们这些新手们显的非常的无助。幸好有一些热心的朋友们帮忙,才化险为夷啊!

前几天一直在开发一个locker server,虽然公司现有的locker server能很好的运转,但是毕竟是net的,通用性不广,当我们要在linux上开发多集群系统的时候现有的locker server就未免显得有点捉襟见肘了。正是在开发locker server的过程中使用到了libevent。

总体上,libevent是很好用的。一两个函数就能搞定你复杂的网络通讯工作了。当然了,这句话得用在你使用的是“单线程”的情况下。虽然在linux系统中,进程的资源和window系统中进程的资源相比轻量级很多,代价也相当的没有那么昂贵,所以很多的软件都是使用“多进程”方式实现的,比如大名鼎鼎的apache。但是在我们的系统中,我们使用了“单进程多线程”的方式,这样,我们就能在单机上启动多个进程,以达到“伪分布式”的效果来达到测试的目的。

那么这个时候就要注意libevent的使用了,因为对于event_base来说,不是线程安全的。也就是说多线程不能share同一个event_base,就算是加锁操作也不行。那么这个时候就只能采取“单线程单event_base”的策略了。我的做法是做一个task pool(任务对象池),每个任务会被一个thread执行,当然了,thread肯定也是从thread pool拿出来的,而在task pool初始化的时候,我就给每个task中的event_base初始化了对象,这样,万事大吉了。

这个地方注意了以后,就开始说网络通讯了。在使用libevent的时候,触发事件是在接收到网络连接(或者timeout事件超时)的时候。所以你需要在事件处理函数中判断时间源,其次libevent接收网络通讯的字节流时是使用了libevnet中自带的缓冲的,所以当你接收的时候一定要注意累加,并且多次loop或者注册 event_event中的事件。所以在我的task中,会有接收的data。当然了如果你的协议是分为header和body的,通常header比较短,body比较长,而且在client,header和body通常是连续发送的,这样,在使用libevent的时候,header和body是同时被接收到的,这点一定要注意,所以提醒你在接收数据的函数中,需要区分接收header部分还是body部分;当body非常长,超过libevent的缓冲时,是需要多次多次触发接收函数的,这点也要注意,就是让你需要在接收的时候除了区分header和body以外,还要注意一次接收不完全的情况下,对于数据需要累加。

当你在使用libevent时,event_set事件时,只要不是使用EV_PERSIST注册的事件是不需要在接收完一次数据后多次event_add的,只有当你不使用EV_PERSIST时,你的事件才需要多次event_add到event_base中;当然了,使用了EV_PERSIST注册的函数在event_base被task pool回收时是要显式的event_del该注册事件的,没有使用EV_PERSIST注册的事件是不需要显式的使用event_del删除该事件的。

static void  read_buffer(int client_socket_fd,short event_type,void *arg)
{
	if(NULL == arg)
	{
		log_error("File:"__FILE__",Line:%d.event base arg is NULL.",__LINE__);
		return;
	}
	task_info_t *task_info = (task_info_t *) arg;

	if(event_type == EV_TIMEOUT)
	/*
	这个地方注意需要判断是否超时
	因为我event_add事件的时候没有使用ev_persist
	所以当超时时需要再add一次事件到event_base的loop中
	*/
	{
		if(0 != event_add(&task_info->on_read,&task_info->timeout))
		{
			log_error("File:"__FILE__",Line:%d.repeart add read header event to event_base is error.");
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);

		}
		return;
	}

	int bytes;
	/*
	这个地方就是开始接收头部
	接收头部时,可能分为好几次从缓冲中取得,所以需要一个while累加
	*/
	while(header == task_info->read_type)//recv header
	{
		bytes = recv(client_socket_fd,task_info->header_buffer+task_info->offset,REQUEST_LENGTH -task_info->offset,0);
		if(0 > bytes )
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				if(0 != event_add(&task_info->on_read, &task_info->timeout))
				{
					close(task_info->on_read.ev_fd);
					task_pool_push(task_info);

					log_error("File: "__FILE__", line: %d, "						"event_add fail.", __LINE__);
					return;
				}
			}
			else
			{
				log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
					__LINE__, errno, strerror(errno));

				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
			}
			return;
		}
		else if(0 == bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
					__LINE__);
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);
			return;
		}

		if(REQUEST_LENGTH > bytes+task_info->offset)
		{
			log_warning("File:"__FILE__",Line:%d.recv header is not over.",__LINE__);
			task_info->offset += bytes;
			if(0 != event_add(&task_info->on_read, &task_info->timeout))
			{
				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
				log_error("File: "__FILE__", line: %d, "					"event_add fail.", __LINE__);
				return;
			}
		}
		else
		{
			task_info->read_type = body;
			deal_request_header(task_info);
			task_info->body_buffer = (char *) malloc(task_info->request_info.length);
			if(NULL == task_info->body_buffer)
			{
				log_error("File:"__FILE__",Line:%d.alloc mem to task_info data is error.",__LINE__);
				close(client_socket_fd);
				task_pool_push(task_info);
				return;
			}
			memset(task_info->body_buffer,0,task_info->request_info.length);
			task_info->offset = 0;//set recv body buffer offset to 0
			break;
		}
	}

	/*
	这个地方就是开始接收body,
	和header一样,也要考虑body多次接收累加的情况。
	*/
	while(body == task_info->read_type)
	{
		bytes = recv(client_socket_fd,task_info->body_buffer+task_info->offset,task_info->request_info.length-task_info->offset,0);
		if(0 > bytes )
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
			{
				if(0 != event_add(&task_info->on_read, &task_info->timeout))
				{
					close(task_info->on_read.ev_fd);
					task_pool_push(task_info);

					log_error("File: "__FILE__", line: %d, "						"event_add fail.", __LINE__);
					return;
				}
			}
			else
			{
				log_error("File: "__FILE__", line: %d,recv failed,errno: %d, error info: %s",
					__LINE__, errno, strerror(errno));

				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
			}
			return;
		}
		else if(0 == bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv buffer form network is error.disconnection the network.",
					__LINE__);
			close(task_info->on_read.ev_fd);
			task_pool_push(task_info);
			return;
		}

		if(task_info->request_info.length-task_info->offset > bytes)
		{
			log_warning("File:"__FILE__",Line:%d.recv body is not over.",__LINE__);
			task_info->offset += bytes;
			if(0 != event_add(&task_info->on_read, &task_info->timeout))
			{
				close(task_info->on_read.ev_fd);
				task_pool_push(task_info);
				log_error("File: "__FILE__", line: %d, "					"event_add fail.", __LINE__);
				return;
			}
		}
		else
		{
			task_info->read_type = unspecified;
			break;
		}
	}
	deal_request_body(client_socket_fd,task_info);
	return;
}
void deal_working_thread(void *arg)
{
	log_info("debug to this.");
	int client_socket_fd = (int) arg;
	if(0 > client_socket_fd)
	{
		log_error("File:"__FILE__",Line:%d.the arg means client socket filedesc is less 0!",__LINE__);
		return;
	}
	/*
	设置网络为非阻塞,libevent必须的
	*/
	if(!set_nonblocking(client_socket_fd))
	{
		log_error("File:"__FILE__",Line:%d.set client socket filedesc is error.error info is %s!",
				__LINE__,strerror(errno));
		close(client_socket_fd);
		return;
	}

	task_info_t *task_info;
	task_info = task_pool_pop();
	/*
	对event_base注册事件回调函数,
	注意没有使用EV_PERSIST
	*/
	do
	{
		task_info->read_type = header;
		event_set(&task_info->on_read,client_socket_fd,EV_READ,read_buffer,(void *) task_info);
		if(0 != event_base_set(task_info->event_base,&task_info->on_read))
		{
			log_error("File:"__FILE__",Line:%d.Associate the read header event to  event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}

		event_set(&task_info->on_write,client_socket_fd,EV_WRITE,response_handle,(void *) task_info);
		if(0 != event_base_set(task_info->event_base,&task_info->on_write))
		{
			log_error("File:"__FILE__",Line:%d.Associate the write hander to event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}

		if(0 != event_add(&task_info->on_read,&task_info->timeout))
		{
			log_error("File:"__FILE__",Line:%d.add the read header event to  event_base is error.",__LINE__);
			task_info->read_type = unspecified;
			close(client_socket_fd);
			task_pool_push(task_info);
			break;
		}

		event_base_loop(task_info->event_base,EVLOOP_NONBLOCK);
	}while(false);
	return;
}
时间: 2024-10-03 22:42:19

libevent多线程使用事项的相关文章

对libevent+多线程服务器模型的C++封装类

最近在看memcached的源码,觉得它那种libevent+多线程的服务器模型真的很不错,我将这个模型封装成一个C++类,根据我的简单测试,这个模型的效率真的很不错,欢迎大家试用. 这个类的使用方法很简单(缺点是不太灵活),只要派生一个类,根据需要重写以下这几个虚函数就行了: //新建连接成功后,会调用该函数virtual void ConnectionEvent(Conn *conn) { }//读取完数据后,会调用该函数virtual void ReadEvent(Conn *conn) 

libevent+多线程

最近在看memcached的源码,觉得它那种libevent+多线程的服务器模型真的很不错,我将这个模型封装成一个C++类,根据我的简单测试,这个模型的效率真的很不错,欢迎大家试用. 这个类的使用方法很简单(缺点是不太灵活),只要派生一个类,根据需要重写以下这几个虚函数就行了: //新建连接成功后,会调用该函数 virtual void ConnectionEvent(Conn *conn) { } //读取完数据后,会调用该函数 virtual void ReadEvent(Conn *con

项目中的Libevent(多线程)

多线程版Libevent //保存线程的结构体 struct LibeventThread { LibEvtServer* that; //用作传参 std::shared_ptr<std::thread> spThread; // 线程 struct event_base * thread_base; // 事件根基 struct event notify_event; evutil_socket_t notfiy_recv_fd; // socketpair 接收端fd(工作线程接收通知)

多线程注意事项和获取子线程何时都执行完毕

前言 最近一段时间在整公司项目里一个功能的优化,用到了多线程处理.期间也是踩了不少的坑,在这里想说下我遇到的问题和注意事项.以及怎样知道启动的那些多线程都处理完毕这些问题. 实现Runnable接口类需要注意事项 我这里用的多线程,是用了实现Runnable接口,这样的话,要比继承Thread类更加的灵活.毕竟类只能单继承,但可以多实现. 1.事务失效 我实现Runnable接口的类,是处理业务的handler类,在spring配置里面是默认给这些类添加事务的.所以我当时直接在这个类里面写了业务

Android 多线程注意事项

参考:http://blog.csdn.net/x86android/article/details/14161981 http://geeksun.iteye.com/blog/1447708 Android中的线程 在Android平台中多线程应用很广泛,在UI更新.游戏开发和耗时处理(网络通信等)等方面都需要多线程.Android线程涉及的技术有:Handler;Message;MessageQueue;Looper;HandlerThread. 有一点要非常注意的,就是在写这个多线程代码

java多线程注意事项

1:继承thread和实现Runnable创建线程的区别: 继承thread创建的对象直接start()就可以就绪,但是使用Runnable所new出来的对象要先new Thread(xx)才能start() 这也就意味着在start()之前实现Runnable的类一次new 出来的对象是可以多次传入new Thread()创建多个线程实例并且多次start()的,就是说 runnable的数据可以被共享,但是继承thread创建的线程不具备这个功能. class Thread1 extends

(转)Libevent(1)— 简介、编译、配置

转自:http://name5566.com/4190.html 参考文献列表:http://www.wangafu.net/~nickm/libevent-book/ 此文编写的时候,使用到的 Libevent 为 2.0.21 Libevent 之跨平台 在处理大量 SOCKET 连接时,使用 select 并不高效.各个系统都提供了处理大量 SOCKET 连接时的解决方案: Linux 下的 epoll() BSD 下的 kqueue() Solaris 下的 evports Window

使用libevent进行多线程socket编程demo

最近要对一个用libevent写的C/C++项目进行修改,要改成多线程的,故做了一些学习和研究. libevent是一个用C语言写的开源的一个库.它对socket编程里的epoll/select等功能进行了封装,并且使用了一些设计模式(比如反应堆模式),用事件机制来简化了socket编程.libevent的好处网上有很多,但是初学者往往都看不懂.我打个比方吧,1)假设有N个客户端同时往服务端通过socket写数据,用了libevent之后,你的server程序里就不用再使用epoll或是sele

memcached源码阅读----使用libevent和多线程模型

本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情. 一.iblievent的使用 首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型.因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调. 可以减掉了解一下 libevent基本api调用 struct event_base *base; base = event_bas