分布式缓存系统 Memcached 主线程之main函数

前两节中对工作线程的工作流程做了较为详细的分析,现把其主要流程总结为下图:

接下来本节主要分析主线程相关的函数设计,主函数main的基本流程如下图所示:

对于主线程中的工作线程的初始化到启动所有的工作线程前面已经做了分析,后面的创建监听socket、注册监听socket的libevent事件、启动主线程的libevent事件循环,就是接下来的内容了。

其中主要调用的函数是server_sockets,该函数从配置参数setting.inner字符串中依次提取出一个ip或者一个hostname(一个hostname可能有多个ip),然后传给函数server_socket函数处理之。server_socket函数负责完成创建socket,绑定到端口,监听socket,并将该监听socket对应的conn结构(调用函数conn_new,在该函数中将监听socket注册到主线程libevent main_base上,回调函数为event_handler,其核心部分是drive_machine,这与工作线程是一致的),然后将该conn放入监听队列( conn *listen_conn)中。监听socket上接收到的客户连接的conn将放到连接队列conn **conns中。 最后在main函数中启动libevent事件循环。 还是图来的直观,如下:

static int server_sockets(int port, enum network_transport transport,  
                          FILE *portnumber_file) {  //<span style="color: rgb(0, 130, 0); font-family: Consolas, ‘Courier New‘, Courier, mono, serif; font-size: 11.8518514633179px; line-height: 18px;">port是默认的11211或者用户使用-p选项设置的端口号</span>
  
    //settings.inter里面可能有多个IP地址.如果有多个那么将用逗号分隔  
    char *b;  
    int ret = 0;  
    //复制一个字符串,避免下面的strtok_r函数修改(污染)全局变量settings.inter  
    char *list = strdup(settings.inter);  
  
    //这个循环主要是处理多个IP的情况  
    for (char *p = strtok_r(list, ";,", &b);  
        p != NULL; //分割出一个个的ip,使用分号;作为分隔符  
        p = strtok_r(NULL, ";,", &b)) {  
        int the_port = port;  
        char *s = strchr(p, ‘:‘);//启动的可能使用-l ip:port 参数形式  
        //ip后面接着端口号,即指定ip的同时也指定了该ip的端口号  
        //此时采用ip后面的端口号,而不是采用-p指定的端口号  
        if (s != NULL) {  
            *s = ‘\0‘;//截断后面的端口号,使得p指向的字符串只是一个ip  
            ++s;  
            if (!safe_strtol(s, &the_port)) {//非法端口号参数值  
                return 1;  
            }  
        }  
        if (strcmp(p, "*") == 0) {  
            p = NULL;  
        }  
        //处理其中一个IP。有p指定ip(或者hostname)  
        ret |= server_socket(p, the_port, transport, portnumber_file);  
    }  
    free(list);  
    return ret;  
}  
  
  
static conn *listen_conn = NULL;//监听队列(可能要同时监听多个IP)

//interface是一个ip、hostname或者NULL。这个ip字符串后面没有端口号。端口号由参数port指出  
static int server_socket(const char *interface,  
                        int port,  
                        enum network_transport transport,  
                        FILE *portnumber_file) {  
    int sfd;  
    struct linger ling = {0, 0};  
    struct addrinfo *ai;  
    struct addrinfo *next;  
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,  
                              .ai_family = AF_UNSPEC };  
    char port_buf[NI_MAXSERV];  
    int success = 0;  
    int flags =1;  
  
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;  
  
  
    snprintf(port_buf, sizeof(port_buf), "%d", port);  
    getaddrinfo(interface, port_buf, &hints, &ai);  
  
    //如果interface是一个hostname的话,那么可能就有多个ip  
    for (next= ai; next; next= next->ai_next) {  
        conn *listen_conn_add;  
  
        //创建一个套接字,然后设置为非阻塞的  
        sfd = new_socket(next);//调用socket函数  
        bind(sfd, next->ai_addr, next->ai_addrlen);  
  
        success++;  
        listen(sfd, settings.backlog);  
  
  //函数conn_new中将监听套接字fd注册到main_base上,并设定回调函数为event_handler,其中核心为drive_machine函数,这与工作线程是一致的
        if (!(listen_conn_add = conn_new(sfd, conn_listening,  
                                        EV_READ | EV_PERSIST, 1,  
                                        transport, main_base))) {  
            fprintf(stderr, "failed to create listening connection\n");  
            exit(EXIT_FAILURE);  
        }  
  
        //将要监听的多个conn放到一个监听队列里面  
        listen_conn_add->next = listen_conn;  
        listen_conn = listen_conn_add;  
  
    }  
  
    freeaddrinfo(ai);  
  
    /* Return zero iff we detected no errors in starting up connections */  
    return success == 0;  
}  
  
  
static int new_socket(struct addrinfo *ai) {  
    int sfd;  
    int flags;  
    sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);  
    flags = fcntl(sfd, F_GETFL, 0);  
    fcntl(sfd, F_SETFL, flags | O_NONBLOCK);  
  
    return sfd;  
}

