Memcached线程模型分析

前面提到过Memcached的线程模型:Memcached使用Libevent,以此为基础来处理事件。其原理为:启动时的线程为main thread,它包含一个event_base,之后创建多个worker thread;每个work thread中也有一个event_base。main thread中的event_base负责监听网络,接收新连接;当建立连接后就把新连接交给worker thread来处理。

Memcached的main函数在Memcached.c中。main函数中初始化了系统设置,初始化多线程:

1、初始化了main thread中的event_base、初始化多线程、初始化网络

/* initialize main thread libevent instance */
    main_base = event_init();//初始化main event_base
……
/* start up worker threads if MT mode */
    memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
……
//初始化socket并监听。这里假设使用的是TCP
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

2、接着main thread初始化worker thread。每个worker thread都对应一个结构体LIBEVENT_THREAD,包含其相关信息

//定义在Memcached.h中
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

了解了这个结构体,再来看一下main thread和worker thread的初始化过程。

memcached_thread_init(settings.num_threads, main_base);//初始化工作线程
//函数memcached_thread_init定义在Thread.c中
void memcached_thread_init(int nthreads, struct event_base *main_base) {

    ……
    ……
     //LIBEVENT_THREAD包含了worker thread的相关信息,给worker thread的标识信息分配内存。
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can‘t allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;//main thread的event_base
    dispatcher_thread.thread_id = pthread_self();//main thread的ID

    //为了能让main thread和workerthread通信,创建管道。worker thread监听管道的可读,当main thread需要
    //唤醒worker thread时,向管道写信息即可。
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can‘t create notify pipe");
            exit(1);
        }

        //把创建的管道和每个worker thread关联
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);//初始化每个worker thread
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we‘ve done all the libevent setup. */
    //创建并启动worker thread。worker_libevent是worker线程的启动函数
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    //等待所有worker thread都启动
    pthread_mutex_lock(&init_lock);
    //每个线程启动后都会notify信号量,这里等待nthreads次notify。
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

在上面的代码中,setup_thread设置worker thread的event相关信息;create_worker是创建并启动线程。

//setup_thread函数
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();//worker thread的event_base

    /* Listen for notifications from other threads */
    //监听创建的管道事件,事件处理函数为thread_libevent_process
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);//将事件和其event_base关联

    if (event_add(&me->notify_event, 0) == -1) {//将事件添加到event_base
        fprintf(stderr, "Can‘t monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));//为worker thread创建监听事件队列

    cq_init(me->new_conn_queue);//初始化这个队列

    ……
}

//create_worker函数
static void create_worker(void *(*func)(void *), void *arg) {
    ……
    //创建线程,func为线程启动函数。这里func为worker_libevent,arg为threads[i]
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can‘t create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

再来看线程的启动函数func(worker_libevent)

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */

    register_thread_initialized();//这里向main thread发起信号量的notify。main thread在wait_for_thread_registration等待每个线程都启动

    event_base_loop(me->base, 0);
    return NULL;
}

到了这里多线程模型已经基本创建完成了。

3、main thread创建listen socket并监听。

//初始化socket并监听
main()
{
    ……
        if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }
    ……
}

//函数server_sockets。其实内部又调用了一个函数server_socket,看这个函数
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;//判断是TCP还是UDP

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {//创建socket fd
            continue;
        }

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));//设置sfd属性

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {//绑定地址
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
        }

        if (IS_UDP(transport)) {
            ……
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,//在conn_new中监听连接
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);
    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}

在conn_new设置监听连接。这里用到了一个连接的结构体conn,字段比较多,那几个字段看一下

//在Memcached.h中
typedef struct conn conn;
struct conn {
    int    sfd;//连接的fd
    sasl_conn_t *sasl_conn;
    bool authenticated;
    enum conn_states  state;//连接状态
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;//连接时间
    short  ev_flags;
   ……
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};

接下来看conn_new函数

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    ……
    ……

    if (transport == tcp_transport && init_state == conn_new_cmd) {//如果是新连接,则初始化
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    ……

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//创建监听的event,事件处理函数为event_handler
    event_base_set(base, &c->event);//将event和event_base相关联
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {//添加event事件
        perror("event_add");
        return NULL;
    }

    ……
    return c;
}

4、再来看一下多线程模型中的事件处理,即main thread和worker thread如何协同工作处理事件。main thread的事件处理函数为event_handler,worker thread的事件处理函数为thread_libevent_process

先来看main thread的事件处理函数event_handler

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;//arg是新建的连接
    assert(c != NULL);

    c->which = which;
    ……
    drive_machine(c);//这里处理连接

    /* wait for next event */
    return;
}

//函数drive_machine
static void drive_machine(conn *c) {

    while (!stop) {

        switch(c->state) {//根据连接状态来处理
        case conn_listening:
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);//接收连接

            if () {
               ……
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;
            ……
        }
    }
    return;
}

//函数dispatch_conn_new。在Thread.c中
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();//创建结构体CQ_ITEM,用来保存连接信息
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let‘s try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;//使用Round Robin分配worker thread

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    //给结构体CQ_ITEM赋值
    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);//将item添加到worker thread

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = ‘c‘;
    if (write(thread->notify_send_fd, buf, 1) != 1) {//通知worker thread
        perror("Writing to thread notify pipe");
    }
}

