memcached源代码分析-----set命令处理流程

转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44236591

前一篇博文以get命令为样例把整个处理流程简单讲述了一遍。本篇博文将以set命令具体讲述memcached的处理流程。

具体的命令为“set tt 3 0 10”。并如果当然memcachedserver没有名为tt的item。

读取命令:

在前一篇博文的最后,conn的状态被设置为conn_new_cmd,回到了一開始的状态。

假设此时conn结构体里面的buff还有其它命令,或者该client的socket缓冲区里面还有数据(命令),那么就会继续处理命令而不会退出drive_machine函数。处理完后。又会回到conn_new_cmd状态。

半同步半异步网络模型》指明了memcached是通过worker线程运行client的命令,而且一个worker线程要处理多个client的命令。假设某一个恶意的client发送了大量的get命令,那么worker线程将不断地反复前一篇博文讲述的处理流程。换言之,worker线程将困死在drive_machine里面不能出来。这造成的后果是导致该worker线程负责的其它client处于饥饿状态,由于它们的命令得不到处理(要退出drive_machine才干知道其它client也发送了命令,进而进行处理)。

为了避免client发现饥饿现象,memcached的解决方法是:worker线程连续处理某一个client的命令数不能超过一个特定值。

这个特定值由全局变量settings.reqs_per_event确定(默认值是20), 能够在启动memcached的时候通过命令行參数设置。具体參考《memcached启动參数具体解释以及关键配置的默认值》。

static void drive_machine(conn *c) {
    bool stop = false;
    int nreqs = settings.reqs_per_event;//20

    assert(c != NULL);

	//drive_machine被调用会进行状态推断,并进行一些处理。但也可能发生状态的转换
	//此时就须要一个循环,当进行状态转换时。也能处理
    while (!stop) {

        switch(c->state) {

        case conn_new_cmd:

            --nreqs;
            if (nreqs >= 0) {
				//假设该conn的读缓冲区没有数据,那么将状态改成conn_waiting
				//假设该conn的读缓冲区有数据,  那么将状态改成conn_pase_cmd
                reset_cmd_handler(c);
            } else {

                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                        break;
                    }
                }
                stop = true;
            }
            break;

        }
    }

    return;
}

从上面代码能够得知。假设某个client的命令数过多,会被memcached强制退出drive_mahcine。假设该client的socket里面还有数据而且是libevent是水平触发的,那么libevent会自己主动触发事件,能再次进入drive_mahcine函数。

但假设该client的命令都读进conn结构体的读缓冲区,那么就必须等到client再次发送命令,libevent才会触发。

但client一直不再发送命令了呢?为了解决问题,memcached採用了一种非常巧妙的处理方法:为这个clientsocket设置可写事件。除非clientsocket的写缓冲区已满,否则libevent都会为这个client触发事件。事件一触发。那么worker线程就会进入drive_machine函数处理这个client的命令。

当然我们如果nreqs大于0,然后看一下reset_cmd_handler函数。该函数会推断conn的读缓冲区是否还有数据。此外,该函数另一个关键的数据:调节conn缓冲区的大小。前一篇博文已经说到,memcached会尽可能把clientsocket里面的数据读入conn的读缓冲区。这样的特性会撑大conn的读缓冲区。除了读缓冲区,用于回写数据的iovec和msghdr数组也会被撑大,这也要收缩。由于是在处理完一条命令后才进行的收缩,所以收缩不会导致数据的丢失。

写缓冲区呢?不须要收缩写缓冲区吗。conn结构体也是有写缓冲区的啊?这是由于写缓冲区不会被撑大。从前一篇博文的回应命令能够知道,回应命令时并没有使用到写缓冲区。写缓冲区是在向client返回错误信息时才会用到的,而错误信息不会太大。也就不会撑大写缓冲区了。

struct conn {
    int    sfd;//该conn相应的socket fd
    sasl_conn_t *sasl_conn;
    bool authenticated;
    enum conn_states  state;//当前状态
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;//该conn相应的event
    short  ev_flags;//event当前监听的事件类型
    short  which;   /** which events were just triggered */ //触发event回调函数的原因

	//读缓冲区
    char   *rbuf;   /** buffer to read commands into */
	//有效数据的開始位置。从rbuf到rcurr之间的数据是已经处理的了。变成无效数据了
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
	//读缓冲区的长度
    int    rsize;   /** total allocated size of rbuf */
	//有效数据的长度
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

	//数据直通车
	char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
	//ensure_iov_space函数会扩大数组长度.以下的msglist数组所使用到的
	//iovec结构体数组就是iov指针所指向的。所以当调用ensure_iov_space
	//分配新的iovec数组后,须要又一次调整msglist数组元素的值。

这个调整
	//也是在ensure_iov_space函数里面完毕的
    struct iovec *iov;//iovec数组指针
	//数组大小
    int    iovsize;   /* number of elements allocated in iov[] */
	//已经使用的数组元素个数
    int    iovused;   /* number of elements used in iov[] */

	//由于msghdr结构体里面的iovec结构体数组长度是有限制的。所以为了能
	//传输很多其它的数据,仅仅能添加msghdr结构体的个数.add_msghdr函数负责添加
    struct msghdr *msglist;//msghdr数组指针
	//数组大小
    int    msgsize;   /* number of elements allocated in msglist[] */
	//已经使用了的msghdr元素个数
    int    msgused;   /* number of elements used in msglist[] */
	//正在用sendmsg函数传输msghdr数组中的哪一个元素
    int    msgcurr;   /* element in msglist[] being transmitted now */
    //msgcurr指向的msghdr总共同拥有多少个字节
	int    msgbytes;  /* number of bytes in current msg */

