socket编程:多路复用I/O服务端客户端之epoll

什么是epoll

epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。当然,这不是

2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel

2.5.44),它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通

知方法。

epoll的相关系统调用

epoll只有epoll_create,epoll_ctl,epoll_wait 3个系统调用。

1. int epoll_create(int size);

创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好

epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这

个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而

是在这里先注册要监听的事件类型。

第一个参数是epoll_create()的返回值。

第二个参数表示动作,用三个宏来表示:

EPOLL_CTL_ADD:注册新的fd到epfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd;

第三个参数是需要监听的fd。

第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

events可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level

Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个

socket的话,需要再次把这个socket加入到EPOLL队列里

3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

收集在epoll监控的事件中已经发送的事件。参数events是分配好的epoll_event结构体数组,

epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复

制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个

events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时

时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。如果函数调用成功,

返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。

epoll工作原理

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,

返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一

个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这

样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调

用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来

注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机

制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

Epoll的2种工作方式-水平触发(LT)和边缘触发(ET)

假如有这样一个例子:

1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符

2. 这个时候从管道的另一端被写入了2KB的数据

3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作

4. 然后我们读取了1KB的数据

5. 调用epoll_wait(2)......

Edge Triggered 工作模式:

如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用

epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数

据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某

个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍

在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄

上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操

作没有读空文件输入缓冲区内的数据,因此我们在第5步调用epoll_wait(2)完成后,是否挂

起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件

句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模

式的epoll接口,在后面会介绍避免可能的缺陷。

i 基于非阻塞文件句柄

ii 只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。但这并不是说每次read()

时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读

到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认

为此事读事件已处理完成。

Level Triggered 工作模式

相反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后

面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多

个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在

epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当

EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就

成为调用者必须作的事情。

LT(level triggered)是epoll缺省的工作方式,并且同时支持block和no-block socket.在这种做法

中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果

你不作任何操作,内核还是会继续通知你 的,所以,这种模式编程出错误可能性要小一点。

传统的select/poll都是这种模型的代表.

ET (edge-triggered)是高速工作方式,只支持no-block socket,它效率要比LT更高。ET与LT的

区别在于,当一个新的事件到来时,ET模式下当然可以从epoll_wait调用中获取到这个事件,

可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件

再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。而LT模式正好相

反,只要一个事件对应的套接字缓冲区还有数据,就总能从epoll_wait中获取这个事件。

因此,LT模式下开发基于epoll的应用要简单些,不太容易出错。而在ET模式下事件发生时,

如果没有彻底地将缓冲区数据处理完,则会导致缓冲区中的用户请求得不到响应。

Nginx默认采用ET模式来使用epoll。

需要注意的一些点是:

  1. 在ET模式下,我们在ET模式下是边缘式触发,他是在非阻塞模式下的运行的,对于服务端来说。我们需要保证读事件写事件的数据完全读取,因为在非阻塞模式下,可能出现数据读取的不完全,所以必须要进行数据控制。

    所以封装了read_data函数,保证读取。

    但是在非阻塞模式下的read函数,如果内部没有数据,他会进行阻塞等待数据写入,返回EAGAIN错误码,所以我们需要考虑到这种情况。

  2. ET模式下的写事件,他是写出数据,有一种特殊情况,就是在我们fd内部的存储写满的情况下,他会返回EAGAIN错误,等待有空间写入才继续进行写,但是我们在编写代码的时候设置buf,控制了缓冲区的长度,所以暂时不需要考虑出现的情况。

    一次调用write()写入可写入字节,然后返回值。所以我们一次write()完全可以满足需求。

下面看一下代码,写了很多过程输出信息,大家可以结合代码和运行结果看一下epoll的流程。

#include<assert.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<errno.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/epoll.h>

#define _BACKLOG_ 5
#define _MAX_FD_NUM_ 64

typedef struct data_buf
{
	int fd;
	char buf[1024];
}data_buf_t,*data_buf_p;

void usage(char *porc)
{
	printf("%s: [ip][port]\n",porc);
}

static int set_non_block(int fd)
{
	int old_fl = fcntl(fd,F_GETFL);
	if(old_fl < 0)
	{
		perror("fcntl");
		return -1;
	}
	if(fcntl(fd,F_SETFL,old_fl |O_NONBLOCK))
	{
		perror("fcntl");
		return -1;
	}
	return 0;
}

int startup(char *ip,int port)
{
	int sock = socket(AF_INET,SOCK_STREAM,0);

	int opt = 1;
	if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)))
	{
		perror("setsockopt");
		exit(1);
	}

	struct sockaddr_in listen_sock;
	listen_sock.sin_family = AF_INET;
	listen_sock.sin_port = htons(port);
	if(strcmp(ip,"any") == 0)
		listen_sock.sin_addr.s_addr = htonl(INADDR_ANY);
	else 
		listen_sock.sin_addr.s_addr = inet_addr(ip);

	if(bind(sock,(struct sockaddr*)&listen_sock,sizeof(listen_sock)) < 0)
	{
		perror("bind");
		exit(2);
	}

	if(listen(sock,_BACKLOG_) < 0)
	{
		perror("listen");
		exit(3);
	}

	return sock;
}
int read_data(int fd,char *buf,int size)
{
	assert(buf);
	int ret = -1;
	int index = 0;
	printf("join read_data\n");
	while(index < size)
	{
		printf("join read_data while\n");
		ret = read(fd,buf+index,size-index);
		printf("ret:%d,read success,fd is %d\n",ret,fd);
		if(ret > 0)
		{
			index += ret;
		}
		else
		{
			if(errno == EAGAIN)
			{
				printf("EAGAIN\n");
				return index;
			}
			perror("read");
			printf("now fd is %d",fd);
			return index;
		}
	}
	return index;
}

