Memcached网络模型

之前用libevent开发了一个流媒体服务器。用线程池实现的。之后又看了memcached的网络相关实现,今天来整理一下memcached的实现流程。

memcached不同于Redis的单进程单线程,是采用多线程的工作方式。有一个主线程,同时维护了一个线程池(工作线程)。worker thread工作线程和main thread主线程之间主要通过pipe来进行通信。因为用了libevent,所以感觉比Redis稍微庞大点,没有在生产环境对比过Redis和memcached,所以也不好说什么性能比对。

主线程和工作线程都有一个event base,大体框架如下图:

整体框架图:

线程模型:

每个线程包括主线程都各自有独立的Libevent实例,Memcached的listen fd注册到主线程的Libevent实例上,由主线程来accept新的连接,接受新的连接后根据Round-robin算法选择工作线程,将新连接的socket fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到工作线程(notify_receive_fd),而notify_receive_fd已经注册到工作线程的Libevent实例上了,这样工作线程就能收到通知“c”,然后从该工作线程的CQ队列中pop出CQ_ITEM进而取出新连接并将fd注册到工作线程的Libevent实例上,从而由工作线程来处理该连接的所有后续事件

代码层实现图:

源码解读:

1. 关键数据结构

/* An item in the connection queue. */主要用于存储用户socket连接的基本信息

主线程会将用户的socket连接信息封装成CQ_ITEM,并放入工作线程的处理队列中。工作线程得到主线程的pipe通知后,就会将队列中的ITEM取出来,创建libevent的socket读事件。
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;//socket fd
    enum conn_states  init_state;//事件类型
    int               event_flags;//libevent的flag
    int               read_buffer_size;//读取buffer的大小
    enum network_transport     transport;
    CQ_ITEM          *next;//下一个CQ_ITEM
};

/**
* The structure representing a connection into memcached.
*/
typedef struct conn conn;
struct conn {
    int    sfd;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;//指向队列的第一个节点
    CQ_ITEM *tail;//指向队列的最后一个节点
    pthread_mutex_t lock;//一个队列就对应一个锁
};

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */线程ID
    struct event_base *base;    /* libevent handle this thread uses */线程所使用的event_base
    struct event notify_event;  /* listen event for notify pipe */用于监听管道读事件的event
    int notify_receive_fd;      /* receiving end of notify pipe */管道的读端fd
    int notify_send_fd;         /* sending end of notify pipe */管道的写端fd
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

2. CQ队列示意图

CQ_ITEM的创建实现了一个内存池,具体可以参考函数static CQ_ITEM *cqi_new(void)和static void cqi_free(CQ_ITEM *item),解决频繁分配释放内存而产生的内存碎片问题。

3.文字代码流程

(1)在main函数中调用main_base = event_init()来初始化主线程Libevent实例。
(2)在main函数中调用thread_init来初始化工作线程,并将主线程Libevent实例作为参数传入。
(3)在thread_init函数中为指定数量的工作线程分配内存,为每个线程创建管道,并分别绑定到通知收和发的socket描述符上,调用函数setup_thread初始化线程信息,调用函数create_worker为每个线程注册回调函数。每个线程有一个CQ队列。关键代码:
        for (i = 0; i < nthreads; i++) {
                int fds[2];
                if (pipe(fds)) {
                        ...
                }
                threads[i].notify_receive_fd = fds[0];
                threads[i].notify_send_fd = fds[1];
               setup_thread(&threads[i]);
               ...
        }
        /* Create threads after we‘ve done all the libevent setup. */
        for (i = 0; i < nthreads; i++) {
            create_worker(worker_libevent, &threads[i]);
        }

(4)在setup_thread函数中,为工作线程初始化Libevent实例,为主线程通知读(notify_receive_fd)注册回调函数thread_libevent_process,初始化cq队列,关键代码如下:
        static void setup_thread(LIBEVENT_THREAD *me) {
                me->base = event_init();
                ...
                /* Listen for notifications from other threads */
                event_set(&me->notify_event, me->notify_receive_fd,
                          EV_READ | EV_PERSIST, thread_libevent_process, me);
                event_base_set(me->base, &me->notify_event);
                if (event_add(&me->notify_event, 0) == -1) {
                        ...
                }
                me->new_conn_queue = malloc(sizeof(struct conn_queue));
                ...
                cq_init(me->new_conn_queue);
                ...
        }

(5)在thread_libevent_process函数中,读取主线程发送的通知接收消息,将主线程accept来的fd注册到工作线程的Libevent实例中,主线程accept来的fd从conn_queue队列获取,关键代码如下:
        static void thread_libevent_process(int fd, short which, void *arg) {
                LIBEVENT_THREAD *me = arg;
                CQ_ITEM *item;
                char buf[1];
                if (read(fd, buf, 1) != 1)
                        ...
                switch (buf[0]) {
                case ‘c‘:
                item = cq_pop(me->new_conn_queue);
                if (NULL != item) {
                        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                                        item->read_buffer_size, item->transport, me->base);
               ...
                }
        }
