概述:本来自己写了一篇,但是看到http://www.tuicool.com/articles/NfqeEf这篇更好,于是就算转载过来了吧。下一篇我们会讲到twemproxy的核心,即其转发响应转发请求。
作为一个proxy服务,我们应该深入了解它的进程和事件模型。
进程模型:
通过grep代码,查找fork()和pthread_create()函数的调用,并分析调用者两个函数的代码。我们可以看到twemproxy使用的是单进程单线程来处理请求,另外有一个线程来处理统计数据,但并不参与处理请求功能。
对于单进程单线程的程序,我们就不用去关心线程间通信等内容,可以把重点放在事件驱动模型上来。
事件模型:
Twemproxy自己封装了一个简单的事件驱动模型,相关代码在event目录下。
我们可以先看一下event/nc_event.h:
ifndef _NC_EVENT_H_
#define _NC_EVENT_H_ #include #define EVENT_SIZE 1024 #define EVENT_READ 0x0000ff #define EVENT_WRITE 0x00ff00 #define EVENT_ERR 0xff0000 typedef int ( * event_cb_t ) ( void * , uint32_t ) ; typedef void ( * event_stats_cb_t ) ( void * , void * ) ; |
文件开头定义了几个宏,然后定义了两个回调函数指针,分别是event的回调函数event_cb_t和状态统计的回调函数event_stats_cb_t。紧接下来是一个条件宏定义:
Twemproxy针对不同的平台定义了不同的event_base结构,因为我们用的是Linux,所以只关心epoll下的代码就行了:
|
epoll下的event_base定义很简单:一个epoll句柄ep,一个指向epoll_event数组的指针*event,一个event计数nevent,一个callback函数。
那么需要实现哪些函数呢?在event/nc_event.h的最底下有声明:
|
创建:
我们先看看如何创建event_base,下面是event/nc_epoll.c当中的event_base_create()函数定义:
struct event_base *
event_base_create ( int nevent , event_cb_t cb ) { struct event_base * evb ; int status , ep ; struct epoll_event * event ; ASSERT ( nevent > 0 ) ; ep = epoll_create ( nevent ) ; if ( ep < 0 ) { log_error ( "epoll create of size %d failed: %s" , nevent , strerror ( errno ) ) ; return NULL ; } event = nc_calloc ( nevent , sizeof ( * event ) ) ; if ( event == NULL ) { status = close ( ep ) ; if ( status < 0 ) { log_error ( "close e %d failed, ignored: %s" , ep , strerror ( errno ) ) ; } return NULL ; } evb = nc_alloc ( sizeof ( * evb ) ) ; if ( evb == NULL ) { nc_free ( event ) ; status = close ( ep ) ; if ( status < 0 ) { log_error ( "close e %d failed, ignored: %s" , ep , strerror ( errno ) ) ; } return NULL ; } evb -> ep = ep ; evb -> event = event ; evb -> nevent = nevent ; evb -> cb = cb ; log_debug ( LOG_INFO , "e %d with nevent %d" , evb -> ep , evb -> nevent ) ; return evb ; } |
创建event_base的过程,就是初始化前面定义的event_base结构体的内容:
- 调用epoll_create()创建epoll句柄
- 申请nevent * sizeof(struct epoll_event)大小的空间,存放epoll_event指针数组
- 申请sizeof(struct event_base),存放event_base
- 对event_base中的ep、event、nevent、cb进行赋值。
调用event_base_create的地点在nc_core.c当中:
1 2 |
ctx -> evb = event_base_create ( EVENT_SIZE , & core_core ) ; |
其中EVENT_SIZE刚才nc_event.h的代码里有定义(1024),而core_core函数则在nc_core.c当中定义。调用event_base_create()之后,我们便得到了一个可以容纳1024个event的event_base。
添加event:
创建完event_base之后,便要向event_base中添加event,需要用到event_add_*函数,我们先来看看event_add_conn()函数:
int event_add_conn ( struct event_base * evb , struct conn * c ) { int status ; struct epoll_event event ; int ep = evb -> ep ; ASSERT ( ep > 0 ) ; ASSERT ( c != NULL ) ; ASSERT ( c -> sd > 0 ) ; event . events = ( uint32_t ) ( EPOLLIN | EPOLLOUT | EPOLLET ) ; event . data . ptr = c ; status = epoll_ctl ( ep , EPOLL_CTL_ADD , c -> sd , & event ) ; if ( status < 0 ) { log_error ( "epoll ctl on e %d sd %d failed: %s" , ep , c -> sd , strerror ( errno ) ) ; } else { c -> send_active = 1 ; c -> recv_active = 1 ; } return status ; } |
conn的结构非常复杂,我们这里不去细看,因为这个函数只用到了conn->sd、conn->send_active和conn->recv_active,其中sd是要注册的文件描述符,而send_active和read_active只是布尔型的标记位,用来标记当前连接是否开启了读/写。
初始化event的时候,加入了EPOLLIN和EPOLLOUT事件,并指定了边缘触发EPOLLET模式,即当文件描述符的可读/可写状态切换的时候返回,成功后设置conn的send_active和read_active为1。
event_add_in()和event_add_out()其实是修改event的,具体分别将其修改为只读或者读写,同时修改send_active或者read_active。
那么什么时候添加event呢,一般是在创建连接之后,我们检索一下event_add_conn的调用位置:
event / nc_epoll .c : event_add_conn ( struct event_base * evb , struct conn * c ) event / nc_kqueue .c : event_add_conn ( struct event_base * evb , struct conn * c ) event / nc_event .h : int event_add_conn ( struct event_base * evb , struct conn * c ) ; event / nc_evport .c : event_add_conn ( struct event_base * evb , struct conn * c ) nc_proxy .c : status = event_add_conn ( ctx -> evb , p ) ; nc_proxy .c : status = event_add_conn ( ctx -> evb , c ) ; nc_server .c : status = event_add_conn ( ctx -> evb , conn ) ; |
其中nc_server.c中的调用是在Twemproxy创建了与后端Memcached Server的链接之后,而nc_proxy.c中的调用则分别是在创建listen端口之后,以及accept客户端的链接之后,这符合我们的预期。接下来我们分别来看看这3处。
监听端口:
针对每一个server pool,Twemproxy会创建一个proxy,每个proxy监听一个指定的端口,监听后即需要像event_base添加事件,相关代码在nc_proxy.c的proxy_listen()函数中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
static rstatus_t proxy_listen ( struct context * ctx , struct conn * p ) { …… status = event_add_conn ( ctx -> evb , p ) ; if ( status < 0 ) { log_error ( "event add conn p %d on addr ‘%.*s‘ failed: %s" , p -> sd , pool -> addrstr . len , pool -> addrstr . data , strerror ( errno ) ) ; return NC_ERROR ; } status = event_del_out ( ctx -> evb , p ) ; if ( status < 0 ) { log_error ( "event del out p %d on addr ‘%.*s‘ failed: %s" , p -> sd , pool -> addrstr . len , pool -> addrstr . data , strerror ( errno ) ) ; return NC_ERROR ; } return NC_OK ; } |
代码已经省略掉了不相关的部分,可以看出,proxy_listen()函数先调用event_add_conn添加一个事件,然后将其修改为只读(等待客户端连接)。
等待连接:
accept的代码在nc_proxy.c的proxy_accept()函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
static rstatus_t proxy_accept ( struct context * ctx , struct conn * p ) { …… status = event_add_conn ( ctx -> evb , c ) ; if ( status < 0 ) { log_error ( "event add conn from p %d failed: %s" , p -> sd , strerror ( errno ) ) ; c -> close ( ctx , c ) ; return status ; } log_debug ( LOG_NOTICE , "accepted c %d on p %d from ‘%s‘" , c -> sd , p ->sd , nc_unresolve_peer_desc ( c -> sd ) ) ; return NC_OK ; } |
同样省略了不相关的部分,可以看出在accept到客户端的文件描述符后,也是通过event_add_conn添加事件,不过这时没有调用event_del_out(),因为我们是需要向客户端写回数据的。
连接Memcached:
连接后端Memcached的代码在nc_server.c当中的server_connect()函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
rstatus_t server_connect ( struct context * ctx , struct server * server , struct conn * conn ) { …… status = event_add_conn ( ctx -> evb , conn ) ; if ( status != NC_OK ) { log_error ( "event add conn s %d for server ‘%.*s‘ failed: %s" , conn -> sd , server -> pname . len , server -> pname . data , strerror ( errno ) ) ; goto error ; } …… } |
初始化连接之后,调用event_add_conn()来添加事件,然后连接Memcached,因为连接需要读写,所以没有调用event_del_out()。
从上面的分析看,添加事件的过程基本上是一致的。
事件循环:
完成了event_base的创建和event的添加之后,就进入事件循环,前一节我们讲到时间循环是在core_loop()中做的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
rstatus_t core_loop ( struct context * ctx ) { int nsd ; nsd = event_wait ( ctx -> evb , ctx -> timeout ) ; if ( nsd < 0 ) { return nsd ; } core_timeout ( ctx ) ; stats_swap ( ctx -> stats ) ; return NC_OK ; } |
core_loop()调用了event_wait()函数做事件循环,event_wait()的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
int event_wait ( struct event_base * evb , int timeout ) { int ep = evb -> ep ; struct epoll_event * event = evb -> event ; int nevent = evb -> nevent ; ASSERT ( ep > 0 ) ; ASSERT ( event != NULL ) ; ASSERT ( nevent > 0 ) ; for ( ; ; ) { int i , nsd ; nsd = epoll_wait ( ep , event , nevent , timeout ) ; if ( nsd > 0 ) { for ( i = 0 ; i < nsd ; i ++ ) { struct epoll_event * ev = & evb -> event [ i ] ; uint32_t events = 0 ; log_debug ( LOG_VVERB , "epoll " PRIX32 " triggered on conn %p", ev -> events , ev -> data . ptr ) ; if ( ev -> events & EPOLLERR ) { events |= EVENT_ERR ; } if ( ev -> events & ( EPOLLIN | EPOLLHUP ) ) { events |= EVENT_READ ; } if ( ev -> events & EPOLLOUT ) { events |= EVENT_WRITE ; } if ( evb -> cb != NULL ) { evb -> cb ( ev -> data . ptr , events ) ; } } return nsd ; } if ( nsd == 0 ) { if ( timeout == - 1 ) { log_error ( "epoll wait on e %d with %d events and %d timeout " "returned no events" , ep , nevent , timeout ) ; return - 1 ; } return 0 ; } if ( errno == EINTR ) { continue ; } log_error ( "epoll wait on e %d with %d events failed: %s" , ep , nevent , strerror ( errno ) ) ; return - 1 ; } NOT_REACHED ( ) ; } |
epoll_wait()返回当前I/O就绪的句柄数nsd,同时将epoll_event放入*event指向的数组中,随后就可以根据nsd和*event去依次访问每一个句柄,调用event_base的cb函数进行处理。
事件触发:
前面我们分析event_base_create()的时候了解到,event_base_create()这个函数仅在nc_core.c当中调用,也就是说全局只有一个event_base,而这个event_base上所有的事件触发的回调函数event_base->cb都是指向调用event_base_create()函数时所设置的core_core()。那么问题来了,如何区分不同的连接(后端Memcached连接句柄,客户端连接句柄以及proxy的监听端口句柄)呢?
我们需要具体看一下core_core()这个函数的代码,这个函数非常短:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
rstatus_t core_core ( void * arg , uint32_t events ) { rstatus_t status ; struct conn * conn = arg ; struct context * ctx = conn_to_ctx ( conn ) ; log_debug ( LOG_VVERB , "event " PRIX32 " on %c %d" , events , conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd ) ; conn -> events = events ; if ( events & EVENT_ERR ) { core_error ( ctx , conn ) ; return NC_ERROR ; } if ( events & EVENT_READ ) { status = core_recv ( ctx , conn ) ; if ( status != NC_OK || conn -> done || conn -> err ) { core_close ( ctx , conn ) ; return NC_ERROR ; } } if ( events & EVENT_WRITE ) { status = core_send ( ctx , conn ) ; if ( status != NC_OK || conn -> done || conn -> err ) { core_close ( ctx , conn ) ; return NC_ERROR ; } } return NC_OK ; } |
可以看出,针对不同的事件:EVENT_READ、EVENT_WRITE,core_core()函数只是分别去调用了core_recv()和core_send()函数,而EVENT_ERR事件则报错。
那么core_recv()和core_send()函数做了什么呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
static rstatus_t core_recv ( struct context * ctx , struct conn * conn ) { rstatus_t status ; status = conn -> recv ( ctx , conn ) ; if ( status != NC_OK ) { log_debug ( LOG_INFO , "recv on %c %d failed: %s" , conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd , strerror ( errno ) ) ; } return status ; } static rstatus_t core_send ( struct context * ctx , struct conn * conn ) { rstatus_t status ; status = conn -> send ( ctx , conn ) ; if ( status != NC_OK ) { log_debug ( LOG_INFO , "send on %c %d failed: %s" , conn -> client ? ‘c‘ : ( conn -> proxy ? ‘p‘ : ‘s‘ ) , conn -> sd , strerror ( errno ) ) ; } return status ; } |
可以看出他们只是分别调用了conn->recv()和conn->send(),我们看一下conn这个结构体相关部分的定义:
1 2 3 4 5 6 7 8 9 10 11 12 |
struct conn { …… conn_recv_t recv ; conn_recv_next_t recv_next ; conn_recv_done_t recv_done ; conn_send_t send ; conn_send_next_t send_next ; conn_send_done_t send_done ; conn_close_t close ; conn_active_t active ; …… } |
省略掉了无关的部分,可以看到conn包含了若干handler的函数指针,这其中就包含recv和send。那么这些字段是在创建conn的时候初始化的。针对不同的conn,为他们初始化不同的handler即可。
那么,首先我们看看listen的过程,调用proxy_listen()函数的位置是在nc_proxy.c的proxy_each_init()当中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
rstatus_t proxy_each_init ( void * elem , void * data ) { rstatus_t status ; struct server_pool * pool = elem ; struct conn * p ; p = conn_get_proxy ( pool ) ; if ( p == NULL ) { return NC_ENOMEM ; } status = proxy_listen ( pool -> ctx , p ) ; if ( status != NC_OK ) { p -> close ( pool -> ctx , p ) ; return status ; } log_debug ( LOG_NOTICE , "p %d listening on ‘%.*s‘ in %s pool %" PRIu32 " ‘%.*s‘" " with %" PRIu32 " servers" , p -> sd , pool -> addrstr . len , pool -> addrstr . data , pool -> redis ? "redis" : "memcache" , pool -> idx , pool -> name . len , pool -> name . data , array_n ( & pool -> server ) ) ; return NC_OK ; } |
在调用proxy_listen()之前,调用了conn_get_proxy()来创建并初始化conn:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
struct conn * conn_get_proxy ( void * owner ) { struct server_pool * pool = owner ; struct conn * conn ; conn = _conn_get ( ) ; if ( conn == NULL ) { return NULL ; } conn -> redis = pool -> redis ; conn -> proxy = 1 ; conn -> recv = proxy_recv ; conn -> recv_next = NULL ; conn -> recv_done = NULL ; conn -> send = NULL ; conn -> send_next = NULL ; conn -> send_done = NULL ; conn -> close = proxy_close ; conn -> active = NULL ; conn -> ref = proxy_ref ; conn -> unref = proxy_unref ; conn -> enqueue_inq = NULL ; conn -> dequeue_inq = NULL ; conn -> enqueue_outq = NULL ; conn -> dequeue_outq = NULL ; conn -> ref ( conn , owner ) ; log_debug ( LOG_VVERB , "get conn %p proxy %d" , conn , conn -> proxy ) ; return conn ; } |
通过这个函数定义,我们看到为conn初始化了recv、close、ref和unref这4个handler,其中recv的handler是proxy_recv,那么也就是说,当监听句柄由不可读变为可读时,将会触发proxy_recv这个回调函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
rstatus_t proxy_recv ( struct context * ctx , struct conn * conn ) { rstatus_t status ; ASSERT ( conn -> proxy && ! conn -> client ) ; ASSERT ( conn -> recv_active ) ; conn -> recv_ready = 1 ; do { status = proxy_accept ( ctx , conn ) ; if ( status != NC_OK ) { return status ; } } while ( conn -> recv_ready ) ; return NC_OK ; } |
果不其然,这个函数调用了proxy_accept():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
static rstatus_t proxy_accept ( struct context * ctx , struct conn * p ) { rstatus_t status ; struct conn * c ; int sd ; ASSERT ( p -> proxy && ! p -> client ) ; ASSERT ( p -> sd > 0 ) ; ASSERT ( p -> recv_active && p -> recv_ready ) ; for ( ; ; ) { sd = accept ( p -> sd , NULL , NULL ) ; if ( sd < 0 ) { if ( errno == EINTR ) { log_debug ( LOG_VERB , "accept on p %d not ready - eintr" , p -> sd ) ; continue ; } if ( errno == EAGAIN || errno == EWOULDBLOCK ) { log_debug ( LOG_VERB , "accept on p %d not ready - eagain" , p -> sd ); p -> recv_ready = 0 ; return NC_OK ; } log_error ( "accept on p %d failed: %s" , p -> sd , strerror ( errno ) ) ; return NC_ERROR ; } break ; } c = conn_get ( p -> owner , true , p -> redis ) ; if ( c == NULL ) { log_error ( "get conn for c %d from p %d failed: %s" , sd , p -> sd , strerror ( errno ) ) ; status = close ( sd ) ; if ( status < 0 ) { log_error ( "close c %d failed, ignored: %s" , sd , strerror ( errno ) ) ; } return NC_ENOMEM ; } c -> sd = sd ; …… } |
代码已经滤掉了无关部分,proxy_accept在accept到客户端的句柄后,通过调用conn_get()创建并初始化conn,在conn_get()中对conn做了如下初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
conn -> recv = msg_recv ; conn -> recv_next = req_recv_next ; conn -> recv_done = req_recv_done ; conn -> send = msg_send ; conn -> send_next = rsp_send_next ; conn -> send_done = rsp_send_done ; conn -> close = client_close ; conn -> active = client_active ; conn -> ref = client_ref ; conn -> unref = client_unref ; conn -> enqueue_inq = NULL ; conn -> dequeue_inq = NULL ; conn -> enqueue_outq = req_client_enqueue_omsgq ; conn -> dequeue_outq = req_client_dequeue_omsgq ; |
而在创建Memcached连接时,是这么初始化的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
conn -> recv = msg_recv ; conn -> recv_next = rsp_recv_next ; conn -> recv_done = rsp_recv_done ; conn -> send = msg_send ; conn -> send_next = req_send_next ; conn -> send_done = req_send_done ; conn -> close = server_close ; conn -> active = server_active ; conn -> ref = server_ref ; conn -> unref = server_unref ; conn -> enqueue_inq = req_server_enqueue_imsgq ; conn -> dequeue_inq = req_server_dequeue_imsgq ; conn -> enqueue_outq = req_server_enqueue_omsgq ; conn -> dequeue_outq = req_server_dequeue_omsgq ; |
这些handler的作用,这里暂时不细究,只需要理解事件触发回调函数的过程就可以了。
小结:
通过上面的分析,我们知道了Twemproxy使用了单进程模型,并且自己封装了事件模型,这与redis是类似的。这也就意味着,Twemproxy不适合做CPU密集的操作,仅作为proxy来用是OK的,若要对Twemproxy的功能进行扩展,要特别注意这一点,以免影响Twemproxy的吞吐量。