至此,main thread已经将连接交给了worker thread。接下来看worker thread怎么处理

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can‘t read from libevent pipe\n");

    switch (buf[0]) {
    case ‘c‘://main thread给worker thread插入item后,发送了‘c‘
    item = cq_pop(me->new_conn_queue);//取出这个Item

    if (NULL != item) {
        //设置这个连接的event事件,并添加到event_base
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can‘t listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can‘t listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
        break;
    /* we were told to pause and report in */
    case ‘p‘:
    register_thread_initialized();
        break;
    }
}

至此,Memcached中,线程如何创建、关联event_base、设置event_handle、main thread和worker thread如何通信等问题已经基本理清,细节暂不考虑。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-15 21:01:42

Memcached线程模型分析的相关文章

MyCat线程模型分析

参考MyCat权威指南,对MyCat-Server里面的线程模型做简要分析: 1. 线程模型图 根据MyCat权威指南,做出以下线程模型图: MyCat的线程模型主要分为三部分(: 网络通讯线程.业务线程和定时任务线程,下面分别介绍这些线程的使用: [温馨提示] 这里排除JVM自身使用的线程,只关注MyCat服务所使用的线程,如果需要详细了解MyCat里面使用的所有线程,请参考<MyCat权威指南>-> 开发篇 -> MyCat线程模型分析 2. 网络通讯线程 MyCat-Serv

memcached 线程模型

memcached 线程可分为两种,一是负责事件处理(主线程)和内存管理等的线程, 二是负责负责连接请求处理的线程即 worker threads .这里只对 worker threads 进行讨论. 从 main 函数开始分析: int main(int argc, char* argv[]) { // in memcached.c // ... /* start up worker threads if MT mode */ memcached_thread_init(settings.nu

Android系统Surface机制的SurfaceFlinger服务的线程模型分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/8062945 在前面两篇文章中,我们分析了SurfaceFlinger服务的启动过程以及SurfaceFlinger服务初始化硬件帧缓冲区的过程.从这两个过程可以知道,SurfaceFlinger服务在启动的过程中,一共涉及到了三种类型的线程,它们分别是Binder线程.UI渲染线程和控制台事件监控线程.在本文中,我们就将详细分SurfaceFl

Memcached源码分析之线程模型

作者:Calix 一)模型分析 memcached到底是如何处理我们的网络连接的? memcached通过epoll(使用libevent,下面具体再讲)实现异步的服务器,但仍然使用多线程,主要有两种线程,分别是“主线程”和“worker线程”,一个主线程,多个worker线程. 主线程负责监听网络连接,并且accept连接.当监听到连接时,accept后,连接成功,把相应的client fd丢给其中一个worker线程.worker线程接收主线程丢过来的client fd,加入到自己的epol

转:Memcached 线程部分源码分析

目前网上关于memcached的分析主要是内存管理部分,下面对memcached的线程模型做下简单分析 有不对的地方还请大家指正,对memcahced和libevent不熟悉的请先google之 先看下memcahced启动时线程处理的流程 memcached的多线程主要是通过实例化多个libevent实现的,分别是一个主线程和n个workers线程 无论是主线程还是workers线程全部通过libevent管理网络事件,实际上每个线程都是一个单独的libevent实例 主线程负责监听客户端的建

Linux 线程实现机制分析 Linux 线程实现机制分析 Linux 线程模型的比较:LinuxThreads 和 NPTL

Linux 线程实现机制分析 Linux 线程实现机制分析  Linux 线程模型的比较:LinuxThreads 和 NPTL http://www.ibm.com/developerworks/cn/linux/kernel/l-thread/ 自从多线程编程的概念出现在 Linux 中以来,Linux 多线应用的发展总是与两个问题脱不开干系:兼容性.效率.本文从线程模型入手,通过分析目前 Linux 平台上最流行的 LinuxThreads 线程库的实现及其不足,描述了 Linux 社区是

redis和memcached有什么区别?redis的线程模型是什么?为什么单线程的redis比多线程的memcached效率要高得多(为什么redis是单线程的但是还可以支撑高并发)?

1.redis和memcached有什么区别? 这个事儿吧,你可以比较出N多个区别来,但是我还是采取redis作者给出的几个比较吧 1)Redis支持服务器端的数据操作:Redis相比Memcached来说,拥有更多的数据结构和并支持更丰富的数据操作,通常在Memcached里,你需要将数据拿到客户端来进行类似的修改再set回去.这大大增加了网络IO的次数和数据体积.在Redis中,这些复杂的操作通常和一般的GET/SET一样高效.所以,如果需要缓存能够支持更复杂的结构和操作,那么Redis会是

Redis 和 Memcached 有什么区别?Redis 的线程模型是什么?为什么单线程的 Redis 比多线程的 Memcached 效率要高得多?

面试题 redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发? 面试官心理分析 这个是问 redis 的时候,最基本的问题吧,redis 最基本的一个内部原理和特点,就是 redis 实际上是个单线程工作模型,你要是这个都不知道,那后面玩儿 redis 的时候,出了问题岂不是什么都不知道? 还有可能面试官会问问你 redis 和 memcached 的区别,但是 memcached 是早些年各大互联网公司常用的缓存方案,但是现在近

2.redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发?

作者:中华石杉 面试题 redis 和 memcached 有什么区别?redis 的线程模型是什么?为什么 redis 单线程却能支撑高并发? 面试官心理分析 这个是问 redis 的时候,最基本的问题吧,redis 最基本的一个内部原理和特点,就是 redis 实际上是个单线程工作模型,你要是这个都不知道,那后面玩儿 redis 的时候,出了问题岂不是什么都不知道? 还有可能面试官会问问你 redis 和 memcached 的区别,但是 memcached 是早些年各大互联网公司常用的缓存