Redis源码分析(二十二)--- networking网络协议传输

上次我只分析了Redis网络部分的代码一部分,今天我把networking的代码实现部分也学习了一遍,netWorking的代码更多偏重的是Client客户端的操作。里面addReply()系列的方法操作是主要的部分。光光这个系列的方法,应该占据了一半的API的数量。我把API分成了3个部分:

/* ------------ API ---------------------- */
void *dupClientReplyValue(void *o)	/* 复制value一份 */
int listMatchObjects(void *a, void *b) /* 比价2个obj是否相等 */
robj *dupLastObjectIfNeeded(list *reply) /* 返回回复列表中最后一个元素对象 */
void copyClientOutputBuffer(redisClient *dst, redisClient *src) /* 将源Client的输出buffer复制给目标Client */
static void acceptCommonHandler(int fd, int flags) /* 网络连接后的调用方法 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void disconnectSlaves(void) /* 使server的slave失去连接 */
void replicationHandleMasterDisconnection(void)
void flushSlavesOutputBuffers(void) /* 从方法将会在freeMemoryIfNeeded(),释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */
int processEventsWhileBlocked(void)

/* ------------- addReply API -----------------   */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) /* 往客户端缓冲区中添加内容 */
void _addReplyObjectToList(redisClient *c, robj *o) /* robj添加到reply的列表中 */
void _addReplySdsToList(redisClient *c, sds s) /* 在回复列表中添加Sds字符串对象 */
void _addReplyStringToList(redisClient *c, char *s, size_t len) /* 在回复列表中添加字符串对象,参数中已经给定字符的长度 */
void addReply(redisClient *c, robj *obj) /* 在redisClient的buffer中写入数据,数据存在obj->ptr的指针中 */
void addReplySds(redisClient *c, sds s) /* 在回复中添加Sds字符串,下面的额addReply()系列方法原理基本类似 */
void addReplyString(redisClient *c, char *s, size_t len)
void addReplyErrorLength(redisClient *c, char *s, size_t len)
void addReplyError(redisClient *c, char *err) /* 往Reply中添加error类的信息 */
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
void addReplyStatusLength(redisClient *c, char *s, size_t len)
void addReplyStatus(redisClient *c, char *status)
void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
void *addDeferredMultiBulkLength(redisClient *c) /* 在reply list 中添加一个空的obj对象 */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length)
void addReplyDouble(redisClient *c, double d) /* 在bulk reply中添加一个double类型值,bulk的意思为大块的,bulk reply的意思为大数据量的回复 */
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix)
void addReplyLongLong(redisClient *c, long long ll)
void addReplyMultiBulkLen(redisClient *c, long length)
void addReplyBulkLen(redisClient *c, robj *obj) /* 添加bulk 大块的数据的长度 */
void addReplyBulk(redisClient *c, robj *obj) /* 将一个obj的数据,拆分成大块数据的添加 */
void addReplyBulkCBuffer(redisClient *c, void *p, size_t len)
void addReplyBulkCString(redisClient *c, char *s)
void addReplyBulkLongLong(redisClient *c, long long ll)

/* ------------- Client API -----------------   */
redisClient *createClient(int fd) /* 创建redisClient客户端,1.建立连接,2.设置数据库,3.属性设置 */
int prepareClientToWrite(redisClient *c) /* 此方法将会被调用于Client准备接受新数据之前调用,在fileEvent为客户端设定writer的handler处理事件 */
static void freeClientArgv(redisClient *c)
void freeClient(redisClient *c) /* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */
void freeClientAsync(redisClient *c)
void freeClientsInAsyncFreeQueue(void) /* 异步的free客户端 */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 将Client中的reply数据存入文件中 */
void resetClient(redisClient *c)
int processInlineBuffer(redisClient *c) /* 处理redis Client的内链的buffer,就是c->querybuf */
static void setProtocolError(redisClient *c, int pos)
int processMultibulkBuffer(redisClient *c) /* 处理大块的buffer */
void processInputBuffer(redisClient *c) /* 处理redisClient的查询buffer */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) /* 从Client获取查询query语句 */
void getClientsMaxBuffers(unsigned long *longest_output_list,
                          unsigned long *biggest_input_buffer) /* 获取Client中输入buffer和输出buffer的最大长度值 */
