REdis命令处理流程处理分析

分析版本:REdis-5.0.4。

REdis命令处理流程可分解成三个独立的流程(不包括复制和持久化):

1) 接受连接请求流程;

2) 接收请求数据和处理请求流程,在这个过程并不会发送处理结果给Client,而只是将结果数据写入响应缓冲,将由响应请求流程来发送;

3) 响应请求流程。

上述三个流程均是异步化的,并且没有直接的联系。它们的共同点均是通过REdis的简单事件驱动(AE,A simple event-driven)触发,对于Linux实际是epoll的包装,对于macOS为evport的包装,对于FreeBSD对kqueue的包装,对于其它则是select的包装。

可以把ae.h/ae.c看成是抽象基类,而ae_epoll.c、ae_select.c、ae_evport.c、ae_kqueue.c看成是ea的具体实现,以面向对象来看,大致如下图所示:

从上图可以看出,当没有任何数据时,进程将阻塞在函数aeApiPoll(对于epoll实际为epoll_wait)处直接超时。

如果有连接请求进来,或者有连接发送数据过来,或者有响应数据还未发送完成(连接变成可写),aeApiPoll均会立即从阻塞状态返回。

注意,只有fd被塞进了epoll,并没有将client或aeFileEvent塞入epoll。因此当一个连接被激活(比如有数据需要接收)时,需要通过fd来查找到aeFileEvent,而client因为在创建aeFileEvent时就被赋值给了aeFileEvent的clientData,因此只需要找到aeFileEvent即可。

全局对象server(类型为redisServer,定义在server.h中)维护了一个全局的aeEventLoop在,则aeEventLoop维护了一个aeFileEvent数组,并且aeFileEvent的数组下标为fd,因此很容易通过fd找到对应的aeFileEvent。

之所以没有将aeFileEvent直接注入到epoll,是为了统一事件驱动,比如select就不支持。在进程启动执行initServer时,会调用aeCreateEventLoop初始化该数组,数组大小大于配置项maxclients指定的值(额外加128),这利用了fd作为操作系统内核资源是循环利用的特性。

1. 接受连接请求流程

接受一个连接后,为该连接创建一个client对象,并将该client注册到epoll中,注册事件为EPLLIN(对应于ea的AE_READABLE),。

对应的伪代码:


int main()

{

// “ae”为“A simple event-driven”的缩写

void aeMain()

{

while (!eventLoop->stop)

{

// 响应从beforesleep开始,

// 未完成部分才会走到aeApiPoll。

if (eventLoop->beforesleep != NULL)

eventLoop->beforesleep(eventLoop);

// aeProcessEvents处理各种事件,包括:

// 1) 接受连接请求,为每个连接创建一个client

// 2) 接收请求数据,和处理请求

// 3) 发送响应数据

// 4) 处理各类定时事件(调用processTimeEvents)

int aeProcessEvents()

{

// aeApiPoll实为epoll或select等

aeApiPoll();

acceptTcpHandler(int fd)

{

// fd为listen套接字

// anetTcpAccept底层调用的是accept

int cfd = anetTcpAccept(fd);

acceptCommonHandler(cfd)

{

// createClient会将c添加server.clients中,

// server.clients是一个链接。

client *c = createClient(cfd)

{

aeCreateFileEvent(

server.el,fd,AE_READABLE,

readQueryFromClient, c)

{

// mask值为AE_READABLE(对应于epoll的EPOLLIN),

// 对于epoll实际调用的是epoll_ctl。

aeApiAddEvent(eventLoop,fd,mask);

}

}

}

}

}

}

}

}

2. 接收请求数据和处理请求流程

这一块会调用相应命令的处理函数,比如SET命令的处理函数setCommand,GET命令的处理函数getCommand。命令处理函数会修改内存数据。

并将处理的结果写入响应缓冲区,但并不立即发送给client。同时也会将处理结果写入AOF缓冲区,如果开启了AOF。以及将命令写入到复制积压缓冲区,如果有开启或有需要。还会将命令写入到slaves的缓冲区,如果需要。