(6)在函数conn_new中,创建conn句柄,为句柄注册回调函数event_handler处理事件,将该句柄作为参数传入回调函数并设置到Libevent中,该函数的关键代码如下:
        conn *conn_new(const int sfd, enum conn_states init_state,
                        const int event_flags,
                        const int read_buffer_size, enum network_transport transport,
                        struct event_base *base) {
            conn *c = conn_from_freelist();
            if (NULL == c) {
                if (!(c = (conn *)calloc(1, sizeof(conn)))) {
                        ...
            }
           ...
           event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
           event_base_set(base, &c->event);
           c->ev_flags = event_flags;
           if (event_add(&c->event, 0) == -1) {
                  ...
           }
           ...
        }

(7)在create_worker函数中,创建工作线程并注册回调函数,在工作线程的回调函数work_libevent中,开始Libevent主循环。

(8)在main函数中,调用函数server_sockets,再调用函数server_socket,进而调用函数new_socket,在调用函数conn_new,创建并注册listen fd到主线程Libevent实例上,最后开始Libevent主循环即event_base_loop。在conn_new函数关键代码见步骤(6)
(9)在event_handler函数中,调用函数drive_machine,在该函数中处理所有事件,其关键代码如下:
        static void drive_machine(conn *c) {
                ...
                while (!stop) {
                    switch(c->state) {
                        case conn_listening:
                                addrlen = sizeof(addr);
                                if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                                       ...                                     
                                }
                               ...
                               if (settings.maxconns_fast &&
                                    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                                        ...
                               } else {
                                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                             DATA_BUFFER_SIZE, tcp_transport);
                              }
                            stop = true;
                            break;
                            ...
                        }
                    }
                return;
        }
        在处理事件时,如果是listening事件,则调用函数dispatch_conn_new(Memcached.c/3785行)将accept fd分配给工作线程。

(10)在dispatch_conn_new函数中,根据round-robin算法将新连接push到所分配线程的CQ队列中,并通过管道发送通知消息“c,关键代码如下:
        void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                               int read_buffer_size, enum network_transport transport) {
                    CQ_ITEM *item = cqi_new();
                    char buf[1];
                    int tid = (last_thread + 1) % settings.num_threads;
                    LIBEVENT_THREAD *thread = threads + tid;
                    last_thread = tid;
                    ...
                    cq_push(thread->new_conn_queue, item);
                    ...
                    buf[0] = ‘c‘;
                    if (write(thread->notify_send_fd, buf, 1) != 1) {
                        perror("Writing to thread notify pipe");
                    }
        }

dispatch_conn_new函数只在主线程中调用,last_thread为静态变量,每次将该变量值+1,再模线程数来选择工作线程。

4. conn状态机

1.listening:这个状态是主线程的connection默认状态,它只有这一个状态,它做的工作就是把接到连接分发到worker子线程。

2.conn_new_cmd:每个新连接的初始状态,这个状态会清空读写buf。

3.conn_waiting:这个状态就是在event_base中设置读事件,然后状态机暂停,挂起当前connection(函数退出,回调函数的attachment会记录这个connection),等待有新的信息过来,然后通过回调函数的attachment重新找到这个connection,然后启动状态机。

4.conn_read:该状态从sfd中读取客户端的指令信息。例如有用户提交数据过来的时候,工作线程监听到事件后,最终会走到这里。

5.conn_parse_cmd:判断具体的指令,如果是update的指令,那么需要跳转到conn_nread中,因为需要在从网络中读取固定byte的数据,如果是查询之类的指令,就直接查询完成后,跳转到conn_mwrite中,返回数据

6.conn_nread:从网络中读取指定大小的数据,这个数据就是更新到item的数据,然后将数据更新到hash和lru中去,然后跳转到conn_write

7.conn_write:这个状态主要是调用out_string函数会跳转到这个状态,一般都是提示信息和返回的状态信息,然后输出这些数据,然后根据write_to_go的状态,继续跳转

8.conn_mwrite:这个写是把connection中的msglist返回到客户端,这个msglist存的是item的数据,用于那种get等获得item的操作的返回数据。

9.conn_swallow:对于那种update操作,如果分配item失败,显然后面的nread,是无效的,客户端是不知道的,这样客户端继续发送特定的数量的数据,就需要把读到的这些数据忽略掉,然后如果把后面指定的数据都忽略掉了(set的两部提交,数据部分忽略掉),那么connection跳转到conn_new_cmd,如果读nread的那些特定数量的数据没有读到,直接跳转到conn_closing。

10.conn_closing:服务器端主动关闭连接,调用close函数关闭文件描述符,同时把conn结构体放到空闲队列中,供新的连接重用这写conn结构体。

Memcached的数据结构那块源码还没有看过,下次也梳理下。看过Redis的,数据结构设计的还是十分精妙的。

参考资料:

http://www.lvtao.net/c/623.html?utm_source=tuicool

http://www.linuxidc.com/Linux/2012-01/52515p3.htm

http://blog.csdn.net/funkri/article/details/17022309