void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) /* 格式化ip,port端口号的输出,ip:port */
int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) /* 获取Client客户端的ip,port地址信息 */
char *getClientPeerId(redisClient *c) /* 获取c->peerid客户端的地址信息 */
sds catClientInfoString(sds s, redisClient *client) /* 格式化的输出客户端的属性信息,直接返回一个拼接好的字符串 */
sds getAllClientsInfoString(void) /* 获取所有Client客户端的属性信息,并连接成一个总的字符串并输出 */
void clientCommand(redisClient *c) /* 执行客户端的命令的作法 */
void rewriteClientCommandVector(redisClient *c, int argc, ...) /* 重写客户端的命令集合,旧的命令集合的应用计数减1,新的Command  Vector的命令集合增1 */
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) /* 重写Client中的第i个参数 */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) /* 获取Client中已经用去的输出buffer的大小 */
int getClientType(redisClient *c)
int getClientTypeByName(char *name) /* Client中的名字的3种类型,normal,slave,pubsub */
char *getClientTypeName(int class)
int checkClientOutputBufferLimits(redisClient *c) /* 判断Clint的输出缓冲区的已经占用大小是否超过软限制或是硬限制 */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) /* 异步的关闭Client,如果缓冲区中的软限制或是硬限制已经到达的时候,缓冲区超出限制的结果会导致释放不安全, */

我们从最简单的_addReplyToBuffer在缓冲区中添加回复数据开始说起,因为后面的各种addReply的方法都或多或少的调用了和这个歌方法。

/* -----------------------------------------------------------------------------
 * Low level functions to add more data to output buffers.
 * -------------------------------------------------------------------------- */
/* 往客户端缓冲区中添加内容 */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
     //如果当前的reply已经存在内容,则操作出错
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}

最直接影响的一句话,就是memcpy(c->buf+c->bufpos,s,len);所以内容是加到c->buf中的,这也就是客户端的输出buffer,添加操作还有另外一种形式是添加对象类型:

/* robj添加到reply的列表中 */
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *tail;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    if (listLength(c->reply) == 0) {
        incrRefCount(o);
        //在回复列表汇总添加robj内容
        listAddNodeTail(c->reply,o);
        c->reply_bytes += zmalloc_size_sds(o->ptr);
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            c->reply_bytes -= zmalloc_size_sds(tail->ptr);
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
            c->reply_bytes += zmalloc_size_sds(tail->ptr);
        } else {
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
            c->reply_bytes += zmalloc_size_sds(o->ptr);
        }
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

把robj对象加载reply列表中,并且改变reply的byte大小,最后还调用了一个asyncCloseClientOnOutputBufferLimitReached(c);方法,这个方法我是在这个文件的最底部找到的,一开始还真不知道什么意思,作用就是当添加完数据后,当客户端的输出缓冲的大小超出限制时,会被异步关闭:

/* Asynchronously close a client if soft or hard limit is reached on the
 * output buffer size. The caller can check if the client will be closed
 * checking if the client REDIS_CLOSE_ASAP flag is set.
 *
 * Note: we need to close the client asynchronously because this function is
 * called from contexts where the client can't be freed safely, i.e. from the
 * lower level functions pushing data inside the client output buffers. */
/* 异步的关闭Client,如果缓冲区中的软限制或是硬限制已经到达的时候,缓冲区超出限制的结果会导致释放不安全, */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
    redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
    if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
    if (checkClientOutputBufferLimits(c)) {
        sds client = catClientInfoString(sdsempty(),c);

        freeClientAsync(c);
        redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
        sdsfree(client);
    }
}

在addReply方法调用的时候,有时是需要一个前提的,我说的是在写数据事件发生的时候,你得先对写的文件创建一个监听事件:

/* 在回复中添加Sds字符串 */
void addReplySds(redisClient *c, sds s) {
	//在调用添加操作之前,都要先执行prepareClientToWrite(c),设置文件事件的写事件
    if (prepareClientToWrite(c) != REDIS_OK) {
        /* The caller expects the sds to be free'd. */
        sdsfree(s);
        return;
    }
    if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
        sdsfree(s);
    } else {
        /* This method free's the sds when it is no longer needed. */
        _addReplySdsToList(c,s);
    }
}