响应请求在另一独立的流程中进行,本流程并不直接发送响应给client。

对应的伪代码:


// 不包括响应命令,

// 响应和接收处理是分开的两个过程。

int main() // server.c:4003

{

// “ae”为“A simple event-driven”的缩写

void aeMain() // ae.c:496

{

while (!eventLoop->stop)

{

// 发送响应先在beforesleep中进行,

// 如果在beforesleep中没有发送完(比如响应的数据量过大),

// 则后续的发送会由aeApiPoll触发。

if (eventLoop->beforesleep != NULL)

eventLoop->beforesleep(eventLoop);

int aeProcessEvents() // ae.c:358

{

// aeApiPoll实为epoll或select等

aeApiPoll();

// readQueryFromClient是个回调函数,

// 在创建client时注册:

// client *createClient(int fd) {

//   aeCreateFileEvent(

//       server.el, fd,

//       AE_READABLE,

//       readQueryFromClient, c);

// }

void readQueryFromClient() // networking.c:1494

{

// 这里调用read收数据

// client传过来的数据大小不能超过配置项client-query-buffer-limit指定的值。

// 默认大小为1G,足够覆盖大部场景。

// 如果超过大小,则可看到WARNING日志:

// Closing client that reached max query buffer length

// 实际中,一般远小于1G,所以可能将这个值调小一点,以增加对REdis的保护。

int nread = read(fd, c->querybuf, readlen);

int processCommand(client*) // networking.c:2543

{

redisCommand* lookupCommand(name)

{

// REdis所有命令存储

// 在struct redisServer的command表中:

// struct redisServer {

//    dict *commands; // Command table

// };

// 可将redisCommand看作一个C++抽象基本,

// 该抽象基本定义了纯虚函数proc:

// typedef void redisCommandProc(client *c);

// struct redisCommand {

//   redisCommandProc *proc;

// };

// 而command表中的每一个成员则为redisCommand的实现。

return dictFetchValue(commands,name);

}

void call(client*,flags) // server.c:2414

{

// 回调具体的命令处理:

// 如果是SET命令,

// 实际调用的是t_string.c中的函数setCommand;

// 如果是DEL命令,

// 实际调用的是db.c中的函数delCommand。

redisCommand::proc(client*);

void propagate(redisCommand*) // server.c:2315

{

// 数据写入到AOF文件

feedAppendOnlyFile(); // aof.c:555

// 数据复制给所有Slaves

void replicationFeedSlaves(slaves) // replication.c:173

{

// 数据写入到复制积压(Backlog)缓冲区,

// 注意积压缓冲区是一个循环缓冲区,

// 如果满了,则从头覆盖写,

// 循环缓冲区的大小,

// 则配置项repl-backlog-size决定

feedReplicationBacklog(); // replication.c:126

}

}

}

}

}

}

}

}

}

// 以GET命令为列:

// 这里的list实际为server.clients_pending_write

// 所以需响应的client都添加到server.clients_pending_write链表中(可视为队列)

// struct redisServer server; // Server global state

#0  listAddNodeHead (list=0x7fe88bc0f210, value=0x7fe88bc64ec0) at adlist.c:92

// 并不是所有的命令都需要WriteHandler,

// 因此有些并不会调用clientInstallWriteHandler。

#1  in clientInstallWriteHandler (c=0x7fe88bc64ec0) at networking.c:185

#2  in prepareClientToWrite (c=0x7fe88bc64ec0) at networking.c:228

#3  in addReplyString (c=0x7fe88bc64ec0, s=0x7ffdfc2e70c0 "$855\r\n", len=6) at networking.c:338

#4  in addReplyLongLongWithPrefix (c=0x7fe88bc64ec0, ll=855, prefix=36 ‘$‘) at networking.c:515

#5  in addReplyBulkLen (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:557

#6  in addReplyBulk (c=0x7fe88bc64ec0, obj=0x7fe889312840) at networking.c:562

#7  in getGenericCommand (c=0x7fe88bc64ec0) at t_string.c:167

#8  in getCommand (c=0x7fe88bc64ec0) at t_string.c:173

#9  in call (c=0x7fe88bc64ec0, flags=15) at server.c:2437

#10 in processCommand (c=0x7fe88bc64ec0) at server.c:2729

#11 in processInputBuffer (c=0x7fe88bc64ec0) at networking.c:1451

#12 in processInputBufferAndReplicate (c=0x7fe88bc64ec0) at networking.c:1486

#13 in readQueryFromClient (el=0x7fe88bc30050, fd=8, privdata=0x7fe88bc64ec0, mask=1) at networking.c:1568

#14 in aeProcessEvents (eventLoop=0x7fe88bc30050, flags=11) at ae.c:443

#15 in aeMain (eventLoop=0x7fe88bc30050) at ae.c:501

#16 in main (argc=2, argv=0x7ffdfc2e75b8) at server.c:4197

3. 响应请求流程

对于每一个有响应的命令,它的响应总是首先在beforesleep中进行,但如果一次没能发送完成,则会交给sendReplyToClient后续异步处理(以epoll为例,通过注册epoll的EPOLLOUT事件)。

对应的伪代码:


// 响应和接收处理是分开的两个过程。

int main()

{

// “ae”为“A simple event-driven”的缩写

void aeMain()

{

while (!eventLoop->stop)

{

// 调用eventLoop->beforesleep(eventLoop);

// 但实际调用的是server.c中的beforeSleep:

void beforeSleep(struct aeEventLoop*)

{

int handleClientsWithPendingWrites()

{

// REdis接收和处理

// 命令流程会设置clients_pending_write,

// clients_pending_write实为一个队列链接。

// 当处理完一个命令后,调用clientInstallWriteHandler

// 将当前client添加到clients_pending_write中。

// 但是有些命令并不需要响应,因此没有这个动作。

listRewind(server.clients_pending_write,&li);

while((ln = listNext(&li)))

{

int writeToClient(int fd,client* c)

{

write(fd,c->buf);

// 如果全部发送完了,

// 则调用aeDeleteFileEvent

// 将fd从epoll中移除。

if (!clientHasPendingReplies(c))

{

aeDeleteFileEvent(

server.el, c->fd, AE_WRITABLE); // 从epoll中删除EPOLLOUT

}

}

// 如果一次writeToClient调用没有发完,

// 则将fd注册到epoll

if (clientHasPendingReplies(c))

{

// 下列动作是设置epoll的EPOLLOUT

int ae_flags = AE_WRITABLE; // 将EPOLLOUT添加到epoll中

aeCreateFileEvent(

server.el, c->fd, ae_flags,

sendReplyToClient, c);

}

}

}

}

// REdis接收和处理一个命令流程

aeProcessEvents();

}

}

}

原文地址:https://www.cnblogs.com/aquester/p/10634863.html

时间: 2024-11-10 16:38:02

REdis命令处理流程处理分析的相关文章

redis 命令的调用过程

参考文献: Redis 是如何处理命令的(客户端) 我是如何通过添加一条命令学习redis源码的 从零开始写redis客户端(deerlet-redis-client)之路--第一个纠结很久的问题,restore引发的血案 redis命令执行流程分析 通信协议(protocol) Redis主从复制原理 Redis配置文件详解 当用户在redis客户端键入一个命令的时候,客户端会将这个命令发送到服务端.服务端会完成一系列的操作.一个redis命令在服务端大体经历了以下的几个阶段: 读取命令请求

安装logstash+kibana+elasticsearch+redis搭建集中式日志分析平台

本文是参考logstash官方文档实践的笔记,搭建环境和所需组件如下: Redhat 5.7 64bit / CentOS 5.x JDK 1.6.0_45 logstash 1.3.2 (内带kibana) elasticsearch 0.90.10 redis 2.8.4 搭建的集中式日志分析平台流程如下: elasticsearch 1.下载elasticsearch. wget https://download.elasticsearch.org/elasticsearch/elasti

