redis学习笔记(12)---server基本流程

server工作流程

  当执行./redis-server后,redis数据库的server端就会启动。

  然后就会执行redis.c中的main()函数

  其中main()函数中的工作可以主要分为以下几个部分:

  

  • 1、初始化server端的配置信息- - -initServerConfig()
  • 2、解析运行时的命令参数,并根据参数进行处理,eg:./redis-server - -help
  • 3、如果设置了daemonize参数,则将server设为deamon进程- - -daemonize()
  • 4、启动server- - -initServer()
  • 5、设置周期性处理函数beforeSleep()
  • 6、开始工作- - -aeMain()

1、initServerConfig()

  初始化server端的配置信息,保存在服务器实例server中,包括监听端口、DB数、命令表等信息。

  

2、daemonize()  

  将进程设为守护进程。

  守护进程的相关知识之前在linux进程基础 中已经进行了简单介绍。  

void daemonize(void) {
    int fd;
    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}

3、initServer()

  这个函数中完成了非常多的任务,包括设置信号处理函数、 创建clients队列、slaves队列、创建数据库、创建共享对象等。

  除此之外最重要的两个任务是创建监听socket并监听client、以及创建周期性处理事件。

  我们知道,任何一个服务器的的事件都可以分为IO读写事件和时间处理事件,redis同样如此。

  1)IO读写事件,包括监听客户端的连接以及与客户端进行数据交互等

  2)时间处理事件,在设定的时间处理相关事件,包括周期性刷新数据库等。

  这两类事件都是通过server.el这个变量来保存的。

3.1、IO读写事件

  首先redis会为服务器创建一个监听fd,来监听来自客户端的连接,主要会调用到以下两个函数  

listenToPort(server.port,server.ipfd,&server.ipfd_count);
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
  • listenToPort:根据传入的port创建监听描述符,同时调用fcntl()将fd设为非阻塞的,然后调用bind()、listen()函数。注意redis会分别根据IPv4、IPv6两种地址分别创建一个socket
  • aeCreateFileEvent:创建读写事件。对于监听描述符而言,只需要创建一个读事件监听来自client的连接即可。注意,监听描述符的回调函数为acceptTcpHandler

  最后将该事件加入到server.el结构中

3.2、时间处理事件

  对于server端,redis会周期性的执行serverCron()来完成一些处理,因此将这个事件也加入到server.el结构中  

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
//每1ms执行一次serverCron()

4、beforeSleep()

  在redis的事件主循环中,每次循环都会执行一次,其中包括向所有slave发送ACK、写AOF文件等操作

5、aeMain()  

  redis服务器端最重要的函数,为redis的事件主循环。如果redis没有接收到中断信号,那么就会一直循环执行这个函数。  

aeMain(server.el);
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

  可以发现,该函数就是死循环的执行beforesleep()和aeProcessEvents()。

  作为redis服务器端的核心流程,aeProcessEvents()的实现代码较长,但是主要也只有3个动作

  

  • 1、计算调用select、epoll等函数可以阻塞的时间
  • 2、调用aeApiPoll()等待IO事件发生,若有事件发生,则调用相应的回调函数
  • 3、调用processTimeEvents()处理时间事件
  •   

  由于select、epoll等IO复用机制在一定时间内没有事件发生时,会一直阻塞在那里。因此为了不影响后面时间事件的处理,必须在最近的一个时间事件到来之前,完成IO复用机制的调用。因此首先找到最近一个时间事件,计算距离当前时间的时间差,来作为调用aeApiPoll()的参数。  

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        //1、有时间事件时,计算时间差
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop); //找到最近一个时间事件
        if (shortest) {
            aeGetTime(&now_sec, &now_ms); //得到当前时间
            tvp = &tv;
            //计算时间差tvp
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            if (flags & AE_DONT_WAIT) {  //此时不阻塞,立即返回
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else { //否则可以永远等待
                tvp = NULL; /* wait forever */
            }
        }
        //2、调用IO复用机制,处理IO事件
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            if (fe->mask & mask & AE_READABLE) { //处理读事件
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) { //处理写事件
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    //3、处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed;
}

  这就是redis服务器侧主要的工作流程了。

具体例子:

1、当有一个client连接到来时

  此时redis服务器端的监听描述符就会有事件发生,之前已经提到过该fd上只注册了读事件acceptTcpHandler(),因此执行fe->rfileProc(eventLoop,fd,fe->clientData,mask);就会调用acceptTcpHandler()函数

  1)首先acceptTcpHandler()会调用accept获取连接描述符cfd

  2)然后调用acceptCommonHandler()创建一个client实例  

  3)在createClient()中会为每个连接描述符注册读事件readQueryFromClient() ,同时将每个client的默认数据库设为0

  这样client和server端的连接就建立好了。当有客户端请求到来时,就会执行readQueryFromClient()函数 ,来处理该请求了。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); //1、调用accept获取连接描述符cfd
        if (cfd == ANET_ERR) {
            ......
        }
        acceptCommonHandler(cfd,0);
    }
}
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) { //2、创建client实例
        close(fd);
        return;
    }
}
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR){  //3、注册读事件
            /*  ......  */
        }
    }
    selectDb(c,0);
    /*  ......  */
    return c;
}

2、当有客户端请求到来时(eg:执行命令 set key value )

2.1、readQueryFromClient()读入请求并处理

  首先连接fd上的读事件会被触发,因此server端会调用readQueryFromClient()来进行处理。主要过程是:

    

  • 1、为接收缓存区申请内存
  • 2、调用read从客户端读入请求数据到c->querybuf中
  • 3、调用processInputBuffer对请求进行处理
  •   

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  //1、申请空间
    nread = read(fd, c->querybuf+qblen, readlen);  //2、读请求
    processInputBuffer(c);   //3、处理输入请求
}

  当执行命令 set key value后,打印c->querybuf得到如下结果:

  

  即其中内容的格式为:

  

2.2、processInputBuffer()处理请求

  在processInputBuffer()中

  1)首先调用processMultibulkBuffer,按照协议格式,将接收缓冲区中的内容解析出来,并为每个参数创建一个字符串对象robject  

//主要处理如下
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); //1、首先解析出参数个数并转换成数字
c->multibulklen = ll; //将参数个数赋给multibulklen
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);  //2、为对象分配内存
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); //3、依次创建ll个对象

  对于本例,ll = 3,最终会生成3个字符串对象,字符串的内容分别为”set” 、”key” 、”value”。

  c->argv的类型为robj **argv; ,因此可以将其看作一个数组,数组中的每一项指向一个robj。

  在上一章已经讲过,当字符串长度小于39字节时,会采用embstr编码方式来组织数据,因此最终c->argv的内容如下:

  

    

  2)调用processCommand对命令进行处理

  首先查找命令,当命令不存在或参数个数不对时,错误则直接返回。

  然后中间会进行一系列判断,暂时不管

  最后就调用call()处理命令

  如本例中的命令为set,因此会在redisCommandTable中找到set命令,然后执行set命令对应的函数setCommand()

int processCommand(redisClient *c) {
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {  //没有找到命令
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) { //命令参数个数错误
        return REDIS_OK;
    }
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);  //执行命令
    }
    return REDIS_OK;
}

  3)调用setCommand()执行命令

  setCommand()在上一篇中已经进行了简单的介绍,需要注意的是最后setCommand()会执行

  addReply(c, ok_reply ? ok_reply : shared.ok); 将操作的结果返回给客户端

  

2.3、调用addReply将结果返回给client

  1)该函数会调用prepareClientToWrite()首先将要返回给客户端的结果按照一定的格式保存到缓冲区中

  2)然后调用aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) 在该连接fd上注册写事件。

  这样当server下一次执行aeMain函数时,就会检测到有写事件发生,就会调用sendReplyToClient()函数了。

  3)在sendReplyToClient()函数中,就会调用write系统调用将结果通过socket返回给client了。

  这样整个set key value命令就执行完了

本文所引用的源码全部来自Redis3.0.7版本

redis学习参考资料:

https://github.com/huangz1990/redis-3.0-annotated

Redis 设计与实现(第二版)

时间: 2024-10-29 03:17:48

redis学习笔记(12)---server基本流程的相关文章

(转)redis 学习笔记(1)-编译、启动、停止

redis 学习笔记(1)-编译.启动.停止 一.下载.编译 redis是以源码方式发行的,先下载源码,然后在linux下编译 1.1 http://www.redis.io/download 先到这里下载Stable稳定版,目前最新版本是2.8.17 1.2 上传到linux,然后运行以下命令解压 tar xzf redis-2.8.17.tar.gz 1.3 编译 cd redis-2.8.17make 注:make命令需要linux上安装gcc,若机器上未安装gcc,redhat环境下,如