	//worker线程须要占有这个item。直至把item的数据都写回给client了
	//故须要一个item指针数组记录本conn占有的item
    item   **ilist;   /* list of items to write out */
    int    isize;//数组的大小
    item   **icurr;//当前使用到的item(在释放占用item时会用到)
    int    ileft;//ilist数组中有多少个item须要释放

    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */

	...

    conn   *next;     /* Used for generating a list of conn structures */

    LIBEVENT_THREAD *thread;//这个conn属于哪个worker线程
};

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {//conn_new_cmd状态下,item为NULL
        item_remove(c->item);
        c->item = NULL;
    }
    conn_shrink(c);
    if (c->rbytes > 0) {//读缓冲区里面有数据
        conn_set_state(c, conn_parse_cmd);//接着去解析读到的数据
    } else {
        conn_set_state(c, conn_waiting);//否则等待数据的到来
    }
}

#define DATA_BUFFER_SIZE 2048

/** Initial size of list of items being returned by "get". */
#define ITEM_LIST_INITIAL 200

/** Initial size of list of CAS suffixes appended to "gets" lines. */
#define SUFFIX_LIST_INITIAL 20

/** Initial size of the sendmsg() scatter/gather array. */
#define IOV_LIST_INITIAL 400

/** Initial number of sendmsg() argument structures to allocate. */
#define MSG_LIST_INITIAL 10

/** High water marks for buffer shrinking */
#define READ_BUFFER_HIGHWAT 8192
#define ITEM_LIST_HIGHWAT 400
#define IOV_LIST_HIGHWAT 600
#define MSG_LIST_HIGHWAT 100

 //收缩到初始大小