第10课:[实战] Redis 网络通信模块源码分析(3)

redis-server 接收到客户端的第一条命令 redis-cli 给 redis-server 发送的第一条数据是 *1\r\n\$7\r\nCOMMAND\r\n .我们来看下对于这条数据如何处理,单步调试一下 readQueryFromClient 调用 read 函数收取完数据,接着继续处理 c→querybuf 的代码即可.经实际跟踪调试,调用的是 processInputBuffer 函数,位于 networking.c 文件中: /* This function is call

Redis 命令执行过程(下)

在上一篇文章中<Redis 命令执行过程(上)>中,我们首先了解 Redis 命令执行的整体流程,然后细致分析了从 Redis 启动到建立 socket 连接,再到读取 socket 数据到输入缓冲区,解析命令,执行命令等过程的原理和实现细节.接下来,我们来具体看一下 set 和 get 命令的实现细节和如何将命令结果通过输出缓冲区和 socket 发送给 Redis 客户端. set 和 get 命令具体实现 前文讲到 processCommand 方法会从输入缓冲区中解析出对应的 redi

Redis 发布/订阅机制原理分析

Redis 通过 PUBLISH. SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能. 这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播.实时提醒等. 本文通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解. 订阅.发布和退订 在开始研究源码之前,不妨先来回顾一下几个相关命令的使用方式. PUBLISH 命令用于向给定的频道发送信息,返回值为接收到信息的订阅者数量: redis

Redis命令执行全过程

这个问题说简单也很简单,无非就是客户端发送命令请求,服务器读取命令请求,然后是命令执行器查找命令实现,执行预备操作,调用命令实现函数,执行后续工作. 但是我们想要了解的不能简简单单的就是这些.下面我们详细的来分析一下Redis命令执行的全过程. 发送命令请求 但用户通过客户端输入一个命令请求的时候,客户端首先会对用户输入的命令请求进行一个格式转换,转换成协议格式,然后通过连接到服务器的套接字把这个已经格式化的命令发送给服务器(服务器是通过套节字和客户端或者是其他服务器连接通信的) 读取命令请求

Shell命令和流程控制

Shell命令和流程控制 在shell脚本中可以使用三类命令: 1)Unix 命令: 虽然在shell脚本中可以使用任意的unix命令,但是还是由一些相对更常用的命令.这些命令通常是用来进行文件和文字操作的. 常用命令语法及功能 echo "some text": 将文字内容打印在屏幕上 ls: 文件列表 wc –l filewc -w filewc -c file: 计算文件行数 计算文件中的单词数 计算文件中的字符数 cp sourcefile destfile: 文件拷贝 mv

深入Redis命令的执行过程

深入Redis命令的执行过程 Redis 服务器: Redis 服务器实现与多个客户端的连接,并处理这些客户端发送过来的请求,同时保存客户端执行命令所产生的数据到数据库中.Redis 服务器依靠资源管理器来维持自身的运转,其主要作用是管理 Redis 服务. 服务器处理命令的过程 我们向客户端发送了一条命令:SET city〝beijing〝 第一步 用户将命令 SET city〝beijing〝输入客户端,客户端接收到此命令. 第二步 客户端会先将接收到的命令转化为服务器可以识别的协议格式,然

如何编写自定义标签?具体流程与分析(自定义标签快速入门)

1.自定义标签简介 自定义标签主要用于移除Jsp页面中的java代码 使用自定义标签移除jsp页面中的java代码,只需要完成以下两个步骤: 1.编写一个实现Tag接口的Java类(标签处理器类) 2.在WEB-INF中编写标签库描述符(tld)文件,在tld文件中对标签处理器类描述成一个标签 (参考tomcat中的examples 项目中jsp部分) 2.自定义标签的执行流程的分析 JSP引擎将遇到自定义标签时,首先创建标签处理器类的实例对象,然后按照JSP规范定义的通信规则依次调用它的方法.