首先,libevent是个什么东西呢?通过阅读:官网。
libevent:一个事件通知库。libevent的API提供了一个可以执行回调函数的机制。这些事件可以是一个文件描述符或到达指定时间。而且,libevent也支持由signals或常规的timeout产生的回调。
libevent是用来替代网络服务器上的时间循环的。一个程序只需要去调用event_dispatch()然后去动态的增加或删除事件,而不用去改变事件循环。
目前,libevent支持/dev/poll, kqueue(2), event ports, POSIX select(2), Windows select(), poll(2) 和epoll(4)。它内部的事件机制完全独立于暴露在外的API,所以小的功能性更新变动不需要去重构程序。
官网还列出了一些实用了libevent的项目:Chromium(google的开源浏览器,在Mac和Linux上使用了libevent),Memcached(一个高效的,分布式存储的内存缓存系统), Transmission,NTP等等。
关于本系列:
这个系列文章会教你如何使用libevent 2.0或以后版本去用C语言写一个高效便携式异步网络IO框架。我们假设:
1. 你懂C语言。
2. 你已经了解基于C的一些系统调用(socket, connect,等)
对于异步IO的简介:
在开始的时候大多数程序员都使用的是阻塞IO调用。什么是阻塞IO调用呢?就是当你调用它时,它不会直接返回,而是等到操作完成或操作超时发生时才会返回。例如,当你调用“connect()”发起一条连接,操作系统会发送一个SYN包给对方主机。直到对方直接返回一个SYN ACK包,或超过一定时间后操作系统自动放弃,connect函数才会返回。
虽然,阻塞IO不一定是坏的。如果没有其他的事情需要你同时去处理,阻塞IO可以很好的工作。但是如果你需要同时去处理多条连接。我们更明确一些:假设你想从两条连接同时read,并且你不知道哪条会先有数据到达。Bad Example:
/* This won't work. */ char buf[1024]; int i, n; while (i_still_want_to_read()) { for (i=0; i<n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n==0) handle_close(fd[i]); else if (n<0) handle_error(fd[i], errno); else handle_input(fd[i], buf, n); } }
即使fd[2]先有数据到达,你也必须先等fd[0], fd[1]有数据到达,并且接收结束之后才能再处理fd[2]。有些人通过多线程,或多进程去处理多条连接。但是进程的创建对于系统来说是昂贵的。线程虽然没有那么大的消耗,但是如果连接很多,线程的数量达到数千条,那么CPU的利用效率就会大大降低。
在Unix编程中,你可以使socket成为非阻塞。
fcntl(fd, F_SETFL, O_NONBLOCK);
这里有一个例子,虽然不是很好:
/* This will work, but the performance will be unforgivably bad. */ int i, n; char buf[1024]; for (i=0; i < n_sockets; ++i) fcntl(fd[i], F_SETFL, O_NONBLOCK); while (i_still_want_to_read()) { for (i=0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); } } }
现在,我们使用的就是非阻塞sockets,上面的代码可以运行,但是它的性能很差。首先,如果当前没有数据去读,那循环就会自旋,浪费掉CPU循环。其次,每次循环都会有系统调用。所以我们需要一种方式去告诉内核“在有任何一条socket准备好数据去读之前保持等待,然后告诉我哪条连接准备好了”。
一种比较老的解决方式是select()。但是使用select的一个弊端是,当连接变多,select的循环列表会变的很大。不同的操作系统提供了不同的替代方案。其中有poll(), epoll(), kqueue(), evports, 和/dev/poll。除了poll()之外,其他的这些都比select的性能好。poll提供了一个O(1)复杂度的添加,删除,和通知socket。
但是不幸的是,这些接口没有一个统一的解决方案。所以你想去写一个可移植的高性能异步程序,你就需要去抽象一个接口去提供给无论是哪个系统的人使用,都能得到最好的性能。
这就是Libevent API提供的最低级的功能。它提供了一个统一的接口,即使你的程序运行在不同的计算机上。
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024]; int i; ssize_t result; while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; } } } if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1; event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
关于Windows?
你可能注意到了,虽然上边的代码可以得到更高的性能,但是它也同时增加了程序的复杂度。返回到使用fork的时候,我们不需要去为每条连接都管理一个buffer:我们仅仅需要给每条连接一个独立的栈-buffer。我们不需要去显示的跟踪每条连接是在读还是写:它应该隐藏在代码里。而且,我们也不需要一个结构体去跟踪每个操作到底完成了多少。
如果你有很丰富的Windows网络编程经验,你一定会知道上边的Libevent并没有达到最佳性能。在Windows上,最高效的异步IO并不是select系列的接口,而是IOCP API(完成端口)。当一个socket准备好一个操作时,IOCP并不会通知你。而是,程序告诉Windows去开启一个网络操作,然后IOCP告诉程序这个操作已经做完。
幸运的是,Libevent 2 “bufferevents”接口解决了这两个问题:它写起来更简便,并且提供了一个可以在Windows和Unix上通用的接口。
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } void readcb(struct bufferevent *bev, void *ctx) { struct evbuffer *input, *output; char *line; size_t n; int i; input = bufferevent_get_input(bev); output = bufferevent_get_output(bev); while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { for (i = 0; i < n; ++i) line[i] = rot13_char(line[i]); evbuffer_add(output, line, n); evbuffer_add(output, "\n", 1); free(line); } if (evbuffer_get_length(input) >= MAX_LINE) { /* Too long; just process what there is and go on so that the buffer * doesn't grow infinitely long. */ char buf[1024]; while (evbuffer_get_length(input)) { int n = evbuffer_remove(input, buf, sizeof(buf)); for (i = 0; i < n; ++i) buf[i] = rot13_char(buf[i]); evbuffer_add(output, buf, n); } evbuffer_add(output, "\n", 1); } } void errorcb(struct bufferevent *bev, short error, void *ctx) { if (error & BEV_EVENT_EOF) { /* connection has been closed, do any clean up here */ /* ... */ } else if (error & BEV_EVENT_ERROR) { /* check errno to see what error occurred */ /* ... */ } else if (error & BEV_EVENT_TIMEOUT) { /* must be a timeout event handle, handle it */ /* ... */ } bufferevent_free(bev); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); bufferevent_enable(bev, EV_READ|EV_WRITE); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }