memcached源码阅读----使用libevent和多线程模型

本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情。

一、iblievent的使用

首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型。因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调。

可以减掉了解一下 libevent基本api调用

struct event_base *base;
base = event_base_new();//初始化libevent

event_base_new对比epoll,可以理解为epoll里的epoll_create。

event_base内部有一个循环,循环阻塞在epoll调用上,当有一个事件发生的时候,才会去处理这个事件。其中,这个事件是被绑定在event_base上面的,每一个事件就会对应一个struct event,可以是监听的fd。

其中struct event 使用event_new 来创建和绑定,使用event_add来启用,例如:

struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);

参数说明:

base:event_base类型,event_base_new的返回值

listener:监听的fd,listen的fd

EV_READ|EV_PERSIST:事件的类型及属性

do_accept:绑定的回调函数

(void*)base:给回调函数的参数

event_add(listener_event, NULL);

对比epoll:

event_new相当于epoll中的epoll_wait,其中的epoll里的while循环,在libevent里使用event_base_dispatch。

event_add相当于epoll中的epoll_ctl,参数是EPOLL_CTL_ADD,添加事件。

注:libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)

EV_TIMEOUT: 超时

EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发

EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发

EV_SIGNAL: POSIX信号量

EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除

EV_ET: Edge-Trigger边缘触发,相当于EPOLL的ET模式

事件创建添加之后,就可以处理发生的事件了,相当于epoll里的epoll_wait,在libevent里使用event_base_dispatch启动event_base循环,直到不再有需要关注的事件。

有了上面的分析,结合之前做的epoll服务端程序,对于一个服务器程序,流程基本是这样的:

1. 创建socket,bind,listen,设置为非阻塞模式

2. 创建一个event_base,即

[cpp] view
plain
copy

  1. struct event_base *  event_base_new(void)

3. 创建一个event,将该socket托管给event_base,指定要监听的事件类型,并绑定上相应的回调函数(及需要给它的参数)。即

[cpp] view
plain
copy

  1. struct event *  event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)

4. 启用该事件,即

[cpp] view
plain
copy

  1. int  event_add(struct event *ev, const struct timeval *tv)

5.  进入事件循环,即

[cpp] view
plain
copy

  1. int  event_base_dispatch(struct event_base *event_base)

有了上边的基础东西,可以进入memcached的阅读了。

二、memcached源码分析

main函数启动,首先会初始化很多数据,这里我们只涉及大网络这块,其他以后分析,先忽略。

1.首先初始化 主工作线程的的iblievet对象

  /* initialize main thread libevent instance */
    main_base = event_init();

最后会调用

   /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }

在该对象内部循环。不退出。

2.初始化连接的对象

static void conn_init(void) {
    freetotal = 200;
    freecurr = 0;
    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}

这里是先预先分配200个conn*的内存。等有连接上来,会从freeconns  取。

如下代码:

/*
 * Returns a connection from the freelist, if any.
 */
conn *conn_from_freelist() {
    conn *c;

    pthread_mutex_lock(&conn_lock);
    if (freecurr > 0) {
        c = freeconns[--freecurr];
    } else {
        c = NULL;
    }
    pthread_mutex_unlock(&conn_lock);

    return c;
}

3.那么conn的结构体内部长什么样子呢?

typedef struct conn conn;
struct conn {
    int    sfd;
    sasl_conn_t *sasl_conn;
    enum conn_states  state;
    enum bin_substates substate;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;

    enum protocol protocol;   /* which protocol this con<pre name="code" class="cpp">  if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }

nection speaks */ enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ struct sockaddr request_addr; /* Who sent the most recent
request */ socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers‘ worth of space is allocated */ bool noreply; /* True if the reply should not be sent. */ /* current stats command */ struct { char *buffer;
size_t size; size_t offset; } stats; /* Binary protocol stuff */ /* This is where the binary header goes */ protocol_binary_request_header binary_header; uint64_t cas; /* the cas to return */ short cmd; /* current command being processed */ int opaque; int
keylen; conn *next; /* Used for generating a list of conn structures */ LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */};


这里的所有字段就是在处理数据需要用到的。这里不详细描述。以后会慢慢分解。

因为是memcached是多线程模型,因此在从freeconn取出一个对象的时候,是要加解锁使用。

忽略SIGIPIE信号,防止rst时的程序退出

  if (sigignore(SIGPIPE) == -1) {
        perror("failed to ignore SIGPIPE; sigaction");
        exit(EX_OSERR);
    }

初始化多线程模型,并且每个线程一个iblievent的事件模型就是调用event_init函数。

/* start up worker threads if MT mode */
    thread_init(settings.num_threads, main_base);

内部实现不详细。主要是调用pthread_create函数。

4、然后开始通过端口号启动网络监听事件

代码如下:

   if (settings.port && server_sockets(settings.port, tcp_transport,
                                           portnumber_file)) {
            vperror("failed to listen on TCP port %d", settings.port);
            exit(EX_OSERR);
        }

然后调用下面的函数:

static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file)

因为,一个主机可能会有多个网卡,比如双线机房,联通或者电信,因此内部实现会出现以下代码:

 for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

static int new_socket(struct addrinfo *ai)

该函数就是调用socket函数,设置为非阻塞。

5、然后生成一个监听的conn对象

代码如下

 if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;

static conn *listen_conn = NULL;

作为全局的静态的变量。无头结点的单链表

我们继续深入conn_new 函数内部

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c = conn_from_freelist();

该函数主要是做了哪些动作呢?

第一,从刚才的free_cnn_list取出一个conn* 来,然互分配内存,根据相关配置信息,进行相关的字段初始化工作。

第二,加入到iblievent事件库中

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        if (conn_add_to_freelist(c)) {
            conn_free(c);
        }
        perror("event_add");
        return NULL;
    }

这一步就是,讲sfd上的事件绑定event_handler 函数,就是当有该连接上来的时候有数据进行可读的时候绑定,回调。

7、状态机的解读

<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">最终event_handler函数会调用</span>
static void drive_machine(conn *c)

函数。那么这个函数做了哪些工作呢?

当然是等待连接了,那就是accept函数了。

因此,入股市conn_listening状态,

  while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1)

当然同样是 讲sfd设置成非阻塞的。

这个时候是有数据上来了。

因此就要设置读命令状态了,调用以下函数:

<pre name="code" class="cpp">/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

通过注释可以知道,该函数是讲一个新连接分配各其他线程,

通过代码我们可以看出

首先,分配一个item块,讲连接的socket的fd 赋值给item,同时有当前状态,标志位,读buff大小等,然后分配一个线程,讲item推送到该thread的处理队列里了。

然互,通过往管道里写入C字符,通知到管道的另一端,进行处理该操作符的事件。因此,完成了对对该连接的 分配工作。

那么我接下来看一看 线程是如果处理的。

在初始化线程的时候,已经把管道的两个操作符放入到了iblievent里了。如下代码:

  /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

绑定了回调函数:

static void thread_libevent_process(int fd, short which, void *arg) 

当读到字符‘c‘的时候,就从其中队列中取出一个item*,掉用一下函数

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) 

同样,调用

 conn *c = conn_from_freelist();

取出一个conn* ,然后进行初始化,这个时候和上文讲到的一样了,知识状态不同了,

因此这里使用了一个状态机的模式了。

有如下状态:

enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_max_state   /**< Max state value (used for assertion) */
};

也就是

static void drive_machine(conn *c)

的核心逻辑了。通过设置状态,然后调用不同的代码,

因此在一个状态结束之后,总是会看大如下代码调用:

/*
 * Sets a connection's current state in the state machine. Any special
 * processing that needs to happen on certain state transitions can
 * happen here.
 */
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);

    if (state != c->state) {
        if (settings.verbose > 2) {
            fprintf(stderr, "%d: going from %s to %s\n",
                    c->sfd, state_text(c->state),
                    state_text(state));
        }

        if (state == conn_write || state == conn_mwrite) {
            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
        }
        c->state = state;
    }
}

到此,网络框架部分已经基本处理完成。起始这个框架是非常简单而且实用的。

redis也是基本的思想模型,只不过是单线程的,而memcached是多线程的模型。在开发模式上可以有效的借鉴。

该文章为原创文章,更多文章,欢迎访问 http://blog.csdn.net/wallwind

时间: 2024-10-10 10:00:46

memcached源码阅读----使用libevent和多线程模型的相关文章

memcached源码阅读----使用libevent

本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情. 一.iblievent的使用 首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型.因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调. 可以减掉了解一下 libevent基本api调用 struct event_base *base; base = event_bas

memcached源码分析-----安装、调试以及如何阅读memcached源码

        转载请注明出处:http://blog.csdn.net/luotuo44/article/details/42639131 安装: 安装memcached之前要先安装Libevent.现在假定Libevent安装在/usr/local/libevent目录了. 因为memcached安装后不像Libevent那样,有一堆头文件和库文件.安装后的memcached不是用来编程而直接用来运行的.所以不需要在/usr/local目录下专门为memcached建立一个目录.直接把mem

Java多线程——ReentrantReadWriteLock源码阅读

之前讲了<AQS源码阅读>和<ReentrantLock源码阅读>,本次将延续阅读下ReentrantReadWriteLock,建议没看过之前两篇文章的,先大概了解下,有些内容会基于之前的基础上阅读.这个并不是ReentrantLock简单的升级,而是落地场景的优化,我们来详细了解下吧. 背景 JUC包里面已经有一个ReentrantLock了,为何还需要一个ReentrantReadWriteLock呢?看看头注解找点线索.它是ReadWriteLock接口的实现.那看看这个接

Memcached源码分析之线程模型

作者:Calix 一)模型分析 memcached到底是如何处理我们的网络连接的? memcached通过epoll(使用libevent,下面具体再讲)实现异步的服务器,但仍然使用多线程,主要有两种线程,分别是“主线程”和“worker线程”,一个主线程,多个worker线程. 主线程负责监听网络连接,并且accept连接.当监听到连接时,accept后,连接成功,把相应的client fd丢给其中一个worker线程.worker线程接收主线程丢过来的client fd,加入到自己的epol

memcached源码分析-----memcached启动参数详解以及关键配置的默认值

转载请注明出处: http://blog.csdn.net/luotuo44/article/details/42672913 本文开启本系列博文的代码分析.本系列博文研究是memcached版本是1.4.21. 本文将给出memcached启动时各个参数的详细解释以及一些关键配置的默认值.以便在分析memcached源码的时候好随时查看.当然也方便使用memcached时可以随时查看各个参数的含义.<如何阅读memcached源码>说到memcached有很多全局变量(也就是关键配置),这些

JDK部分源码阅读与理解

本文为博主原创,允许转载,但请声明原文地址:http://www.coselding.cn/article/2016/05/31/JDK部分源码阅读与理解/ 不喜欢重复造轮子,不喜欢贴各种东西.JDK代码什么的,让整篇文章很乱...JDK源码谁都有,没什么好贴的...如果你没看过JDK源码,建议打开Eclipse边看源码边看这篇文章,看过的可以把这篇文章当成是知识点备忘录... JDK容器类中有大量的空指针.数组越界.状态异常等异常处理,这些不是重点,我们关注的应该是它的一些底层的具体实现,这篇

memcached源码分析-----item过期失效处理以及LRU爬虫

memcached源码分析-----item过期失效处理以及LRU爬虫,memcached-----item 转载请注明出处:http://blog.csdn.net/luotuo44/article/details/42963793 温馨提示:本文用到了一些可以在启动memcached设置的全局变量.关于这些全局变量的含义可以参考<memcached启动参数详解>.对于这些全局变量,处理方式就像<如何阅读memcached源代码>所说的那样直接取其默认值. 过期失效处理: 一个i

Linux c 开发 - Memcached源码分析之命令解析(2)

前言 从我们上一章<Linux c 开发 - Memcached源码分析之基于Libevent的网络模型>我们基本了解了Memcached的网络模型.这一章节,我们需要详细解读Memcached的命令解析. 我们回顾上一章发现Memcached会分成主线程和N个工作线程.主线程主要用于监听accpet客户端的Socket连接,而工作线程主要用于接管具体的客户端连接. 主线程和工作线程之间主要通过基于Libevent的pipe的读写事件来监听,当有连接练上来的时候,主线程会将连接交个某一个工作线

【原】FMDB源码阅读(一)

[原]FMDB源码阅读(一) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 说实话,之前的SDWebImage和AFNetworking这两个组件我还是使用过的,但是对于FMDB组件我是一点都没用过.好在FMDB源码中的main.m文件提供了大量的示例,况且网上也有很多最佳实践的例子,我就不在这献丑了.我们先从一个最简单的FMDB的例子开始: // 找到用户目录下的Documents文件夹位置 NSString* docsdir = [NSSearchPathFor