static void conn_shrink(conn *c) {
    assert(c != NULL);

    if (IS_UDP(c->transport))
        return;

	//c->rbytes指明了当前读缓冲区有效数据的长度。当其小于DATA_BUFFER_SIZE
	//才进行读缓冲区收缩。所以不会导致client命令数据的丢失。

if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
        char *newbuf;

        if (c->rcurr != c->rbuf)
            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);

        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);

        if (newbuf) {
            c->rbuf = newbuf;
            c->rsize = DATA_BUFFER_SIZE;
        }
        /* TODO check other branch... */
        c->rcurr = c->rbuf;
    }

    if (c->isize > ITEM_LIST_HIGHWAT) {
        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
        if (newbuf) {
            c->ilist = newbuf;
            c->isize = ITEM_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }

    if (c->msgsize > MSG_LIST_HIGHWAT) {
        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
        if (newbuf) {
            c->msglist = newbuf;
            c->msgsize = MSG_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }

    if (c->iovsize > IOV_LIST_HIGHWAT) {
        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
        if (newbuf) {
            c->iov = newbuf;
            c->iovsize = IOV_LIST_INITIAL;
        }
    /* TODO check return value */
    }
}

读取数据:

我们如果conn的读缓冲区里面没有数据,此时conn的状态被设置为conn_waiting,等待client发送命令数据。

如果client发送数据过来。libevent将检測到clientsocket变成可读,然后进入在libevent的回调函数中调用drive_machine函数,进入有限状态机。在有限状态机里面。conn的状态会被设置为conn_read。

接着在conn_read case中,memcached会把client发送的命令数据尽可能地读入到conn的读缓冲区中。

当然为了防止没有恶意的client,memcached也是有限度的:仅仅撑大读缓冲区4次。这对于正常的client命令来说已经是足够的了。

static void drive_machine(conn *c) {
    bool stop = false;
    int res;
    assert(c != NULL);

	//drive_machine被调用会进行状态推断,并进行一些处理。但也可能发生状态的转换
	//此时就须要一个循环,当进行状态转换时,也能处理
    while (!stop) {

        switch(c->state) {

        case conn_waiting://等待socket变成可读的
            if (!update_event(c, EV_READ | EV_PERSIST)) {//更新监听事件失败
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;//竟然stop循环。只是没关系,由于event的可读事件是水平触发的
            break;

        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED://没有读取到数据
                conn_set_state(c, conn_waiting);//等待
                break;
            case READ_DATA_RECEIVED://读取到了数据,接着就去解析数据
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR://read函数的返回值等于0或者-1时。会返回这个值
                conn_set_state(c, conn_closing);//直接关闭这个client
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            break;

        case conn_parse_cmd :
			//返回1表示正在处理读取的一条命令
			//返回0表示须要继续读取socket的数据才干解析命令
			//假设读取到了一条完整的命令。那么函数内部会去解析,
			//并进行调用process_command函数进行一些处理.
			//像set、add、replace这些命令。会在处理的时候调用
			//conn_set_state(c, conn_nread)
            if (try_read_command(c) == 0) {
                /* wee need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;

        }
    }

    return;
}

 //尽可能把socket的全部数据都读进c指向的一个缓冲区里面
static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);

    if (c->rcurr != c->rbuf) {
		//rcurr 和 rbuf之间是一条已经解析了的命令。如今能够丢弃了
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {
		//由于本函数会尽可能把socket数据都读取到rbuf指向的缓冲区里面,
		//所以可能出现当前缓冲区不够大的情况(即rbytes>=rsize)
        if (c->rbytes >= c->rsize) {
			//可能有坏蛋发无穷无尽的数据过来,而本函数又是尽可能把全部数据都
			//读进缓冲区。为了防止坏蛋耗光server的内存,所以就仅仅分配4次内存
            if (num_allocs == 4) {
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
				//尽管分配内存失败,但realloc保证c->rbuf还是合法可用的指针
                c->rbytes = 0; /* ignore what we read */

                out_of_memory(c, "SERVER_ERROR out of memory reading request");
                c->write_and_go = conn_closing;//关闭这个conn
                return READ_MEMORY_ERROR;
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        int avail = c->rsize - c->rbytes;
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.bytes_read += res;//记录该线程读取了多少字节
            pthread_mutex_unlock(&c->thread->stats.mutex);
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) {//可能还有数据没有读出来
                continue;
            } else {
                break;//socket临时还没数据了(即已经读取完)
            }
        }
        if (res == 0) {
            return READ_ERROR;
        }
        if (res == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            return READ_ERROR;
        }
    }
    return gotdata;
}

假设conn没有读取到clientsocket的数据,那么conn的状态又会设置为conn_waiting(等待数据状态)。假设读取到数据后,就会把状态设置为conn_parse_cmd,接着就会去解析该数据。因为网络原因,可能这一次并没有接收到完整的一条命令。在解析命令的时候会发现这样的情况。此时将conn的状态设置为conn_waiting。再次等待socket数据。

解析命令:

通信协议:

在解说memcached怎么解析命令前,先说一下memcached的通信协议。平时使用的都是”sett 3 0 10”这种命令形式,还真不知道有什么通信协议。事实上memcached同一时候支持文本协议和二进制这两种协议,memcached同意client使用二进制和文本两种通信协议中的一种。平时我们使用的是文本协议。之所以我们不须要显式地选择某一种协议,是由于client选择哪种协议,由client第一次发送的命令确定(一旦确定就不能更改)。Memcached推断client选定哪种协议的方法也非常easy:推断命令的第一个字符。假设第一个字符等于128,那么就是二进制协议,否则就是文本协议。

这样行得通,是由于文本协议中不论什么字符(ascii码)都不会取128这个值。本文仅仅解说文本协议。

推断命令的完整性:

在详细解析client命令的内容之前,还须要做一个工作:推断是否接收到完整的一条命令。Memcached推断的方法也简单:假设接收的数据中包括换行符就说明接收到完整的一条命令,否则就不完整,须要又一次读取clientsocket(把conn状态设置为conn_waiting)。

因为不同的平台对于行尾有不同的处理,有的为”\r\n”,有的为”\n”。memcached必须处理这样的情况。Memcached的解决方式是:不管它!

直接把命令最后一个字符的后一个字符(the character past the end of the command)改为’\0’,这样命令数据就变成一个C语言的字符串了。更巧妙的是。memcached还用一个暂时变量指向’\n’字符的下一个字符。

这样,不管行尾是”\r\n”还是”\n”都不重要了。

static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

	//memcached支持文本和二进制两种协议。

对于TCP这种有连接协议,memcached为该
	//fd分配conn的时候,并不指明其是用哪种协议的。此时用negotiating_prot代表待
	//协商的意思(negotiate是谈判、协商)。而是在client第一次发送数据给
	//memcached的时候用第一个字节来指明.之后的通信都是使用指明的这种协议。
	//对于UDP这种无连接协议,指明每次都指明使用哪种协议了
    if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
		//对于TCP仅仅会进入该推断体里面一次,而UDP就要次次都进入了

		//PROTOCOL_BINARY_REQ为0x80,即128。对于ascii的文本来说。是不会取这个值的
        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
            c->protocol = binary_prot;
        } else {
            c->protocol = ascii_prot;
        }

    }

    if (c->protocol == binary_prot) {
    ...//二进制协议。这里不展开解说

    } else {//文本协议
        char *el, *cont;

        if (c->rbytes == 0)//读缓冲区里面没有数据,被耍啦
            return 0;//返回0表示须要继续读取socket的数据才干解析命令

        el = memchr(c->rcurr, '\n', c->rbytes);
        if (!el) {//没有找到\n,说明没有读取到一条完整的命令
            if (c->rbytes > 1024) {//接收了1024个字符都没有回车符,值得怀疑
                /*
                 * We didn't have a '\n' in the first k. This _has_ to be a
                 * large multiget, if not we should just nuke the connection.
                 */
                char *ptr = c->rcurr;
                while (*ptr == ' ') { /* ignore leading whitespaces */
                    ++ptr;
                }

                if (ptr - c->rcurr > 100 || //太多的空格符
                    (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {//是get或者gets命令,但一次获取太多信息了

                    conn_set_state(c, conn_closing);//必须干掉这种扯蛋的connclient
                    return 1;
                }
            }

            return 0;//返回0表示须要继续读取socket的数据才干解析命令
        }

		//来到这里,说明已经读取到至少一条完整的命令

		cont = el + 1;//用cont指向下一行的開始。不管行尾是\n还是\r\n

		//不同的平台对于行尾有不同的处理,有的为\r\n有的则是\n。

所以memcached
		//还要推断一下\n前面的一个字符是否为\r
        if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
            el--;//指向行尾的開始字符
        }

		//'\0',C语言字符串结尾符号。结合c->rcurr这个開始位置,就能够确定
		//这个命令(如今被看作一个字符串)的開始和结束位置。rcurr指向了一个字符串。

//注意,下一条命令的開始位置由前面的cont指明了
        *el = '\0';

        assert(cont <= (c->rcurr + c->rbytes));

        c->last_cmd_time = current_time;
		//处理这个命令
        process_command(c, c->rcurr);//命令字符串由c->rcurr指向

		//cont指明下一条命令的開始位置
		//更新curr指针和剩余字节数
        c->rbytes -= (cont - c->rcurr);
        c->rcurr = cont;

        assert(c->rcurr <= (c->rbuf + c->rsize));
    }

    return 1;//返回1表示正在处理读取的一条命令
}

符号化命令内容:

为了能运行命令。必须能识别出client发送的详细是什么命令以及有什么參数。

为了做到这一步,就得先命令字符串(try_read_command函数中已经把命令数据当作一个C语言的字符串了)里面的每个词切割出来。

比方将字符串"set tt 3 0 10"切割为”set”、”tt”、”3”、”0”和”10”这个5个词,在memcached里面用一个专门的名称token表示这些词。

Memcached在判别详细的命令前,要做的一步就是将命令内容进行符号化。