在这个prepareClientToWrite()里面是干嘛的呢?

/* This function is called every time we are going to transmit new data
 * to the client. The behavior is the following:
 *
 * If the client should receive new data (normal clients will) the function
 * returns REDIS_OK, and make sure to install the write handler in our event
 * loop so that when the socket is writable new data gets written.
 *
 * If the client should not receive new data, because it is a fake client,
 * a master, a slave not yet online, or because the setup of the write handler
 * failed, the function returns REDIS_ERR.
 *
 * Typically gets called every time a reply is built, before adding more
 * data to the clients output buffers. If the function returns REDIS_ERR no
 * data should be appended to the output buffers. */
/* 此方法将会被调用于Client准备接受新数据之前调用,在fileEvent为客户端设定writer的handler处理事件 */
int prepareClientToWrite(redisClient *c) {
    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
    if ((c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
    if (c->fd <= 0) return REDIS_ERR; /* Fake client */
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        //在这里创建写的文件事件
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

在addReply的方法里提到了一个addReplyBulk类型方法,Bulk的中文意思为大块的,说明addReplyBulk添加的都是一些比较大块的数据,找一个方法看看:

/* Add a Redis Object as a bulk reply */
/* 将一个obj的数据,拆分成大块数据的添加 */
void addReplyBulk(redisClient *c, robj *obj) {
	//reply添加长度
    addReplyBulkLen(c,obj);
    //reply添加对象
    addReply(c,obj);
    addReply(c,shared.crlf);
}

将原本一个robj的数据拆分成可3个普通的addReply的方法调用。就变成了数据量变大了的数据。大数据的回复一个比较不好的地方是到时解析的时候或者是Data的复制的时候会比较耗时。在networking的方法里还提供了freeClient()的操作:

/* 释放freeClient,要分为Master和Slave2种情况作不同的处理 */
void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    /* If it is our master that's beging disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
        	//如果是Master客户端,需要做缓存Client的处理,可以迅速重新启用
            replicationCacheMaster(c);
            return;
        }
    }

...后面代码略去了

当Client中的输出buffer数据渐渐变多了的时候就要准备持久化到磁盘文件了,要调用下面这个方法了,

/* Helper function used by freeMemoryIfNeeded() in order to flush slave
 * output buffers without returning control to the event loop. */
/* 从方法将会在freeMemoryIfNeeded(),释放内存空间函数,将存在内存中数据操作结果刷新到磁盘中 */
void flushSlavesOutputBuffers(void) {
    listIter li;
    listNode *ln;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = listNodeValue(ln);
        int events;

        events = aeGetFileEvents(server.el,slave->fd);
        if (events & AE_WRITABLE &&
            slave->replstate == REDIS_REPL_ONLINE &&
            listLength(slave->reply))
        {
        	//在这里调用了write的方法
            sendReplyToClient(server.el,slave->fd,slave,0);
        }
    }
}

这个方法的核心调用又在sendReplyToClient()方法,就是把Client的reply内容和buf内容存入文件。以上就是我的理解了,代码量有点大,的确看的我头有点大。

时间: 2024-08-03 16:13:32

Redis源码分析(二十二)--- networking网络协议传输的相关文章

Redis源码分析(十二)--- redis-check-dump本地数据库检测

这个文件我在今天分析学习的时候,一直有种似懂非懂的感觉,代码量700+的代码,最后开放给系统的就是一个process()方法.这里说的说的数据库检测,是针对key的检测,会用到,下面提到的结构体: /* Data type to hold opcode with optional key name an success status */ /* 用于key的检测时使用,后续检测操作都用到了entry结构体 */ typedef struct { //key的名字 char* key; //类型

ABP源码分析三十二:ABP.SignalR

Realtime Realtime是ABP底层模块提供的功能,用于管理在线用户.它是使用SignalR实现给在线用户发送通知的功能的前提 IOnlineClient/OnlineClient: 封装在线用户的信息 OnlineClientManager/IOnlineClientManager: 用于提供基本维护在线用户的方法.其内部维护了一个字典来保存在线的客户信息. SingalR SignalRRealTimeNotifier: 实现了给在线用户发送通知的功能.其从IOnlineClien

Redis源码分析(十八)--- db.c内存数据库操作

我们知道Redis数据库作为一个内存数据库,与memcached比较类似,基本的操作都是存储在内存缓冲区中,等到缓冲区中数据满后,在持久化到磁盘中.今天,我主要研究了对于redis中对于内存数据库的操作.与普通的数据操作比较,并没有什么特别多的其他的一些操作.下面是我分类出的一些API: /*----------------------------------------------------------------------------- * C-level DB API *-------

hbase源码系列(十二)Get、Scan在服务端是如何处理?

继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了. Get 我们打开HRegionServer找到get方法.Get的方法处理分两种,设置了ClosestRowBefore和没有设置的,一般来讲,我们都是知道了明确的rowkey,不太会设置这个参数,它默认是false的. if (get.hasClosestRowBef

libevent源码深度剖析十二

libevent源码深度剖析十二 --让libevent支持多线程张亮 Libevent本身不是多线程安全的,在多核的时代,如何能充分利用CPU的能力呢,这一节来说说如何在多线程环境中使用libevent,跟源代码并没有太大的关系,纯粹是使用上的技巧. 1 错误使用示例 在多核的CPU上只使用一个线程始终是对不起CPU的处理能力啊,那好吧,那就多创建几个线程,比如下面的简单服务器场景.1 主线程创建工作线程1:2 接着主线程监听在端口上,等待新的连接:3 在线程1中执行event事件循环,等待事

redis 源码分析(一) 内存管理

一,redis内存管理介绍 redis是一个基于内存的key-value的数据库,其内存管理是非常重要的,为了屏蔽不同平台之间的差异,以及统计内存占用量等,redis对内存分配函数进行了一层封装,程序中统一使用zmalloc,zfree一系列函数,其对应的源码在src/zmalloc.h和src/zmalloc.c两个文件中,源码点这里. 二,redis内存管理源码分析 redis封装是为了屏蔽底层平台的差异,同时方便自己实现相关的函数,我们可以通过src/zmalloc.h 文件中的相关宏定义

redis源码分析之事务Transaction(下)

接着上一篇,这篇文章分析一下redis事务操作中multi,exec,discard三个核心命令. 原文地址:http://www.jianshu.com/p/e22615586595 看本篇文章前需要先对上面文章有所了解: redis源码分析之事务Transaction(上) 一.redis事务核心命令简介 redis事务操作核心命令: //用于开启事务 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0}, //用来执行事

redis源码分析4---结构体---跳跃表

redis源码分析4---结构体---跳跃表 跳跃表是一种有序的数据结构,他通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的: 跳跃表支持平均O(logN),最坏O(N)复杂度的节点查找,还可以通过顺序性操作来批量处理节点.性能上和平衡树媲美,因为事先简单,常用来代替平衡树. 在redis中,只在两个地方使用了跳跃表,一个是实现有序集合键,另一个是在集群节点中用作内部数据结构. 1 跳跃表节点 1.1 层 层的数量越多,访问其他节点的速度越快: 1.2 前进指针 遍历举例

redis源码分析3---结构体---字典

redis源码分析3---结构体---字典 字典,简单来说就是一种用于保存键值对的抽象数据结构: 注意,字典中每个键都是独一无二的:在redis中,内部的redis的数据库就是使用字典作为底层实现的: 1 字典的实现 在redis中,字典是使用哈希表作为底层实现的,一个hash表里面可以有多个hash表节点,而每个hash表节点就保存了字典中的一个键值对: hash表定义 table属性是一个数组,数组中的每个元素都是一个指向dictEntry结构的指针,每个dictEntry结构保存着一个键值

redis源码分析之内存布局

redis源码分析之内存布局 1. 介绍 众所周知,redis是一个开源.短小.高效的key-value存储系统,相对于memcached,redis能够支持更加丰富的数据结构,包括: 字符串(string) 哈希表(map) 列表(list) 集合(set) 有序集(zset) 主流的key-value存储系统,都是在系统内部维护一个hash表,因为对hash表的操作时间复杂度为O(1).如果数据增加以后,导致冲突严重,时间复杂度增加,则可以对hash表进行rehash,以此来保证操作的常量时