static int epoll_server(int sock)
{
	int epoll_fd = epoll_create(256);
	if(epoll_fd < 0)
	{
		perror("epoll_create");
		exit(4);
	}

	printf("epoll_create success\n");
	struct epoll_event ev;
	ev.events = EPOLLIN |EPOLLET;
	ev.data.fd = sock;

	if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,sock,&ev) < 0)
	{
		perror("epoll_ctl");
		exit(5);
	}

	//设置一个输出的参数数组;
	struct epoll_event ev_out[_MAX_FD_NUM_];

	int max = _MAX_FD_NUM_;
	int timeout = 5000;
	int num = -1;
	int i = 0;
	int done = 0;
	data_buf_p mem = (data_buf_p)malloc(sizeof(data_buf_t));
	while(!done)
	{
		 //switch(num = epoll_wait(epoll_fd,ev_out,max,timeout))

		num = epoll_wait(epoll_fd,ev_out,max,timeout);
		printf("num is %d\n",num);
		switch(num)
		{
			case 0://timeout
				printf("timeout \n");
				break;
			case -1:
				perror("epoll_wait");
				break;
			default:
				{
					for(i = 0; i < num;++i)
					{
						if((ev_out[i].data.fd == sock) && (ev_out[i].events & (EPOLLIN | EPOLLET)))
						{
							struct sockaddr_in client;
							socklen_t len = sizeof(client);

							int fd = ev_out[i].data.fd;
							int newsock = accept(fd,(struct sockaddr*)&client,&len);
							if(newsock < 0)
							{
								perror("newsock");
								continue;
							}
							int err = set_non_block(newsock);
							if(err <  0)
							{
								printf("non_block error\n");
								close(newsock);
								continue;
							}
							ev.events = EPOLLIN | EPOLLET;
							ev.data.fd = newsock;
							epoll_ctl(epoll_fd,EPOLL_CTL_ADD,newsock,&ev);
							printf("get a new connect\n");

						}
						else if(ev_out[i].events & (EPOLLIN |EPOLLET))
						{
							printf("join read\n");
							int fd = ev_out[i].data.fd;
						//	data_buf_p mem = (data_buf_p)malloc(sizeof(data_buf_t));
							if(!mem)
							{
								perror("malloc");
								continue;
							}

							mem->fd = fd;
							ssize_t _s = read_data(mem->fd,mem->buf,sizeof(mem->buf)-1);
							 // ssize_t _s = read_data(mem->fd,mem->buf,sizeof(mem->buf) - 1);
							if(_s > 0)
							{
								mem->buf[_s] = ‘\0‘;
								printf("%d client:%s,len:%d\n",mem->fd,mem->buf,_s);
								ev.events = EPOLLOUT | EPOLLET;
								ev.data.ptr = mem;
								epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
								printf("change fd success EPOLLOUT\n");
							}
							else if(_s == 0)
							{
								printf("client close...\n");
								epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
								close(fd);
								free(mem);
							}
							else
							{
								printf("data_read is failed");
								continue;
							}
						}
						else if (ev_out[i].events & (EPOLLOUT |EPOLLET))
						{
							data_buf_p mem = (data_buf_p)ev_out[i].data.ptr;
							int fd = mem->fd;
							char *buf = mem->buf;
							write(fd,buf,strlen(buf));
							ev.events = EPOLLIN | EPOLLET;
							ev.data.ptr = mem->buf;
							ev.data.fd = mem->fd;
							epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
							printf("echo write success,change fd EPOLLIN\n");
						}
						else
						{

						}
					}
				}
				break;
		}
	}
}

int main(int argc,char *argv[])
{
	if(argc != 3)
	{
		usage(argv[0]);
		return -1;
	}

	int port = atoi(argv[2]);
	char *ip = argv[1];
	int listen_sock = startup(ip,port);
	printf("listen succed\n");
	epoll_server(listen_sock);
	close(listen_sock);

	return 0;
}

client端和之前的代码一样。就贴上来了。

看一下运行结果:

epoll的优点:

1.支持一个进程打开大数目的socket描述符(FD)

select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认

值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是

可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,

二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比

较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不

是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的

数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目

可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

2.IO效率不随FD数目增加而线性下降

传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,

任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,

导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这

是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的

socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了

一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上

都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多

使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的

效率就远在select/poll之上了。

3.使用mmap加速内核与用户空间的消息传递

这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通

知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户

空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记

手工 mmap这一步的。