在process_command函数中,memcached会调用tokenize_command函数把命令字符串符号化。

process_command函数还定义了一个局部数组tokens用于指明命令字符串里面每个token。以下是tokenize_command函数的详细实现。

#define MAX_TOKENS 8

typedef struct token_s {
    char *value;
    size_t length;
} token_t;

static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
	size_t ntokens;

	ntokens = tokenize_command(command, tokens, MAX_TOKENS);
	...
}

//将一条命令切割成一个个的token,并用tokens数组一一相应的指向
//比方命令"set tt 3 0 10",将被切割成"set"、"tt"、"3"、"0"、"10"
//并用tokens数组的5个元素相应指向。token_t类型的value成员指向相应token
//在command里面的位置。length则指明该token的长度
//返回token的数目,最后一个token是无意义的
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len = strlen(command);
    unsigned int i = 0;

    assert(command != NULL && tokens != NULL && max_tokens > 1);

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {//假设有连续多个空格符。那么须要跳过
            if (s != e) {//s此时指向非空格符,而且是某个token的第一个字符
                tokens[ntokens].value = s;//指向token的開始位置
                tokens[ntokens].length = e - s;//这个token的长度
                ntokens++;
                *e = '\0';//赋值为'\0',这样这个token就是s開始的一个字符串
                if (ntokens == max_tokens - 1) {
					//这条命令至少有max_tokens-2个token
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;//最后s会指向第一个非空格符
        }
        e++;
    }

	//当这条命令是以空格符结尾的。那么上面那个循环结束后,s等于e。

//否则s 不等于 e。此时s指向最后一个token的開始位置,e则指向token
	//最后一个字符的下一个字符(the first element past the end)
    if (s != e) {//处理最后一个token
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    //最多仅仅处理max_tokens-1(等于7)个token。剩下的不处理
    tokens[ntokens].value =  *e == '\0' ?

NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}

经过命令符号化后,使用起来就会非常easy的了。

比方依据tokens[0]的内容能够推断这个命令是什么命令,假设是set命令(tokens[0]的内容等于”get”),自然tokens[1]就是键值了。接下来的tokens[2]、tokens[3]、tokens[4]就是键值的三个參数了。

运行命令:

依据token推断命令和提取參数:

把命令符号化后,非常easy就能提取出命令和相应的參数。

typedef struct token_s {
    char *value;
    size_t length;
} token_t;

#define COMMAND_TOKEN 0
#define KEY_TOKEN 1

#define MAX_TOKENS 8

static void process_command(conn *c, char *command) {

    token_t tokens[MAX_TOKENS];
    size_t ntokens;
    int comm;

    assert(c != NULL);

    ntokens = tokenize_command(command, tokens, MAX_TOKENS);//将命令记号化
    if (ntokens >= 3 &&
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

		...//get命令

    } else if ((ntokens == 6 || ntokens == 7) &&
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
                (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

		//set命令
        process_update_command(c, tokens, ntokens, comm, false);

    }
	...
}

#define KEY_MAX_LENGTH 250

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;
    size_t nkey;
    unsigned int flags;
    int32_t exptime_int = 0;
    time_t exptime;
    int vlen;

    assert(c != NULL);

	//server不须要回复信息给client,这能够降低网络IO进而提快速度
	//这样的设置是一次性的。不影响下一条命令
    set_noreply_maybe(c, tokens, ntokens);//处理用户命令里面的noreply

	//键值的长度太长了。KEY_MAX_LENGTH为250
    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    key = tokens[KEY_TOKEN].value;
    nkey = tokens[KEY_TOKEN].length;

	//将字符串转成unsigned long,获取flags、exptime_int、vlen。
	//它们的字符串形式必须是纯数字,否则转换失败,返回false
    if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
           && safe_strtol(tokens[3].value, &exptime_int)
           && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

    /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
    exptime = exptime_int;

	...
}

static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
{
    int noreply_index = ntokens - 2;

    /*
      NOTE: this function is not the first place where we are going to
      send the reply.  We could send it instead from process_command()
      if the request line has wrong number of tokens.  However parsing
      malformed line for "noreply" option is not reliable anyway, so
      it can't be helped.
    */
    if (tokens[noreply_index].value
        && strcmp(tokens[noreply_index].value, "noreply") == 0) {
        c->noreply = true;
    }
    return c->noreply;
}

分配item:

好了,如今已经知道是set命令,而且键值和相应的參数都已经提取出来了。接下来能够真正处理set命令了。

set命令是:键值已存在则更新,不存在则加入。但在这里无论那么多,直接调用item_alloc申请一个item。事实上process_update_command函数处理的命令不不过set,还包含replace、add、append等等。这些命令也是直接申请一个新的item。

item_alloc函数会直接调用do_item_alloc函数申请一个item。前面的非常多博文一直在部分介绍do_item_alloc函数,但都没有给出过完整版。如今就给出神奇函数的所有代码。对于这个函数一些讨论參数前面的一些博文吧。

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
    char *key;//键值
    size_t nkey;//键值长度
    unsigned int flags;//item的flags
    time_t exptime;//item的超时
    int vlen;//item数据域的长度
    uint64_t req_cas_id=0;
    item *it;

    /* Negative exptimes can underflow and end up immortal. realtime() will
       immediately expire values that are greater than REALTIME_MAXDELTA, but less
       than process_started, so lets aim for that. */
    if (exptime < 0)//此时会马上过期失效
        exptime = REALTIME_MAXDELTA + 1;//REALTIME_MAXDELTA等于30天

	//在存储item数据的时候,都会自己主动在数据的最后加上"\r\n"
    vlen += 2;//+2是由于data后面还要加上"\r\n"这两个字符
    if (vlen < 0 || vlen - 2 < 0) {
        out_string(c, "CLIENT_ERROR bad command line format");
        return;
    }

	//依据所需的大小分配相应的item,并给这个item赋值。
	//除了time和refcount成员外,其它的都赋值了。并把键值、flag这些值都拷贝
	//到item后面的buff里面了,至于data。由于如今都还没拿到所以还没赋值
	//realtime(exptime)是直接赋值给item的exptime成员
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

    if (it == 0) {
        if (! item_size_ok(nkey, flags, vlen))
            out_string(c, "SERVER_ERROR object too large for cache");
        else
            out_of_memory(c, "SERVER_ERROR out of memory storing object");
        /* swallow the data line */
        c->write_and_go = conn_swallow;
        c->sbytes = vlen;

        /* Avoid stale data persisting in cache because we failed alloc.
         * Unacceptable for SET. Anywhere else too? */
        if (comm == NREAD_SET) {
            it = item_get(key, nkey);
            if (it) {
                item_unlink(it);
                item_remove(it);
            }
        }

        return;
    }
    ITEM_set_cas(it, req_cas_id);

	//本函数并不会把item插入到哈希表和LRU队列。这个插入工作由
	//complete_nread_ascii函数完毕。
    c->item = it;
    c->ritem = ITEM_data(it); //数据直通车
    c->rlbytes = it->nbytes;//等于vlen(要比用户输入的长度大2,由于要加上\r\n)
    c->cmd = comm;
    conn_set_state(c, conn_nread);
}