http://blog.chinaunix.net/uid-16723279-id-3568441.html

http://blog.csdn.net/luotuo44/article/details/42705475

http://blog.chinaunix.net/uid-27767798-id-3415510.html

感谢以上作者的无私分享。

时间: 2024-11-03 22:26:20

Memcached网络模型的相关文章

memcached源码分析-----半同步半异步网络模型

转载请注明出处:http://blog.csdn.net/luotuo44/article/details/42705475 半同步/半异步: memcached使用半同步/半异步网络模型处理客户端的连接和通信. 半同步/半异步模型的基础设施:主线程创建多个子线程(这些子线程也称为worker线程),每一个线程都维持自己的事件循环,即每个线程都有自己的epoll,并且都会调用epoll_wait函数进入事件监听状态.每一个worker线程(子线程)和主线程之间都用一条管道相互通信.每一个子线程都

Redis和Memcached的区别

?说到redis就会联想到memcached,反之亦然.了解过两者的同学有那么个大致的印象:redis与memcached相比,比仅支持简单的key-value数据类型,同时还提供list,set,zset,hash等数据结构的存储:redis支持数据的备份,即master-slave模式的数据备份:redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用等等,这似乎看起来redis比memcached更加牛逼一些,那么事实上是不是这样的呢?存在即合理,我们来根

memcached与redis的区别

1.redis中的缓存数据并不是都在内存中,redis在maxmemory或vm开启并且vm-max-memory到达上限时出发置换操作用swap机制将部分value对象(冷数据)转移至磁盘,同时将redisobj替换成VM pointer对象,标识value值在磁盘的存储位置,分有阻塞跟非阻塞机制,所以redis可以存储远超容量本身的数据 2.网络io模型:Memcached是多线程,非阻塞IO复用的网络模型,分为监听主线程和worker子线程,监听线程监听网络连接,接受请求后,将连接描述字p

Redis与Memcached的区别

       传统MySQL+ Memcached架构遇到的问题 实际MySQL是适合进行海量数据存储的,通过Memcached将热点数据加载到cache,加速访问,很多公司都曾经使用过这样的架构,但随着业务数据量的不断增加,和访问量的持续增长,我们遇到了很多问题: 1.MySQL需要不断进行拆库拆表,Memcached也需不断跟着扩容,扩容和维护工作占据大量开发时间. 2.Memcached与MySQL数据库数据一致性问题. 3.Memcached数据命中率低或down机,大量访问直接穿透到D

Linux c 开发 - Memcached源码分析之命令解析(2)

前言 从我们上一章<Linux c 开发 - Memcached源码分析之基于Libevent的网络模型>我们基本了解了Memcached的网络模型.这一章节,我们需要详细解读Memcached的命令解析. 我们回顾上一章发现Memcached会分成主线程和N个工作线程.主线程主要用于监听accpet客户端的Socket连接,而工作线程主要用于接管具体的客户端连接. 主线程和工作线程之间主要通过基于Libevent的pipe的读写事件来监听,当有连接练上来的时候,主线程会将连接交个某一个工作线

传统MySQL+ Memcached架构遇到的问题

实际MySQL是适合进行海量数据存储的,通过Memcached将热点数据加载到cache,加速访问,很多公司都曾经使用过这样的架构,但随着业务数据量的不断增加,和访问量的持续增长,我们遇到了很多问题: MySQL需要不断进行拆库拆表,Memcached也需不断跟着扩容,扩容和维护工作占据大量开发时间. Memcached与MySQL数据库数据一致性问题. Memcached数据命中率低或down机,大量访问直接穿透到DB,MySQL无法支撑. 跨机房cache同步问题. 众多NoSQL百花齐放,

Memcached和Redis比较

一.存储 Memcached基本只支持简单的key-value存储方式.Redis除key-value之外,还支持list,set,sorted set,hash等数据结构:Redis支持数据的备份,即master-slave模式的数据备份:Redis支持数据的持久化(快照.AOF),可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用:Redis可以实现主从复制,实现故障恢复:Redis的Sharding技术,很容易将数据分布到多个Redis实例中. 二.数据一致性 Memcache

Memcached源代码分析 - Memcached源代码分析之消息回应(3)

文章列表: <Memcached源代码分析 - Memcached源代码分析之基于Libevent的网络模型(1)> <Memcached源代码分析 - Memcached源代码分析之命令解析(2)> <Memcached源代码分析 - Memcached源代码分析之消息回应(3)  > <Memcached源代码分析 - Memcached源代码分析之HashTable(4) > <Memcached源代码分析 - Memcached源代码分析之增删

memcached源代码分析-----set命令处理流程

转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44236591 前一篇博文以get命令为样例把整个处理流程简单讲述了一遍.本篇博文将以set命令具体讲述memcached的处理流程. 具体的命令为"set tt 3 0 10".并如果当然memcachedserver没有名为tt的item. 读取命令: 在前一篇博文的最后,conn的状态被设置为conn_new_cmd,回到了一開始的状态. 假设此时conn结构体里面的bu