主线程为每一个监听socket 和接收到的每一个客户端连接socket都分配一个conn结构体,用于管理该socket的各种状态信息等。需要注意的是的,memcached并不是对每一个socket分别创建分配一个conn结构,而是在初始化时一次性分配若干(跟审定的允许的同时最大数量的客户端连接数有关)个conn结构的指针(注意不是conn结构体,因为每一个conn结构是比较大的,因此如果直接分配若干个conn结构需要占用较大空间),在都确实需要一个conn结构时,再从预分配的指针数组中取用一个,并实际为该指针分配空间,完成具体的初始化等。 这与前面的CQ_ITEM内存池是一致的——按配置预分配若干,按需取用,循环利用。 避免内存碎片,提高性能。

其中函数conn_init负责预分配设置的若干个conn的指针,由一个conn**指针维护。函数conn_new则在确实需要一个conn时从conn**维护的数组中取得一个conn*,并完成实际的空间分配等。

具体分析如下:

函数conn_init:

conn **conns; //conn数组指针 
static void conn_init(void) {  
    /* We‘re unlikely to see an FD much higher than maxconns. */  
    //已经dup返回当前未使用的最小正整数,所以next_fd等于此刻已经消耗了的fd个数  
    int next_fd = dup(1);//获取当前已经使用的fd的个数  
    //预留一些文件描述符。也就是多申请一些conn结构体。以免有别的需要把文件描述符  
    //给占了。导致socket fd的值大于这个数组长度  
    int headroom = 10;//预留一些文件描述符  /* account for extra unexpected open FDs */  
    struct rlimit rl;  
  
    //settings.maxconns的默认值是1024.  
    max_fds = settings.maxconns + headroom + next_fd;  
  
    /* But if possible, get the actual highest FD we can possibly ever see. */  
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {  
        max_fds = rl.rlim_max;  
    } else {  
        fprintf(stderr, "Failed to query maximum file descriptor; "  
                        "falling back to maxconns\n");  
    }  
  
    close(next_fd);//next_fd只是用来计数的,并没有其他用途  
  
    //注意,申请的conn结构体数量是比settings.maxconns这个客户端同时在线数  
    //还要大的。因为memcached是直接用socket fd的值作为数组下标的。也正是  
    //这个??因,前面需要使用headroom预留一些空间给突发情况  
    if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {//注意是conn指针不是conn结构体  
        fprintf(stderr, "Failed to allocate connection structures\n");  
        /* This is unrecoverable so bail out early. */  
        exit(1);  
    }  
}

函数conn_new:

//为sfd分配一个conn结构体,并且为这个sfd建立一个event,然后注册到event_base上。
conn *conn_new(const int sfd, enum conn_states init_state,//init_state值为conn_listening  
                const int event_flags,  
                const int read_buffer_size, enum network_transport transport,  
                struct event_base *base) {  
    conn *c;  
  
    assert(sfd >= 0 && sfd < max_fds);  
    c = conns[sfd];//直接使用下标  
  
    if (NULL == c) {//之前没有哪个连接用过这个sfd值,需要申请一个conn结构体  
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {  
            fprintf(stderr, "Failed to allocate connection object\n");  
            return NULL;  
        }  
      
        ...//初始化一些成员变量  
  
        c->sfd = sfd;  
        conns[sfd] = c; //将这个结构体交由conns数组管理  
    }  
  
    ...//初始化另外一些成员变量  
    c->state = init_state;//值为conn_listening  
  
    //等同于event_assign,会自动关联current_base。event的回调函数是event_handler  
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);  
    event_base_set(base, &c->event);  
    c->ev_flags = event_flags;  
  
    if (event_add(&c->event, 0) == -1) {  
        perror("event_add");  
        return NULL;  
    }  
  
    return c;  
}

从上可以看到,实际上所有的conn从预分配到实际分配,都是有conn**指针维护的。只需要通过判断该数组中的某元素conn指针是否为空:等于NULL即没被实际占用,处于空闲状态,反之,已经被一个实际的socket fd占用。

至此,主线程所有的准备工作已经就绪,接下来就是真正的客户端连接事件的处理了:

回调函数event_handler(工作线程的注册事件的回调函数也是它):