item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
    return it;
}

/*@[email protected]*/
//key、flags、exptime三个參数是用户在使用set、add命令存储一条数据时输入的參数。
//nkey是key字符串的长度。nbytes则是用户要存储的data长度+2,由于在data的结尾处还要加上"\r\n"
//cur_hv则是依据键值key计算得到的哈希值。

item *do_item_alloc(char *key, const size_t nkey, const int flags,
                    const rel_time_t exptime, const int nbytes,
                    const uint32_t cur_hv) {
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
	//要存储这个item须要的总空间
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

	//依据大小推断从属于哪个slab
    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)//0表示不属于不论什么一个slab
        return 0;

    mutex_lock(&cache_lock);
    /* do a quick check if we have any expired items in the tail.. */
    int tries = 5;
    /* Avoid hangs if a slab has nothing but refcounted stuff in it. */
    int tries_lrutail_reflocked = 1000;
    int tried_alloc = 0;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    rel_time_t oldest_live = settings.oldest_live;

    search = tails[id];
    /* We walk up *only* for locked items. Never searching for expired.
     * Waste of CPU for almost all deployments */
     //第一次看这个for循环,直接觉得search等于NULL,直接看for循环后面的代码
     //这个循环里面会在相应LRU队列中查找过期失效的item。最多尝试tries个item。
     //从LRU的队尾開始尝试。

假设item被其它worker线程引用了,那么就尝试下一
     //个。假设没有的被其它worker线程所引用,那么就測试该item是否过期失效。
     //假设过期失效了。那么就能够使用这个item(终于会返回这个item)。假设没有
     //过期失效,那么不再尝试其它item了(由于是从LRU队列的队尾開始尝试的),
     //直接调用slabs_alloc申请一个新的内存存储item。假设申请新内存都失败,
     //那么在同意LRU淘汰的情况下就会启动踢人机制。
    for (; tries > 0 && search != NULL; tries--, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn't reliable */
        next_it = search->prev;
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
            /* We are a crawler, ignore it. */
			//这是一个爬虫item,直接跳过
            tries++;//爬虫item不计入尝试的item数中
            continue;
        }
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the "search" item. If locked, no
         * other callers can incr the refcount
         */
        /* Don't accidentally grab ourselves, or bail if we can't quicklock */
		//尝试抢占锁,抢不了就走人,不等待锁。
        if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
            continue;

        /* Now see if the item is refcount locked */
        if (refcount_incr(&search->refcount) != 2) {//引用数>=3
            /* Avoid pathological case with ref'ed items in tail */
			//刷新这个item的訪问时间以及在LRU队列中的位置
            do_item_update_nolock(search);
            tries_lrutail_reflocked--;
            tries++;
            refcount_decr(&search->refcount);
			//此时引用数>=2

            itemstats[id].lrutail_reflocked++;
            /* Old rare bug could cause a refcount leak. We haven't seen
             * it in years, but we leave this code in to prevent failures
             * just in case */
            //考虑这种情况:某一个worker线程通过refcount_incr添加了一个
            //item的引用数。

但由于某种原因(可能是内核出了问题),这个worker
            //线程还没来得及调用refcount_decr就挂了。此时这个item的引用数
            //就肯定不会等于0。也就是总有worker线程占用着它.但实际上这个
            //worker线程早就挂了。

所以对于这种情况须要修复。直接把这个item
            //的引用计数赋值为1。
            //依据什么推断某一个worker线程挂了呢?首先在memcached里面,一般
            //来说,不论什么函数都的调用都不会耗时太大的。即使这个函数须要加锁
            //所以假设这个item的最后一次訪问时间距离如今都比較遥远了,但它
            //却还被一个worker所引用。那么就差点儿能够推断这个worker线程挂了.
            //在1.4.16版本号之前,这个时间距离都是固定的为3个小时。

从1.4.16开
            //就使用settings.tail_repair_time存储时间距离。能够在启动memcached
            //的时候设置,默认时间距离为1个小时。如今这个版本号1.4.21默认都不
            //进行这个修复了,settings.tail_repair_time的默认值为0。由于
            //memcached的作者非常少看到这个bug了。预计是由于操作系统的进一步稳定
            //http://brionas.github.io/2014/01/06/memcached-manage/
            //http://www.oschina.net/news/46787/memcached-1-4-16
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                do_item_unlink_nolock(search, hv);
            }
            if (hold_lock)
                item_trylock_unlock(hold_lock);

            if (tries_lrutail_reflocked < 1)
                break;

            continue;
        }

		//search指向的item的refcount等于2,这说明此时这个item除了本worker
		//线程外,没有其它不论什么worker线程索引其。能够放心释放并重用这个item

		 //由于这个循环是从lru链表的后面開始遍历的。所以一開始search就指向
         //了最不经常使用的item。假设这个item都没有过期。那么其它的比其更经常使用
        //的item就不要删除了(即使它们过期了)。此时仅仅能向slabs申请内存
        /* Expired or flushed */
        if ((search->exptime != 0 && search->exptime < current_time)
            || (search->time <= oldest_live && oldest_live <= current_time)) {
			//search指向的item是一个过期失效的item,能够使用之
			itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            it = search;
			//又一次计算一下这个slabclass_t分配出去的内存大小
			//直接霸占旧的item就须要又一次计算
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
            do_item_unlink_nolock(it, hv);//从哈希表和lru链表中删除
            /* Initialize the item block: */
            it->slabs_clsid = 0;
        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {//申请内存失败
			//此刻,过期失效的item没有找到。申请内存又失败了。看来仅仅能使用
			//LRU淘汰一个item(即使这个item并没有过期失效)

			tried_alloc = 1;//标志尝试过了alloc
            if (settings.evict_to_free == 0) {//设置了不进行LRU淘汰item
            	//此时仅仅能向client回复错误了
                itemstats[id].outofmemory++;
            } else {
                itemstats[id].evicted++;//添加被踢的item数
                itemstats[id].evicted_time = current_time - search->time;
				//即使一个item的exptime成员设置为永不超时(0)。还是会被踢的
				if (search->exptime != 0)
                    itemstats[id].evicted_nonzero++;
                if ((search->it_flags & ITEM_FETCHED) == 0) {
                    itemstats[id].evicted_unfetched++;
                }
                it = search;
				//又一次计算一下这个slabclass_t分配出去的内存大小
				//直接霸占旧的item就须要又一次计算
                slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
                do_item_unlink_nolock(it, hv);//从哈希表和lru链表中删除
                /* Initialize the item block: */
                it->slabs_clsid = 0;

                /* If we've just evicted an item, and the automover is set to
                 * angry bird mode, attempt to rip memory into this slab class.
                 * TODO: Move valid object detection into a function, and on a
                 * "successful" memory pull, look behind and see if the next alloc
                 * would be an eviction. Then kick off the slab mover before the
                 * eviction happens.
                 */
                //一旦发现有item被踢,那么就启动内存页重分配操作
                //这个太频繁了,不推荐
                if (settings.slab_automove == 2)
                    slabs_reassign(-1, id);
            }
        }

		//引用计数减一。此时该item已经没有不论什么worker线程索引其,而且哈希表也
		//不再索引其
        refcount_decr(&search->refcount);
        /* If hash values were equal, we don't grab a second lock */
        if (hold_lock)
            item_trylock_unlock(hold_lock);
        break;
    }

	//没有尝试过alloc。而且在查找特定次数后还是没有找到可用的item
    if (!tried_alloc && (tries == 0 || search == NULL))
        it = slabs_alloc(ntotal, id);

    if (it == NULL) {
        itemstats[id].outofmemory++;
        mutex_unlock(&cache_lock);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    assert(it != heads[id]);

    /* Item initialization can happen outside of the lock; the item's already
     * been removed from the slab LRU.
     */
    it->refcount = 1;     /* the caller will have a reference */
    mutex_unlock(&cache_lock);

	//脱离之前的前后关系
    it->next = it->prev = it->h_next = 0;
    it->slabs_clsid = id;

	//此时这个item没有插入不论什么LRU队列和没有插入到哈希表中

    DEBUG_REFCNT(it, '*');
	//默认情况下memcached是支持CAS的,假设想取消能够在启动memcached的时候添加
	//參数C(大写的c)
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}

process_update_command函数申请分配一个item后。并没有直接直接把这个item插入到LRU队列和哈希表中,而不过用conn结构体的item成员指向这个申请得到的item,而且用ritem成员指向item结构体的数据域(这为了方便写入数据)。最后conn的状态改动为conn_nread。就这样process_update_command函数曳然而止了。

填充item数据域:

值得注意的是,前面的命令处理过程是没有把item的数据写入到item结构体中。

如今要退出到有限自己主动机drive_machine函数中,查看memcached是怎么处理conn_nread状态的。尽管process_update_command留下了手尾,但它也用conn的成员变量记录了一些重要值,用于填充item的数据域。

比方rlbytes表示须要用多少字节填充item。rbytes表示读缓冲区还有多少字节能够使用;ritem指向数据填充地点。