时间: 2024-10-28 05:04:09

socket编程:多路复用I/O服务端客户端之epoll的相关文章

socket编程,简单多线程服务端测试程序

socket编程,简单多线程服务端测试程序 前些天重温了MSDN关于socket编程的WSAStartup.WSACleanup.socket.closesocket.bind.listen.accept.recv.send等函数的介绍,今天写了一个CUI界面的测试程序(依赖MFC)作为补充.程序功能简介如下: 1:一个线程做监听用. 2:监听线程收到客户端连接后,创建新线程接收客户端数据.所有对客户端线程将加入容器,以便管理. 3:服务端打印所有客户端发来的信息. 4:服务端CUI界面输入数字

socket编程:多路复用I/O服务端客户端。

其实在之前的TCP之中,我们编程实现了多进程,多线程机制下的TCP服务器,但是对于这种的TCP服务器而言,存在太大的资源局限性.所以我们可以是用I/0模型中的多路复用I/O模型来进行编程. 他的具体思想就是:当前进程可以处理多个相应时间,记录多个描述符,然后控制轮询时间态,当有响应产生的时候我们就去保存当前响应文件描述符,对他进行连接处理/数据传输就OK了.在一个进程进行多个响应时间的答复情况下,可以大大的节约我们系统所消耗的资源. 对这个进行操作的函数就是select(): 他的函数原型如下:

socket编程:多路复用I/O服务端客户端之poll

一. 关于poll 对于IO复用模型,其优点无疑是免去了对一个个IO事件就绪的等待,转而代之的是同时对多个IO数据的检测,当检测等待的事件中至少有一个就绪的时候,就会返回告诉用户进程"已经有数据准备好了,快看看是哪个赶紧处理",而对于IO复用的实现,除了可以用select函数,另外一个函数仍然支持这种复用IO模型,就是poll函数: 二. poll函数的用法 虽然同样是对多个IO事件进行检测等待,但poll和select多少还是有些不同的: 函数参数中, 先来说nfds,这个是指当前需

Socket编程实践(6) --TCP服务端注意事项

僵尸进程处理 1)通过忽略SIGCHLD信号,避免僵尸进程 在server端代码中添加 signal(SIGCHLD, SIG_IGN); 2)通过wait/waitpid方法,解决僵尸进程 signal(SIGCHLD,onSignalCatch); void onSignalCatch(int signalNumber) { wait(NULL); } 3) 如果多个客户端同时关闭, 问题描述如下面两幅图所示: /** client端实现的测试代码**/ int main() { int s

TCP/IP网络编程之基于TCP的服务端/客户端(二)

回声客户端问题 上一章TCP/IP网络编程之基于TCP的服务端/客户端(一)中,我们解释了回声客户端所存在的问题,那么单单是客户端的问题,服务端没有任何问题?是的,服务端没有问题,现在先让我们回顾下服务端的I/O代码 echo_server.c --while ((str_len = read(clnt_sock, messag, 1024)) != 0) write(clnt_sock, messag, str_len);-- 接着,我们回顾客户端的代码 echo_client.c -- wr

[javaSE] 网络编程(TCP服务端客户端互访阻塞)

客户端给服务端发送数据,服务端收到数据后,给客户端反馈数据 客户端: 获取Socket对象,new出来,构造参数:String的ip地址,int的端口号 调用Socket对象的getOutputStream()方法,获取到OutputStream对象 调用OutputStream对象的write()方法,输出流输出数据,参数:byte[]字节数组 调用Socket对象的getInputStream()方法,获取到InputStream对象 调用InputStream对象的read()方法,读取数

socket基础实例(一个服务端对应一个客户端情形)

服务端处理1个客户端的例子 运行结果: 执行服务端进程: [[email protected] single_link]# ./server [server]: begin [server]: loop...... [server]: client[127.0.0.138528] is connected [server]: recv from client[127.0.0.1:38528]: data=12345, len=1024 [server]: send to client[127.0

Python socket模块实现TCP服务端客户端

Python socket模块实现TCP服务端客户端 写了详细的注释,如果有哪一行不明白,可留言哦. 服务端脚本 # _*_ coding: utf-8 _*_ __author__ = 'xiaoke' __date__ = '2018/6/13 14:39' # 这个脚本创建一个TCP服务器,它接收来自客户端的消息,然后将消息加上时间戳前缀并返回客户端 import socket from time import ctime HOST = '' PORT = 21567 BUFSIZ = 4

手写一个模块化的 TCP 服务端客户端

前面的博客 基于 socket 手写一个 TCP 服务端及客户端 写过一个简单的 TCP 服务端客户端,没有对代码结构进行任何设计,仅仅是实现了相关功能,用于加深对 socket 编程的认识. 这次我们对整个代码结构进行一下优化,使其模块化,易扩展,成为一个简单意义上的“框架”. 对于 Socket 编程这类所需知识偏底层的情况(OS 协议栈的运作机制,TCP 协议的理解,多线程的理解,BIO/NIO 的理解,阻塞函数的运作原理甚至是更底层处理器的中断.网卡等外设与内核的交互.核心态与内核态的切