epoll是linux在2.6内核新增的系统调用,为了更高效地实现多路IO复用。与poll和select相比,它的高效体现在
1、 select和poll都是线性扫描FD(文件描述符)的集合,随着集合的增大,性能自然下降,且不能通过返回值得知那些文件描述符有I/O事件发生,还要再便利一边集合才能找出有I/O事件的文件描述符。而epool只是管理活跃的I/O的FD,不会因为集合增大而性能下降。
2、 select和poll是通过内存拷贝的方式,把FD消息从内核空间拷贝到用户空间,效率地下。epool是通过共享内存方式。
epool的使用也比较简单。说明如下:
1、创建epoll句柄。
int epoll_create(int size);
参数size是监听句柄的最大数目,它的大小和机器内存有关。返回值是创建的epoll句柄。句柄使用完后调用close()关闭,因为它占用系统的FD。创建epoll句柄的本质是向内核申请空间,用来存放关注的FD集合以及事件。
2、epoll事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
注册想要监听的事件类型。参数epfd是epoll_create创建的句柄。
参数op表示动作,在使用时用三个宏来表示:
EPOLL_CTL_ADD表示注册新的fd到epfd中。
EPOLL_CTL_MOD表示修改已经注册过的fd的监听事件。
EPOLL_CTL_DEL表示从epf中删除fd。
参数fd是要监听的fd。
参数event是向内核注册的监听事件。struct epoll_event结构如下:
typedef union epoll_data {
void *ptr;//可以通过这个指针指向自定义结构
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3、等待事件
int epoll_wait(int epfd,struct epoll_event* events, int maxevents,int timeout)
参数epfd是epoll_create创建的句柄。参数events是从内核得到的事件的集合。参数maxevents告诉内核events的大小,它可以大于epoll_create时指定的size的大小。参数timeout是超时事件,单位为毫秒。
函数返回需要处理的事件的个数,返回0则表示已超时。
epoll事件的触发有两种模式,边沿触发Edge Triggered(ET)和水平Level Triggered(LT)。
ET模式:边沿触发是指当监听的FD状态变换时获得通知。例如接收数据的FD缓存一次接收了2k数据,但是读取时只读取了1k,缓存还有1k数据,但是不会再获得通知。所以采用边沿触发时,如果读取的数据长度等于设定值的话就要继续读取,直到产生EAGAIN。ET模式只支持非阻塞的I/O,以防止一个句柄的读/写操作把处理多个文件描述符的任务饿死。
LT模式:水平触发是默认的模式,它支持阻塞和非阻塞I/O。当I/O就绪时,就会获得通知,如果不对I/O进行处理或没有处理完,内核就会一直通知。
下面结合网上例子,用epoll写一个Echo服务器。实现很简单,把accept、recv、send的I/O操作放到epoll中,有了I/O事件后调用相应的回调函数即可。具体步骤如下:
1、初始化监听FD,把accept事件作为回调函数 ‘AcceptConnection’,等待客户端connect后在服务的调用。
2、在回调函数’ AcceptConnection’中,把accept后的FD注册为等待EPOLLIN,即等待客户端send操作,回调函数为’RecvData‘。
3、客户端写操作(send)后,服务端调用了’RecvData’,在这个函数中,把对应的FD的I/O事件改为EPOLLOUT,回调函数为’SendData’,因为下一步就是向客户端写收到的数据了。
4、在’SendData’函数中,发送接收到的数据,并把对应FD等待事件改为EPOLLIN,重新等待客户端的send操作。
在实现时,用到了封装的数据结构myevent,目的是为了便于操作事件,把事件的回调函数、buffer、上一次激活事件等做了封装。并使用了全局数组集合来作为事件的集合。
代码如下:
//EchoEpoll.cpp
#include <iostream>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> //bzero Function
#include <stdlib.h>//atoi()
#define MAX_EVENTS 1000
struct myevent
{
public:
int fd;
void (*callBack)(int fd, int events, void* arg);
int events;
void* arg;
bool status;//ture represent it has been added to epoll
char buffer[128];
int len;//use length
int offset;//mark from where to send
long lastActiveTime;
};
//Global variables
int g_epollFd;//epoll fd
myevent g_Events[MAX_EVENTS+1];//event set. The last one is used to accept
//Set event
void SetEvent(myevent* ev, int fd, void(*callBack)(int, int , void*), void* arg)
{
ev->fd=fd;
ev->callBack=callBack;
ev->events=0;
ev->arg=arg;
ev->status=false;
bzero(ev->buffer,sizeof ev->buffer);
ev->offset=0;
ev->len=0;
ev->lastActiveTime=time(NULL);
}
//only set event‘s callback function, don‘t set it‘s buffer
void SetEventCallback(myevent* ev, void(*callBack)(int, int ,void*))
{
ev->callBack=callBack;
}
void AddEvent(int epfd, int events, myevent* ev)
{
struct epoll_event epv={0,{0}};
epv.data.ptr=ev;
epv.events=ev->events=events;
int op;
if(ev->status)// have been added
op=EPOLL_CTL_MOD;
else
{
op=EPOLL_CTL_ADD;
ev->status=true;
}
if(epoll_ctl(epfd,op,ev->fd,&epv)<0)
printf("Event add failed FD=%d,events=%d\n",ev->fd,events);
else
printf("Event add successfully,FD=%d,op=%d,events=%0x\n",ev->fd,op,events);
}
void DelEvent(int epfd, myevent* ev)
{
struct epoll_event epv={0,{0}};
if(!ev->status)//have not been added
return;
epv.data.ptr=ev;
ev->status=false;
epoll_ctl(epfd,EPOLL_CTL_DEL,ev->fd,&epv);
}
//forward declation
void RecvData(int fd, int events, void* arg);
void SendData(int fd, int events, void* arg);
void AcceptConnection(int fd, int events, void* arg)
{
struct sockaddr_in sin;
socklen_t len=sizeof(struct sockaddr_in);
int nfd=accept(fd,(struct sockaddr*)&sin,&len) ;
if(nfd==-1)
{
if(errno!= EAGAIN&&errno!=EINTR)
{}
printf("%s, accept, %d",__func__,errno);
}
int i;
do
{
for( i=0; i<MAX_EVENTS;++i)//find the first unused Event
{
if(g_Events[i].status==false)
break;
}
if(i==MAX_EVENTS)
{
printf("%s:max connection limit %d\n",__func__,MAX_EVENTS );
break;//break do-while
}
int iret=fcntl(nfd,F_SETFL, O_NONBLOCK);
if(iret<0)
{
printf("%s, fctl nonblocking failed:%d",__func__, iret);
break;//break do-while
}
//set nfd, wait to receive data
SetEvent(&g_Events[i], nfd, RecvData,&g_Events[i]);
AddEvent(g_epollFd,EPOLLIN,&g_Events[i]);
}while(0);
printf("new connection[%s:%d] [time:%d], pos[%d]\n",inet_ntoa(sin.sin_addr),ntohs(sin.sin_port),g_Events[i].lastActiveTime,i);
}
void RecvData(int fd, int events, void* arg)
{
struct myevent* ev=(struct myevent*)arg;
int len=recv(fd,ev->buffer+ev->len,sizeof(ev->buffer)-1-ev->len,0);
//has been received data, delete recv event
DelEvent(g_epollFd,ev);
if(len>0)
{
ev->len+=len;
ev->buffer[len]=‘\0‘;
printf("C[%d]:%s\n",fd,ev->buffer);
SetEventCallback(ev,SendData);
//ev->callBack=SendData;
AddEvent(g_epollFd,EPOLLOUT,ev);
}
else if(len==0)
{
close(ev->fd);
printf("[fd=%d] pos[%d], closed gracefully.\n",fd, ev->events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] errno[%d]:%s\n",fd,errno,strerror(errno));
}
}
void SendData(int fd, int events, void* arg)
{
struct myevent* ev=(struct myevent*)arg;
int len=send(fd, ev->buffer+ev->offset, ev->len-ev->offset, 0);
if(len>0)
{
printf("send [fd=%d],[%d<->%d]%s\n",fd,len,ev->len,ev->buffer);
ev->offset+=len;
if(ev->offset==ev->len)// all data has been send
{
DelEvent(g_epollFd,ev);
SetEvent(ev,fd,RecvData,ev);//reset event
AddEvent(g_epollFd,EPOLLIN,ev);
}
}
else
{
close(ev->fd);
DelEvent(g_epollFd,ev);
printf("send[fd=%d] errno[%d]\n",fd,errno);
}
}
void InitListenSocket(int epfd, short port)
{
int listenFd=socket(AF_INET, SOCK_STREAM,0);
fcntl(listenFd, F_SETFL, O_NONBLOCK);//Set listen socket to Non-block
printf("server listen fd=%d\n",listenFd);
SetEvent(&g_Events[MAX_EVENTS], listenFd, AcceptConnection, &g_Events[MAX_EVENTS]);
AddEvent(epfd, EPOLLIN,&g_Events[MAX_EVENTS]);
sockaddr_in sin;
bzero(&sin, sizeof(sin));
sin.sin_family=AF_INET;
sin.sin_addr.s_addr=INADDR_ANY;
sin.sin_port=htons(port);
bind(listenFd,(const sockaddr*)&sin, sizeof(sin));
listen(listenFd,MAX_EVENTS);
}
int main(int argc, char* argv[])
{
unsigned short port=12345;
if(argc==2)
{
port=atoi(argv[1]);
}
g_epollFd=epoll_create(MAX_EVENTS);
if(g_Events<=0)
printf("Create epoll failed.\n");
InitListenSocket(g_epollFd,port);
printf("Server running:port %d\n",port);
struct epoll_event events[MAX_EVENTS];
int checkPos=0;
while(true)
{
long now=time(NULL);
for(int i=0; i< 100; ++i, checkPos++)
{
if(checkPos==MAX_EVENTS)
checkPos=0;
if(g_Events[checkPos].status==false)
continue;
long duration=now - g_Events[checkPos].lastActiveTime;
//close time out fd
if(duration>=60)//timeout
{
close(g_Events[checkPos].fd);
printf("fd=%d timeout:%d--%d\n",g_Events[checkPos].fd, g_Events[checkPos].lastActiveTime , now);
DelEvent(g_epollFd,&g_Events[checkPos]);
}
}
int fds=epoll_wait(g_epollFd,events,MAX_EVENTS,1000);
if(fds<0)
{
printf("epoll_wait error\n");
break;
}
for(int i=0; i<fds;++i)
{
myevent* ev=(struct myevent*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN))
{
ev->callBack(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT))
{
ev->callBack(ev->fd, events[i].events, ev->arg);
}
}
}
return 0;
}
测试客户端如下:
//EchoClient.cpp
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
int main(int argc, char* argv[])
{
int fd=socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if(fd<0)
{
perror("Create Scoket Error\n");
exit(EXIT_FAILURE);
}
struct sockaddr_in serverAddr;
bzero(&serverAddr, sizeof serverAddr);
serverAddr.sin_family=AF_INET;
serverAddr.sin_port=htons(12345);
serverAddr.sin_addr.s_addr=inet_addr("127.0.0.1");
if(connect(fd, (struct sockaddr*)&serverAddr, sizeof serverAddr)<0)
{
perror("Connect Error\n");
exit(EXIT_FAILURE);
}
char sendBuf[128]={0};
char recvBuf[128]={0};
while(fgets(sendBuf, sizeof sendBuf, stdin) != NULL)
{
write(fd, sendBuf, strlen(sendBuf));
read(fd, recvBuf, sizeof recvBuf);
fputs(recvBuf, stdout);
bzero(sendBuf, sizeof sendBuf);
bzero(recvBuf, sizeof recvBuf);
}
close(fd);
return 0;
}
版权声明:本文为博主原创文章,未经博主允许不得转载。