Redis学习笔记4-Redis配置具体解释

在Redis中直接启动redis-server服务时, 採用的是默认的配置文件.採用redis-server   xxx.conf 这种方式能够依照指定的配置文件来执行Redis服务. 依照本Redis学习笔记中Redis的依照方式依照后,Redis的配置文件是/etc/redis/6379.conf.以下是Redis2.8.9的配置文件各项的中文解释. #daemonize no 默认情况下, redis 不是在后台运行的.假设须要在后台运行,把该项的值更改为 yes daemonize ye

Redis学习笔记7--Redis管道(pipeline)

redis是一个cs模式的tcp server,使用和http类似的请求响应协议.一个client可以通过一个socket连接发起多个请求命令.每个请求命令发出后client通常会阻塞并等待redis服务处理,redis处理完后请求命令后会将结果通过响应报文返回给client.基本的通信过程如下: Client: INCR X Server: 1 Client: INCR X Server: 2 Client: INCR X Server: 3 Client: INCR X Server: 4

Redis学习笔记(简单了解与运行)

Redis学习笔记(简单了解与运行) 开源的非关系型数据库 是REmote Dictionary Server(远程字典服务器)的缩写,以字典结构存储数据 允许其他应用通过TCP协议读写字典中的内容. Redis支持存储的键值数据类型 字符串类型 散列类型 列表类型 集合类型 有序集合类型 Redis的特性 通过一个列子看出Mysql和Redis的存储区别 例如: (存储一篇文章,文章包括:标题(title),正文(content),阅读量(views),标签(tags)) 需求: 把数据存储在

Redis学习笔记4-Redis配置详解

原文:  http://blog.csdn.net/mashangyou/article/details/24555191 在Redis中直接启动redis-server服务时, 采用的是默认的配置文件.采用redis-server   xxx.conf 这样的方式可以按照指定的配置文件来运行Redis服务.按照本Redis学习笔记中Redis的按照方式按照后,Redis的配置文件是/etc/redis/6379.conf.下面是Redis2.8.9的配置文件各项的中文解释. 1 #daemon

Redis学习笔记

Redis学习笔记:Redis是什么?redis是开源BSD许可高级的key-vlue存储系统可以用来存储字符串哈希结构链表.结构.集合,因此常用来提供数据结构服务. redis和memcache相比的独特之处:1.redis可以用来做存储,而memcache是用来做缓存 这个特点主要因为其有"持久化"的功能.2.存储的数据有"结构",对于memcache来说,存储的数据只有1种类型"字符串"而 redis则可以存储字符串.链表.哈希机构.集合.

Redis学习笔记~目录

redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hashs(哈希类型).这些数据类型都 支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排 序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更

python基础教程_学习笔记12:充电时刻——模块

充电时刻--模块 python的标准安装包括一组模块,称为标准库. 模块 >>> import math >>> math.sin(0) 0.0 模块是程序 任何python程序都可以作为模块导入. $ cat hello.py #!/usr/bin/python print "Hello,signjing!" $ ./hello.py Hello,signjing! 假设将python程序保存在/home/ggz2/magiccube/mysh/p

python 学习笔记 12 -- 写一个脚本获取城市天气信息

最近在玩树莓派,前面写过一篇在树莓派上使用1602液晶显示屏,那么能够显示后最重要的就是显示什么的问题了.最容易想到的就是显示时间啊,CPU利用率啊,IP地址之类的.那么我觉得呢,如果能够显示当前时间.温度也是甚好的,作为一个桌面小时钟还是很精致的. 1. 目前有哪些工具 目前比较好用的应该是 weather-util, 之前我获取天气信息一般都是通过它. 使用起来也很简单: (1) Debian/Ubuntu 用户使用 sudo apt-get install weather-util 安装

Swift学习笔记(12)--数组和字典的复制

Swift中,数组Array和字典Dictionary是用结构来实现的,但是数组与字典和其它结构在进行赋值或者作为参数传递给函数的时候有一些不同. 并且数组和字典的这些操作,又与Foundation中的NSArray和NSDictionary不同,它们是用类来实现的. 注意:下面的小节将会介绍数组,字典,字符串等的复制操作.这些复制操作看起来都已经发生,但是Swift只会在确实需要复制的时候才会完整复制,从而达到最优的性能. 字典的赋值和复制操作 每次将一个字典Dictionary类型赋值给一个