libevent事件处理的中心部分——事件主循环,根据系统提供的事件多路分发机制执行事件循环,对已注册的就绪事件,调用注册事件的回调函数来处理事件。
事件处理主循环
libevent的事件主循环主要是通过event_base_loop ()函数完成的,其主要操作如下面的流程图所示,event_base_loop所作的就是持续执行下面的循环。
上图的简单描述就是:
- 校正系统当前时间。
- 将当前时间与存放时间的最小堆中的时间依次进行比较,将所有时间小于当前时间的定时器事件从堆中取出来加入到活动事件队列中。
- 调用I/O封装(比如:Epoll)的事件分发函数dispatch函数,以当前时间与时间堆中的最小值之间的差值(最小堆取最小值复杂度为O(1))作为Epoll/epoll_wait(Epoll.c/dispatch/407)的timeout值,在其中将触发的I/O和信号事件加入到活动事件队列中。
- 调用函数event_process_active(Event.c/1406)遍历活动事件队列,依次调用注册的回调函数处理相应事件。
int event_base_loop(struct event_base *base, int flags) { const struct eventop *evsel = base->evsel; void *evbase = base->evbase; struct timeval tv; struct timeval *tv_p; int res, done; // 清空时间缓存 base->tv_cache.tv_sec = 0; // evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例 if (base->sig.ev_signal_added) evsignal_base = base; done = 0; while (!done) { // 事件主循环 // 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记 // 调用event_base_loopbreak()设置event_break标记 if (base->event_gotterm) { base->event_gotterm = 0; break; } if (base->event_break) { base->event_break = 0; break; } // 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间 // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。 timeout_correct(base, &tv); // 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间 tv_p = &tv; if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) { timeout_next(base, &tv_p); } else { // 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待 // 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理 evutil_timerclear(&tv); } // 如果当前没有注册事件,就退出 if (!event_haveevents(base)) { event_debug(("%s: no events registered.", __func__)); return (1); } // 更新last wait time,并清空time cache gettime(base, &base->event_tv); base->tv_cache.tv_sec = 0; // 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等; // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中 res = evsel->dispatch(base, evbase, tv_p); if (res == -1) return (-1); // 将time cache赋值为当前系统时间 gettime(base, &base->tv_cache); // 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中 timeout_process(base); // 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理 // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表, // 然后处理链表中的所有就绪事件; // 因此低优先级的就绪事件可能得不到及时处理; if (base->event_count_active) { event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1; } else if (flags & EVLOOP_NONBLOCK) done = 1; } // 循环结束,清空时间缓存 base->tv_cache.tv_sec = 0; event_debug(("%s: asked to terminate loop.", __func__)); return (0); }
I/O和Timer事件的统一
libevent将Timer和Signal事件都统一到了系统的I/O 的demultiplex机制中了,相信读者从上面的流程和代码中也能窥出一斑了,下面就再啰嗦一次了。首先将Timer事件融合到系统I/O多路复用机制中,还是相当清晰的,因为系统的I/O机制像select()和epoll_wait()都允许程序制定一个最大等待时间(也称为最大超时时间)timeout,即使没有I/O事件发生,它们也保证能在timeout时间内返回。那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。
这是在Reactor和Proactor模式(主动器模式,比如Windows上的IOCP)中处理Timer事件的经典方法了,ACE采用的也是这种方法.堆是一种经典的数据结构,向堆中插入、删除元素时间复杂度都是O(lgN),N为堆中元素的个数,而获取最小key值(小根堆)的复杂度为O(1);因此变成了管理Timer事件的绝佳人选(当然是非唯一的),libevent就是采用的堆结构。
I/O和Signal事件的统一
Signal是异步事件的经典事例,将Signal事件统一到系统的I/O多路复用中就不像Timer事件那么自然了,Signal事件的出现对于进程来讲是完全随机的,进程不能只是测试一个变量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下的操作”。如果当Signal发生时,并不立即调用event的callback函数处理信号,而是设法通知系统的I/O机制,让其返回,然后再统一和I/O事件以及Timer一起处理,不就可以了嘛。是的,这也是libevent中使用的方法。
//libevent中Signal事件的管理是通过结构体evsignal_info完成的,结构体位于evsignal.h文件中,定义如下: struct evsignal_info { struct event ev_signal; //为socket pair的读socket向event_base注册读事件时使用的event结构体 int ev_signal_pair[2]; //socket pair对 int ev_signal_added; //记录ev_signal事件是否已经注册了 volatile sig_atomic_t evsignal_caught; //是否有信号发生的标记;是volatile类型,因为它会在另外的线程中被修改; struct event_list evsigevents[NSIG]; //数组,evsigevents[signo]表示注册到信号signo的事件链表; sig_atomic_t evsigcaught[NSIG]; //具体记录每个信号触发的次数,evsigcaught[signo]是记录信号signo被触发的次数; #ifdef HAVE_SIGACTION struct sigaction **sh_old; #else ev_sighandler_t **sh_old; //记录了原来的signal处理函数指针,当信号signo注册的event被清空时,需要重新设置其处理函数; #endif int sh_old_max; }; //的初始化包括,创建socket pair,设置ev_signal事件(但并没有注册,而是等到有信号注册时才检查并注册),并将所有标记置零,初始化信号的注册事件链表指针等。
运行循环
一旦有了一个已经注册了某些事件的event_base,就需要让libevent等待事件并且通知事件的发生。
#define EVLOOP_ONCE 0x01 #define EVLOOP_NONBLOCK 0x02 #define EVLOOP_NO_EXIT_ON_EMPTY 0x04 int event_base_loop(struct event_base *base, int flags);
默认情况下,event_base_loop()函数运行直到event_base中没有已经注册的事件为止。执行循环的时候,函数重复地检查是否有任何已经注册的事件被触发(比如说,读事件的文件描述符已经就绪,可以读取了;或者超时事件的超时时间即将到达)。如果有事件被触发,函数标记被触发的事件为“激活的”,并且执行这些事件。
在flags参数中设置一个或者多个标志就可以改变event_base_loop()的行为。
- 如果设置了EVLOOP_ONCE,循环将等待某些事件成为激活的,执行激活的事件直到没有更多的事件可以执行,然会返回。
- 如果设置了EVLOOP_NONBLOCK,循环不会等待事件被触发:循环将仅仅检测是否有事件已经就绪,可以立即触发,如果有,则执行事件的回调。
- 完成工作后,如果正常退出,event_base_loop()返回0;如果因为后端中的某些未处理错误而退出,则返回-1。
为方便起见,也可以调用
int event_base_dispatch(struct event_base *base);
event_base_dispatch()等同于没有设置标志的event_base_loop()。所以,event_base_dispatch()将一直运行,直到没有已经注册的事件了,或者调用了event_base_loopbreak()或者event_base_loopexit()为止。
停止循环
如果想在移除所有已注册的事件之前停止活动的事件循环,可以调用两个稍有不同的函数。
int event_base_loopexit(struct event_base *base,const struct timeval *tv); int event_base_loopbreak(struct event_base *base);
event_base_loopexit()让event_base在给定时间之后停止循环。如果tv参数为NULL,event_base会立即停止循环,没有延时。如果event_base当前正在执行任何激活事件的回调,则回调会继续运行,直到运行完所有激活事件的回调之才退出。event_base_loopbreak()让event_base立即退出循环。它与event_base_loopexit(base,NULL)的不同在于,如果event_base当前正在执行激活事件的回调,它将在执行完当前正在处理的事件后立即退出。
有时候需要知道对event_base_dispatch()或者event_base_loop()的调用是正常退出的,还是因为调用event_base_loopexit()或者event_base_break()而退出的。可以调用下述函数来确定是否调用了loopexit或者break函数。
int event_base_got_exit(struct event_base *base); int event_base_got_break(struct event_base *base);
这两个函数分别会在循环是因为调用event_base_loopexit()或者event_base_break()而退出的时候返回true,否则返回false。下次启动事件循环的时候,这些值会被重设。