平时做项目,涉及到网络层的都是epoll,前几年发现redis的epoll实现起来非常的精简,好用。因为提供的接口简单,爱并实现的很高效。于是,我就提取出来,直接使用。
今天又打开该文件详细的看看他的实现细节。
首先简单介绍epoll,它是linux内核下的一个高效的处理大批量的文件操作符的一个实现。不仅限于socket fd。
他在超时时间内会唤醒有事件的操作符。其中有两种模式 1、水平触发(Level Triggered)2、边缘触发(Edge Triggered)
简单概括这两种,水平触发是是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
而边缘模式是 有读写等事件,只会通知你一次,直到下一次事件再一次触发。所以,使用该模式的时候,一般情况下比较复杂,要对操作符读取数据到完全为空。才能保证数据不会丢失
epoll 提供了三个接口,
首先通过epoll_create(int maxfds)来创建一个epoll的句柄
之后在你的网络主循环里面,每一帧的调用epoll_wait(int epfd, epoll_event *events, int max events, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写了。
epoll_ctl用来添加/修改/删除需要侦听的文件描述符及其事件。
好了,当我们了解了如何使用这三个函数后,redis ae 做得就是如何友好的使用这三个函数了,并给我们提供方面的接口,让我们只关注 数据包的处理。
首先了解一下ae的结构体eventloop
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
我们首先只关注epoll相关,maxfd,表示能够注册的最大操作符数,也就是aeFileEvent *events的最大数组,
int setsize; /* max number of file descriptors tracked */
同上,能够分配的最大数组的数量。events 成员保存了我们要注册到epoll里的操作符,以及对该操作符事件到来的时候进行的操作的相关函数,具体看一下起结构体我们就明白了。
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
mask表示 我们对改操作符所要关心的时间,比如可读,可写时间的掩码。rfileProc为当我们有可读事件的时候,进行对其回调,wfileProc表示当有可写的事件的时候,进行回调。clientData为函数参数。
一般都是以fd作为aeFileEvent的数组下标,当有fd有事件时候,我们可以直接用fd定位到相应的位置,直接调用相应的函数。
redis 通过提供int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
该方法,将fd注册进入。
eventloop中的fired 用来临时保存epoll_wait中要有事件触发的操作符。
相应的结构体为
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
有了这个结构体,我们就可以根据fd 找到相应的struct aeFileEvent相对应的的数组元素了。
那么最终是什么时候被填充呢,下面我们就要看epoll_wait函数的调用了。
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
首先关注一下,第二个参数,struct epoll_event
* events
这个结构体是epoll的参数,它是什么样子呢?
//保存触发事件的某个文件描述符相关的数据(与具体使用方式有关)
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
//感兴趣的事件和被触发的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
而redis将该结构体放到了,
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
内,epfd是epoll_create的返回句柄,events用来保存epoll_wait的的第二个参数结果。能够保存的数目也就是我们之前提到的setSize大小了。
好,当我们调用epoll_wait后,就会有相应的epoll_event填充到state内,那么,我们就要对这些fd进行操作了。
请看代码。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
我们可以清晰的认识到,epoll_wait返回值是本次触发的时间数量,然后将其便利,相应的事件放入到fired中,
紧接着,对fired进行遍历操作
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn‘t
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
这就完成了一次对操作符的操作实现
那么为什么中间还弄了一个fired的临时存储fd的成员呢,多了一次循环操作,我想应该是为了实现kqueue,select,epoll的提供共了一个通用的结构。
是不是很简单?
因此,我在项目中是直接拿来使用的。非常好用方便。
http://blog.chinaunix.net/uid-24517549-id-4051156.html 这篇文章对epoll的使用有很详细的讲解。
更多文章,欢迎访问
http://blog.csdn.net/wallwind