event_handler本身是简单的,其核心是drive_machine函数(一个有限状态机,负责处理所有的客户逻辑)。

void event_handler(const int fd, const short which, void *arg) {  
    conn *c;  
  
    c = (conn *)arg;  
    assert(c != NULL);  
  
    c->which = which;  
    if (fd != c->sfd) {  
        conn_close(c);  
        return;  
    }  
  
    drive_machine(c);  
    return;  
}

其中的drive_machine还是比较复杂的,接下来将分几个小节的内容,对此细细道来。

时间: 2024-10-25 21:26:20

分布式缓存系统 Memcached 主线程之main函数的相关文章

分布式缓存系统 Memcached 整体架构

分布式缓存系统 Memcached整体架构 Memcached经验分享[架构方向] Memcached 及 Redis 架构分析和比较

分布式缓存系统 Memcached 工作线程初始化

Memcached采用典型的Master-Worker模式,其核心思想是:有Master和Worker两类进程(线程)协同工作,Master进程负责接收和分配任务,Worker进程负责处理子任务.当各Worker进程将各个子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总. 工作示意图如下所示: 其中每个工作线程维护一个连接队列,以接收由主线程分配的客户连接:同时每个工作线程维护一个Libevent实例,以处理和主线程间的管道通信以及和客户连接间的socket通信事件

分布式缓存系统Memcached简介与实践

缘起: 在数据驱动的web开发中,经常要重复从数据库中取出相同的数据,这种重复极大的增加了数据库负载.缓存是解决这个问题的好办法.但是ASP.NET中的虽然已经可以实现对页面局部进行缓存,但还是不够灵活.此时Memcached或许是你想要的. Memcached是什么?Memcached是由Danga Interactive开发的,高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度. Memcached能缓存什么?通过在内存里维护一个统一的巨大的hash表,Memc

分布式缓存系统Memcached在Asp.net下的应用

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. 站下的session性能并不高,所以造成人们一种印象,大型WEB项目使用Java的错觉,致使很多人吐槽微软不给力,其实这好比拉不出怪地球引力,本

分布式缓存系统Memcached

Memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提高访问速度. 通过在内存中维护一个巨大的统一的hash表,Memcached能够用来存储各种格式的数据,包括图像.视频.文件以及数据库检索的结果等. Memcached使用了libevent(如果可以的话,在linux下使用epoll) 来均衡任何数量的打开链接,使用非阻塞的网络I/O,对内部对象实现引用计数(因此,针对多样的客户端,对象可以处在多样的状态), 使用自己的页块分配器和哈希表, 因此虚拟内存不

分布式缓存系统 Memcached 状态机之socket连接与派发

上节已经分析到了主线程中监听socket注册事件和工作线程中连接socket注册事件的回调函数都是event_handler,且event_handler的核心部分都是一个有限状态机:drive_machine.因此接下来将对该状态机具体的业务处理进行深入的剖析. memcached将每个socket都封装为一个conn结构体,该结构体包含了比如socket的文件描述符sfd.注册事件event.连接状态结构体conn_states,等等诸多信息字段,其中的状态结构:conn_states中包含

分布式缓存系统Memcached(九)——状态机之socket连接与派发

上节已经分析到了主线程中监听socket注册事件和工作线程中连接socket注册事件的回调函数都是event_handler,且event_handler的核心部分都是一个有限状态机:drive_machine.因此接下来将对该状态机具体的业务处理进行深入的剖析. memcached将每个socket都封装为一个conn结构体,该结构体包含了比如socket的文件描述符sfd.注册事件event.连接状态结构体conn_states,等等诸多信息字段,其中的状态结构:conn_states中包含

分布式缓存系统 Memcached 基本配置与命令

为了方便测试,给出一个C客户端libmemcached链接:https://launchpad.net/libmemcached/ 以及memcacheclient-2.0 : http://code.jellycan.com/files/memcacheclient-2.0.zip(已生成 sln,在windows下直接用VS打开,编译成功) 在Memcached启动时,有很多配置参数可以选择,以下参数对应memcached1.4.15,现给出这些参数的具体含义: "a:" //un

分布式缓存系统 Memcached slab和item的主要操作

上节在分析slab内存管理机制时分析Memcached整个Item存储系统的初始化过程slabs_init()函数:分配slabclass数组空间,到最后将各slab划分为各种级别大小的空闲item并挂载到对应大小slab的空闲链表slots上.本节将继续分析对slab和item的主要操作过程. slab机制中所采用的LRU算法: 在memcached运行过程中,要把一个item调入内存,但内存已无空闲空间时,为了保证程序能正常运行,系统必须从内存中调出一部分数据,送磁盘的对换区中.但应将哪些数