static void drive_machine(conn *c) {
    bool stop = false;
	int res;

    while (!stop) {

        switch(c->state) {
	case conn_nread:
			//对于set、add、replace这种命令会将state设置成conn_nread
			//由于在conn_read。它仅仅读取了一行的数据。就去解析。但数据是
			//在第二行输入的(client输入进行操作的时候),此时,rlbytes
			//等于data的长度。

本case里面会从conn的读缓冲区、socket读缓冲区
			//读取数据到item里面。

			//rlbytes标识还有多少字节须要读取到item里面。仅仅要没有读取足够的
			//数据,conn的状态都是保持为conn_nread。

即使读取到足够的数据
			//状态还是不变,但此时rlbytes等于0。此刻会进入以下的这个if里面
            if (c->rlbytes == 0) {
				//处理完毕后会调用out_string函数。假设用户明白要求不须要回复
				//那么conn的状态变成conn_new_cmd。

假设须要回复,那么状态改为
				//conn_write,而且write_and_go成员赋值为conn_new_cmd
                complete_nread(c);//完毕对一个item的操作
                break;
            }

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {//conn读缓冲区里面还有数据,那么把数据直接赋值到item里面
            	//rlbytes是须要读取的字节数, rbytes是读缓冲区拥有的字节数
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {//conn读缓冲区的数据能满足item的所需数据,无需从socket中读取
                    break;
                }
            }

			//以下的代码中,仅仅要不发生socket错误,那么不管是否读取到足够的数据
			//都不会改变conn的状态,也就是说。下一次进入状态机还是为conn_nread状态
            /*  now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);//直接从socket中读取数据
            if (res > 0) {
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {//socket里面没有数据
                if (!update_event(c, EV_READ | EV_PERSIST)) {

                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;//此时就不要再读了。停止状态机,等待libevent通知有数据可读
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            conn_set_state(c, conn_closing);
            break;

		}
	}
}

存储item:

填充数据还是比較简单的。填充数据后这个item就是完整的了,此时须要把item插入到LRU队列和哈希表中。

Memcached是调用complete_nread函数完毕这操作。

complete_nread内部会间接调用函数do_store_item,后者会先调用do_item_get函数查询当前memcachedserver是否已经存在同样键值的item,然后依据不同的命令(add、replace、set)进行不同的处理。

static void complete_nread(conn *c) {
    assert(c != NULL);
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);

    if (c->protocol == ascii_prot) {//文本协议
        complete_nread_ascii(c);
    } else if (c->protocol == binary_prot) {//二进制协议
        complete_nread_binary(c);
    }
}

/*
 * we get here after reading the value in set/add/replace commands. The command
 * has been stored in c->cmd, and the item is ready in c->item.
 */
static void complete_nread_ascii(conn *c) {
    assert(c != NULL);

	//此时这个item不在LRU队列,也不在哈希表中
	//而且引用数等于1(就是本worker线程在引用它)

    item *it = c->item;
    int comm = c->cmd;
    enum store_item_type ret;

    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

	//保证最后的两个字符是"\r\n",否则就是错误数据
    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
        out_string(c, "CLIENT_ERROR bad data chunk");
    } else {
      ret = store_item(it, comm, c);//将这个item存放到LRU对和哈希表中

	  //输出回应信息
      switch (ret) {
      case STORED:
          out_string(c, "STORED");
          break;
      case EXISTS:
          out_string(c, "EXISTS");
          break;
      case NOT_FOUND:
          out_string(c, "NOT_FOUND");
          break;
      case NOT_STORED:
          out_string(c, "NOT_STORED");
          break;
      default:
          out_string(c, "SERVER_ERROR Unhandled storage type.");
      }

    }

	//本worker线程取消对这个item的引用
    item_remove(c->item);       /* release the c->item reference */
    c->item = 0;
}

enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

 //主调函数store_item会加item_lock(hv)锁
 //set、add、replace命令终于都会调用本函数进行存储的
 //comm參数保存了详细是哪个命令
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);
    item *old_it = do_item_get(key, it->nkey, hv);//查询旧值
    enum store_item_type stored = NOT_STORED;

    item *new_it = NULL;
    int flags;

    if (old_it != NULL && comm == NREAD_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
		//由于已经有同样键值的旧item了。所以add命令使用失败。但
		//还是会刷新旧item的訪问时间以及LRU队列中的位置
        do_item_update(old_it);
    } else if (!old_it && (comm == NREAD_REPLACE
        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (comm == NREAD_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // LRU expired
            stored = NOT_FOUND;
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.cas_misses++;
            pthread_mutex_unlock(&c->thread->stats.mutex);
        }
        else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            item_replace(old_it, it, hv);
            stored = STORED;
        } else {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
            pthread_mutex_unlock(&c->thread->stats.mutex);

            if(settings.verbose > 1) {
                fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
                        (unsigned long long)ITEM_get_cas(old_it),
                        (unsigned long long)ITEM_get_cas(it));
            }
            stored = EXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
            /*
             * Validate CAS
             */
            if (ITEM_get_cas(it) != 0) {
                // CAS much be equal
                if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
                    stored = EXISTS;
                }
            }

            if (stored == NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                /* flags was already lost - so recover them from ITEM_suffix(it) */

                flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

				//由于是追加数据,先前分配的item可能不够大,所以要又一次申请item
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    if (old_it != NULL)
                        do_item_remove(old_it);

                    return NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                if (comm == NREAD_APPEND) {
                    memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                    memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
                } else {
                    /* NREAD_PREPEND */
                    memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
                    memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
                }

                it = new_it;
            }
        }

		//add、set、replace命令还没处理,但之前已经处理了不合理的情况
		//即add命令已经确保了眼下哈希表还没存储相应键值的item,replace命令
		//已经保证哈希表已经存储了相应键值的item
        if (stored == NOT_STORED) {
            if (old_it != NULL)//replace和set命令会进入这里
                item_replace(old_it, it, hv);//删除旧item,插入新item
            else//add和set命令会进入这里
                do_item_link(it, hv);//对于一个没有存在的key,使用set命令会来到这里

            c->cas = ITEM_get_cas(it);

            stored = STORED;
        }
    }

    if (old_it != NULL)
        do_item_remove(old_it);         /* release our reference */
    if (new_it != NULL)
        do_item_remove(new_it);

    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }

    return stored;
}

int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

//把旧的删除,插入新的。replace命令会调用本函数.
//不管旧item是否有其它worker线程在引用,都是直接将之从哈希表和LRU队列中删除
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                           ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it, hv);//直接丢弃旧item
    return do_item_link(new_it, hv);//插入新item。作为替换
}

关于do_item_unlink和do_item_link函数能够參考《插入和删除item》。

至此已经完毕了item的存储。

回应命令:

在complete_nread_ascii函数中,不管是存储成功还是失败都会调用out_string函数回应client。

static void out_string(conn *c, const char *str) {
    size_t len;

    assert(c != NULL);

    if (c->noreply) {//不须要回复信息给client
        if (settings.verbose > 1)
            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
        c->noreply = false; //重置
        conn_set_state(c, conn_new_cmd);
        return;
    }

    /* Nuke a partial output... */
    c->msgcurr = 0;
    c->msgused = 0;
    c->iovused = 0;
    add_msghdr(c);

    len = strlen(str);
    if ((len + 2) > c->wsize) {///2是后面的\r\n
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;

    conn_set_state(c, conn_write);//写状态
    c->write_and_go = conn_new_cmd;//写完后的下一个状态
    return;
}

static void drive_machine(conn *c) {
    bool stop = false;
    int res;

    assert(c != NULL);

	//drive_machine被调用会进行状态推断,并进行一些处理。但也可能发生状态的转换
	//此时就须要一个循环,当进行状态转换时。也能处理
    while (!stop) {

        switch(c->state) {

        case conn_write:

            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
			...
		}
	}

}

对于状态conn_mwrite的详细处理,能够參考前一篇博文的《回应命令》。须要注意的是,当memcached回应完client后。还须要释放conn对保存item的占有。这和前一篇博文是一样的,參考前一篇博文就可以。

时间: 2024-10-28 16:26:04

memcached源代码分析-----set命令处理流程的相关文章

Memcached源代码分析 - Memcached源代码分析之消息回应(3)

文章列表: <Memcached源代码分析 - Memcached源代码分析之基于Libevent的网络模型(1)> <Memcached源代码分析 - Memcached源代码分析之命令解析(2)> <Memcached源代码分析 - Memcached源代码分析之消息回应(3)  > <Memcached源代码分析 - Memcached源代码分析之HashTable(4) > <Memcached源代码分析 - Memcached源代码分析之增删

memcached源码分析-----set命令处理流程

转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44236591 前一篇博文以get命令为例子把整个处理流程简单讲述了一遍,本篇博文将以set命令详细讲述memcached的处理流程.具体的命令为"set tt 3 0 10",并假设当然memcached服务器没有名为tt的item. 读取命令: 在前一篇博文的最后,conn的状态被设置为conn_new_cmd,回到了一开始的状态.如果此时conn结构体里面的buff还有其

memcached源码分析-----get命令处理流程

转载请注明出处:http://blog.csdn.net/luotuo44/article/details/44217383 本文以get命令为例子,探讨memcached是如何处理命令的.本文只是探讨memcached处理命令的工作流程,具体的代码细节在不影响阅读的前提下能省略的就省略.能取默认值就取默认值.内存是足够的(不需要动态申请空间就够用了).涉及到数组.缓存区的就假设已经分配好了. 现在假定memcached里面有了一个键值为"tk"的item,此时我们使用命令"

openVswitch(OVS)源代码分析之工作流程(数据包处理)

上篇分析到数据包的收发,这篇开始着手分析数据包的处理问题.在openVswitch中数据包的处理是其核心技术,该技术分为三部分来实现:第一.根据skb数据包提取相关信息封装成key值:第二.根据提取到key值和skb数据包进行流表的匹配:第三.根据匹配到的流表做相应的action操作(若没匹配到则调用函数往用户空间传递数据包):其具体的代码实现在 datapath/datapath.c 中的,函数为: void ovs_dp_process_received_packet(struct vpor

openVswitch(OVS)源代码分析之工作流程(收发数据包)

前面已经把分析openVswitch源代码的基础(openVswitch(OVS)源代码分析之数据结构)写得非常清楚了,虽然访问的人比较少,也因此让我看到了一个现象:第一篇,openVswitch(OVS)源代码分析之简介其实就是介绍了下有关于云计算现状和openVswitch的各个组成模块,还有笼统的介绍了下其工作流程,个人感觉对于学习openVswitch源代码来说没有多大含金量.云计算现状是根据公司发展得到的个人体会,对学习openVswitch源代码其实没什么帮助:openVswitch

Monkey源代码分析之执行流程

在<MonkeyRunner源代码分析之与Android设备通讯方式>中.我们谈及到MonkeyRunner控制目标android设备有多种方法.当中之中的一个就是在目标机器启动一个monkey服务来监听指定的一个port,然后monkeyrunner再连接上这个port来发送命令.驱动monkey去完毕对应的工作. 当时我们仅仅分析了monkeyrunner这个client的代码是怎么实现这一点的,但没有谈monkey那边是怎样接受命令,接受到命令又是怎样处理的. 所以自己打开源代码看了一个

Raid1源代码分析--读流程

我阅读的代码的linux内核版本是2.6.32.61.刚进实验室什么都不懂,处于摸索阶段,近期的任务就是阅读raid1的源码.第一次接触raid相关的东西,网上分析源码的资料又比较少,不详细.逐行阅读代码,做了笔记.如果要对raid1的读流程有个整体上的把握,需要将笔记中的主线提炼出来,这里不写了.理解不足或者有误之处,希望批评指正. 读流程主要涉及以下函数: 请求函数make_request 读均衡read_balance 回调函数raid1_end_read_request 读出错处理rai

【Heritrix源代码分析4】开始一个爬虫抓取的全流程代码分析

在创建一个job后,就要开始job的运行,运行的全流程如下: 1.在界面上启动job 2.index.jsp 查看上述页面对应的源代码 <a href='"+request.getContextPath()+"/console/action.jsp?action=start'>Start</a> 3.action.jsp String sAction = request.getParameter("action"); if(sAction !

hbase0.96 put流程 源代码分析

1.HashMap的遍历 package com.sheepmu; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; public class KMPText { public static void main(String[] args) { Map<String,String> map=new HashMap<String,Str