前面提到过Memcached的线程模型:Memcached使用Libevent,以此为基础来处理事件。其原理为:启动时的线程为main thread,它包含一个event_base,之后创建多个worker thread;每个work thread中也有一个event_base。main thread中的event_base负责监听网络,接收新连接;当建立连接后就把新连接交给worker thread来处理。
Memcached的main函数在Memcached.c中。main函数中初始化了系统设置,初始化多线程:
1、初始化了main thread中的event_base、初始化多线程、初始化网络
/* initialize main thread libevent instance */
main_base = event_init();//初始化main event_base
……
/* start up worker threads if MT mode */
memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
……
//初始化socket并监听。这里假设使用的是TCP
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
2、接着main thread初始化worker thread。每个worker thread都对应一个结构体LIBEVENT_THREAD
,包含其相关信息
//定义在Memcached.h中
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;
了解了这个结构体,再来看一下main thread和worker thread的初始化过程。
memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
//函数memcached_thread_init定义在Thread.c中
void memcached_thread_init(int nthreads, struct event_base *main_base) {
……
……
//LIBEVENT_THREAD包含了worker thread的相关信息,给worker thread的标识信息分配内存。
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can‘t allocate thread descriptors");
exit(1);
}
dispatcher_thread.base = main_base;//main thread的event_base
dispatcher_thread.thread_id = pthread_self();//main thread的ID
//为了能让main thread和workerthread通信,创建管道。worker thread监听管道的可读,当main thread需要
//唤醒worker thread时,向管道写信息即可。
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can‘t create notify pipe");
exit(1);
}
//把创建的管道和每个worker thread关联
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);//初始化每个worker thread
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* Create threads after we‘ve done all the libevent setup. */
//创建并启动worker thread。worker_libevent是worker线程的启动函数
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
/* Wait for all the threads to set themselves up before returning. */
//等待所有worker thread都启动
pthread_mutex_lock(&init_lock);
//每个线程启动后都会notify信号量,这里等待nthreads次notify。
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
在上面的代码中,setup_thread
设置worker thread的event相关信息;create_worker
是创建并启动线程。
//setup_thread函数
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();//worker thread的event_base
/* Listen for notifications from other threads */
//监听创建的管道事件,事件处理函数为thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);//将事件和其event_base关联
if (event_add(&me->notify_event, 0) == -1) {//将事件添加到event_base
fprintf(stderr, "Can‘t monitor libevent notify pipe\n");
exit(1);
}
me->new_conn_queue = malloc(sizeof(struct conn_queue));//为worker thread创建监听事件队列
cq_init(me->new_conn_queue);//初始化这个队列
……
}
//create_worker函数
static void create_worker(void *(*func)(void *), void *arg) {
……
//创建线程,func为线程启动函数。这里func为worker_libevent,arg为threads[i]
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
fprintf(stderr, "Can‘t create thread: %s\n",
strerror(ret));
exit(1);
}
}
再来看线程的启动函数func(worker_libevent)
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
/* Any per-thread setup can happen here; memcached_thread_init() will block until
* all threads have finished initializing.
*/
register_thread_initialized();//这里向main thread发起信号量的notify。main thread在wait_for_thread_registration等待每个线程都启动
event_base_loop(me->base, 0);
return NULL;
}
到了这里多线程模型已经基本创建完成了。
3、main thread创建listen socket并监听。
//初始化socket并监听
main()
{
……
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
……
}
//函数server_sockets。其实内部又调用了一个函数server_socket,看这个函数
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
int sfd;
hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;//判断是TCP还是UDP
for (next= ai; next; next= next->ai_next) {
conn *listen_conn_add;
if ((sfd = new_socket(next)) == -1) {//创建socket fd
continue;
}
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));//设置sfd属性
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {//绑定地址
close(sfd);
continue;
} else {
success++;
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
perror("listen()");
close(sfd);
freeaddrinfo(ai);
return 1;
}
}
if (IS_UDP(transport)) {
……
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,//在conn_new中监听连接
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
}
freeaddrinfo(ai);
/* Return zero iff we detected no errors in starting up connections */
return success == 0;
}
在conn_new设置监听连接。这里用到了一个连接的结构体conn
,字段比较多,那几个字段看一下
//在Memcached.h中
typedef struct conn conn;
struct conn {
int sfd;//连接的fd
sasl_conn_t *sasl_conn;
bool authenticated;
enum conn_states state;//连接状态
enum bin_substates substate;
rel_time_t last_cmd_time;
struct event event;//连接时间
short ev_flags;
……
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
接下来看conn_new
函数
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c;
assert(sfd >= 0 && sfd < max_fds);
c = conns[sfd];
……
……
if (transport == tcp_transport && init_state == conn_new_cmd) {//如果是新连接,则初始化
if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
&c->request_addr_size)) {
perror("getpeername");
memset(&c->request_addr, 0, sizeof(c->request_addr));
}
}
……
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//创建监听的event,事件处理函数为event_handler
event_base_set(base, &c->event);//将event和event_base相关联
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {//添加event事件
perror("event_add");
return NULL;
}
……
return c;
}
4、再来看一下多线程模型中的事件处理,即main thread和worker thread如何协同工作处理事件。main thread的事件处理函数为event_handler
,worker thread的事件处理函数为thread_libevent_process
。
先来看main thread的事件处理函数event_handler
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;//arg是新建的连接
assert(c != NULL);
c->which = which;
……
drive_machine(c);//这里处理连接
/* wait for next event */
return;
}
//函数drive_machine
static void drive_machine(conn *c) {
while (!stop) {
switch(c->state) {//根据连接状态来处理
case conn_listening:
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);//接收连接
if () {
……
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
……
}
}
return;
}
//函数dispatch_conn_new。在Thread.c中
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();//创建结构体CQ_ITEM,用来保存连接信息
char buf[1];
if (item == NULL) {
close(sfd);
/* given that malloc failed this may also fail, but let‘s try */
fprintf(stderr, "Failed to allocate memory for connection object\n");
return ;
}
int tid = (last_thread + 1) % settings.num_threads;//使用Round Robin分配worker thread
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
//给结构体CQ_ITEM赋值
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
cq_push(thread->new_conn_queue, item);//将item添加到worker thread
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = ‘c‘;
if (write(thread->notify_send_fd, buf, 1) != 1) {//通知worker thread
perror("Writing to thread notify pipe");
}
}
至此,main thread已经将连接交给了worker thread。接下来看worker thread怎么处理
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can‘t read from libevent pipe\n");
switch (buf[0]) {
case ‘c‘://main thread给worker thread插入item后,发送了‘c‘
item = cq_pop(me->new_conn_queue);//取出这个Item
if (NULL != item) {
//设置这个连接的event事件,并添加到event_base
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can‘t listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can‘t listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;
}
cqi_free(item);
}
break;
/* we were told to pause and report in */
case ‘p‘:
register_thread_initialized();
break;
}
}
至此,Memcached中,线程如何创建、关联event_base、设置event_handle、main thread和worker thread如何通信等问题已经基本理清,细节暂不考虑。
版权声明:本文为博主原创文章,未经博主允许不得转载。