redis 文件事件模型

参考文献:

  1. 深入剖析 redis 事件驱动
  2. Redis 中的事件循环
  3. 深入了解epoll (转)
  4. Redis自己的事件模型 ae
  5. EPOLL(7)
  6. Linux IO模式及 select、poll、epoll详解
  7. epoll为什么这么快,epoll的实现原理

    概述

    在redis中,对于对于文件事件的处理采用了Reactor模型。总体来说,就是将io多路复用所监听到的文件去处,并放入一个队列中依次处理。接下去本文以一个io多路复用的例子开始,一步步还原redis文件事件的运行过程

epoll (本节从Linux IO模式及 select、poll、epoll详解摘抄)

epoll使用的过程中需要如下的三个接口:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

int epoll_create(int size)

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。

当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

函数是对指定描述符fd执行op操作。

  • epfd:是epoll_create()的返回值。
  • op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
  • fd:是需要监听的fd(文件描述符)
  • epoll_event:是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待epfd上的io事件,最多返回maxevents个事件。

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

一个epoll的示例

有了上面的论述,用一个简单的例子来说明下epoll的使用(来自http://man7.org/linux/man-pages/man7/epoll.7.html):

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

/* Code to set up listening socket, ‘listen_sock‘,
   (socket(), bind(), listen()) omitted */

epollfd = epoll_create1(0);
if (epollfd == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock,
                    (struct sockaddr *) &addr, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                        &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}

如图所示,可以看出使用epoll的过程。接下来将介绍redis事件驱动模型中主要涉及的数据结构。

redis事件驱动模型

数据结构

redis事件驱动模型中主要涉及到如下的几个数据结构:

  1. aeCreateEventLoop
  2. aeApiState
  3. aeFileEvent
  4. aeFiredEvent

    redis事件处理的核心是aeCreateEventLoop结构,如图可以看出主要的结构体如下:

    ```

    typedef struct aeEventLoop {

    // 目前已注册的最大描述符

    int maxfd; /* highest file descriptor currently registered */

    // 目前已追踪的最大描述符

    int setsize; /* max number of file descriptors tracked */

    // 用于生成时间事件 id

    long long timeEventNextId;

    // 最后一次执行时间事件的时间

    time_t lastTime; /* Used to detect system clock skew */

    // 已注册的文件事件

    aeFileEvent events; / Registered events */

    // 已就绪的文件事件

    aeFiredEvent fired; / Fired events */

    // 时间事件

    aeTimeEvent *timeEventHead;

    // 事件处理器的开关

    int stop;

    // 多路复用库的私有数据

    void apidata; / This is used for polling API specific data */

    // 在处理事件前要执行的函数

    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

其中aeFileEvent 结构体为已经注册并需要监听的事件的结构体。在redis初始化的时候会创建一个 setSize*sizeof(aeFileEvent) 以及一个 setSize*siezeof(aeFiredEvent) 大小的内存,用文件描述符作为其索引。那么这个大小定位多少合适呢?在Linux个中,文件描述符是个有限的资源,当打开一个文件时就会消耗一个文件描述符,当关闭该文件描述符或者程序结束时会释放该文件描述符资源,从而供其他文件打开操作使用。当文件描述符超过最大值后,打开文件就会出错。那么这个最大值是多少呢?可以通过/proc/sys/fs/file-max看到系统支持的最大的文件描述符数。通过 ulimit -n 可以看到当前用户能打开的最大的文件描述符。在我这里的一台8g内存的机器上,系统支持最大的文件描述是365146。而在这台64bit的机器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小为40byte。按系统最大支持的文件描述符来算,固定消耗内存为14.6M。这样以文件描述符作为数组的下标来索引,虽然这样的哈希在接入量不大的情况下会有大量的浪费。但是最多也就浪费14M 的内存,因此这样的设计是可取的。【4】

typedef struct aeFileEvent {

// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */

// 读事件处理器
aeFileProc *rfileProc;

// 写事件处理器
aeFileProc *wfileProc;

// 多路复用库的私有数据
void *clientData;

} aeFileEvent;


aeFiredEvent结构体是已经监听到有事件发生的描述符的集合。

typedef struct aeFiredEvent {

// 已就绪文件描述符
int fd;

// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;

} aeFiredEvent;


void *apidata;在ae创建的时候,会被赋值为aeApiState结构体,结构体的定义如下:

typedef struct aeApiState {

// epoll_event 实例描述符
int epfd;

// 事件槽
struct epoll_event *events;

} aeApiState;

可以见得,这个结构体是为了epoll所准备的数据结构。redis可以选择不同的io多路复用方法。因此 apidata 是个void类型,根据不同的io多路复用库来选择。

## Reactor模型的创建与使用
###  aeEventLoop 的创建

aeEventLoop aeCreateEventLoop(int setsize) {

aeEventLoop eventLoop;

int i;

// 创建事件状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

// 初始化文件事件结构和已就绪文件事件结构数组
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 设置数组大小
eventLoop->setsize = setsize;
// 初始化执行最近一次执行时间
eventLoop->lastTime = time(NULL);

// 初始化时间事件结构
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;

eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;

/* Events with mask == AE_NONE are not set. So let‘s initialize the
 * vector with it. */
// 初始化监听事件
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;

// 返回事件循环
return eventLoop;

err:

if (eventLoop) {

zfree(eventLoop->events);

zfree(eventLoop->fired);

zfree(eventLoop);

}

return NULL;

}

如下图所示,可以见得在初始化的时候创建结构体的流程。

graph LR

创建aeFileEvent-->创建aeFireEvent

创建aeFireEvent-->调用aeApiCreate创建aeApiState

函数aeApiCreate则创建了一个epoll所需要的数据结构。

/*

  • 创建一个新的 epoll 实例,并将它赋值给 eventLoop

    /

    static int aeApiCreate(aeEventLoop eventLoop) {

    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;

    // 初始化事件槽空间

    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);

    if (!state->events) {

    zfree(state);

    return -1;

    }

    // 创建 epoll 实例

    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */

    if (state->epfd == -1) {

    zfree(state->events);

    zfree(state);

    return -1;

    }

    // 赋值给 eventLoop

    eventLoop->apidata = state;

    return 0;

    }

###  aeFileEvent的注册
在创建了aeEventLoop之后,对于需要监听的文件描述符需要进行注册,在aeFileEvent结构体中,可以看到如下的两个结构aeFileProc *rfileProc和aeFileProc *wfileProc,就是在注册监听事件的时候进行赋值的。
函数aeCreateFileEvent执行创建aeFileEvent和添加文件句柄到epoll中。

/*

  • 根据 mask 参数的值,监听 fd 文件的状态,
  • 当 fd 可用时,执行 proc 函数

    /

    int aeCreateFileEvent(aeEventLoop eventLoop, int fd, int mask,

    aeFileProc proc, void clientData)

    {

    if (fd >= eventLoop->setsize) {

    errno = ERANGE;

    return AE_ERR;

    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    // 取出文件事件结构

    aeFileEvent *fe = &eventLoop->events[fd];

    // 监听指定 fd 的指定事件

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)

    return AE_ERR;

    // 设置文件事件类型,以及事件的处理器

    fe->mask |= mask;

    if (mask & AE_READABLE) fe->rfileProc = proc;

    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有数据

    fe->clientData = clientData;

    // 如果有需要,更新事件处理器的最大 fd

    if (fd > eventLoop->maxfd)

    eventLoop->maxfd = fd;

    return AE_OK;

    }

其中aeApiAddEvent函数就是在开头之中epoll例子中添加一个文件描述符到监听集合中的方法封装函数:

/*

  • 关联给定事件到 fd

    /

    static int aeApiAddEvent(aeEventLoop eventLoop, int fd, int mask) {

    aeApiState *state = eventLoop->apidata;

    struct epoll_event ee;

    /* If the fd was already monitored for some event, we need a MOD

    • operation. Otherwise we need an ADD operation.
    • 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。
    • 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。

      */

      int op = eventLoop->events[fd].mask == AE_NONE ?

      EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // 注册事件到 epoll

    ee.events = 0;

    mask |= eventLoop->events[fd].mask; /* Merge old events /

    if (mask & AE_READABLE) ee.events |= EPOLLIN;

    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

    ee.data.u64 = 0; / avoid valgrind warning */

    ee.data.fd = fd;

    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

    return 0;

    }

### 事件驱动模型的运行过程
到这里redis事件驱动的主要数据结构和初始化的方法已经介绍完毕。接下来将展示事件驱动的运行过程。在redis源码中,省略去其他部分,跟事件驱动相关的代码如下:
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}

// 为本地套接字关联应答处理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
aeMain(server.el);
其中aeCreateEventLoop和aeCreateFileEvent函数在之前已经介绍过。接下来重点介绍下aeMain函数:

/*

  • 事件处理器的主循环

    /

    void aeMain(aeEventLoop eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

    // 如果有需要在事件处理前执行的函数,那么运行它
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    
    // 开始处理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);

    }

    }

    ```

    我们可以看出,aeMain函数中主要调用了aeProcessEvents处理事件,aeProcessEvents中我们略去其他的代码,主要关注如下的部分:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ....
    // 处理文件事件,阻塞时间由 tvp 决定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn‘t
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    ....
}

可以看出函数aeProcessEvents调用了aeApiPoll获取已经就绪的事件。在for循环中,从eventLoop->fired(已经就绪的事件)中取出事件结构体,然后根据是读时间还是写事件进行处理。在aeApiPoll中,就可以看到我们熟悉的

epoll_wait的身影。可以见得通过调用系统的epoll_wait函数,然后将已经就绪的事件放入 eventLoop->fired中。

/*
 * 获取可执行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 等待时间
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    // 有至少一个事件就绪?
    if (retval > 0) {
        int j;

        // 为已就绪事件设置相应的模式
        // 并加入到 eventLoop 的 fired 数组中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;

            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }

    // 返回已就绪事件个数
    return numevents;
}

到这里还有一个疑问,在redis初始化的时候只注册了tcp和本地套接字的描述符,那么当有个新的客户端连接进来的时候,是怎么将客户端的描述符加到监听队列里面的呢?答案就在最开始的acceptTcpHandler函数中。在这个函数中依次调用了acceptCommonHandler->createClient->aeCreateFileEvent函数。可以见得当监听的一个tcp或者本地socket产生了connect 事件的时候,就会依次调用这些函数,然后将新的客户端端描述符加入监听中。

总结

redis的事件驱动模型分析就到这里,总体而言还是比较直观的。这中间也学习了很多,包括epoll的原理等。

原文地址:https://www.cnblogs.com/bush2582/p/8968997.html

时间: 2024-10-01 17:17:56

redis 文件事件模型的相关文章

Redis与Memcached区别?Redis线程模型?

一.redis 和 memcached 有啥区别? 1. redis 支持复杂的数据结构 redis 相比 memcached 来说,拥有更多的数据结构,能支持更丰富的数据操作.如果需要缓存能够支持更复杂的结构和操作, redis 会是不错的选择. 2. redis 原生支持集群模式 在 redis3.x 版本中,便能支持 cluster 模式,而 memcached 没有原生的集群模式,需要依靠客户端来实现往集群中分片写入数据. 3. 性能对比 由于 redis 只使用单核,而 memcach

redis单线程模型分析

redis原理 redis采用自己实现的事件分离器,效率比较高,内部采用非阻塞的执行方式,吞吐能力比较大. 不过,因为一般的内存操作都是简单存取操作,线程占用时间相对较短,主要问题在io上,因此,redis这种模型是合适的,但是如果某一个线程出现问题导致线程占用很长时间,那么reids的单线程模型效率可想而知. 引自网络: 总体来说快速的原因如下: 1)绝大部分请求是纯粹的内存操作(非常快速) 2)采用单线程,避免了不必要的上下文切换和竞争条件 3)非阻塞IO 内部实现采用epoll,采用了ep

精讲Redis内存模型

前言 Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度,可以说Redis是实现网站高并发不可或缺的一部分. 我们使用Redis时,会接触Redis的5种对象类型(字符串.哈希.列表.集合.有序集合),丰富的类型是Redis相对于Memcached等的一大优势.在了解Redis的5种对象类型的用法和特点的基础上,进一步了解Redis的内存模型,对Redis的使用有很大帮助,例如: 1.估算Redis内存使用量.目前为止,内存的使用成本仍然相对较高,使用内存不能无所顾

【转】深入学习Redis(1):Redis内存模型

前言 Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度,可以说Redis是实现网站高并发不可或缺的一部分. 我们使用Redis时,会接触Redis的5种对象类型(字符串.哈希.列表.集合.有序集合),丰富的类型是Redis相对于Memcached等的一大优势.在了解Redis的5种对象类型的用法和特点的基础上,进一步了解Redis的内存模型,对Redis的使用有很大帮助,例如: 1.估算Redis内存使用量.目前为止,内存的使用成本仍然相对较高,使用内存不能无所顾

深入学习Redis(1):Redis内存模型

原文:深入学习Redis(1):Redis内存模型 前言 Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度,可以说Redis是实现网站高并发不可或缺的一部分. 我们使用Redis时,会接触Redis的5种对象类型(字符串.哈希.列表.集合.有序集合),丰富的类型是Redis相对于Memcached等的一大优势.在了解Redis的5种对象类型的用法和特点的基础上,进一步了解Redis的内存模型,对Redis的使用有很大帮助,例如: 1.估算Redis内存使用量.目前

Redis内存模型及应用解读 读后随笔

文章出处: Redis内存模型及应用解读 https://dbaplus.cn/news-158-2127-1.html 第一部分:Redis内存统计 随笔:这一部分略显枯燥,是通过redis-cli连接redis后对于info命令的结果字段解读,属于较底层的部分,熟悉redis在操作系统中的实现会更容易理解这部分. 这段对于我的帮助 1.redis进程运行本身会需要内存和内存碎片,同时redis中还存在虚拟内存 2.mem_fragmentation_ratio表示内存碎片比率,mem_fra

一文带你深入了解 Redis 内存模型

作者:编程迷思 链接:https://www.cnblogs.com/kismetv/p/8654978.html 前言 Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度,可以说Redis是实现网站高并发不可或缺的一部分. 我们使用Redis时,会接触Redis的5种对象类型(字符串.哈希.列表.集合.有序集合),丰富的类型是Redis相对于Memcached等的一大优势.在了解Redis的5种对象类型的用法和特点的基础上,进一步了解Redis的内存模型,对Red

redis线程模型

0. 前言 Redis 基于 Reactor 模式开发了自己的网络事件处理器: 这个处理器被称为文件事件处理器(file event handler): 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字, 并根据套接字目前执行的任务来为套接字关联不同的事件处理器. 当被监听的套接字准备好执行连接应答(accept).读取(read).写入(write).关闭(close)等操作时, 与操作相对应的文件事件就会产生, 这时文件事件处理器就会调用套接字之前关联

【Reactor】学习redis线程模型有感

https://my.oschina.net/u/1859679/blog/1844109 该模式可以有简单实现.也可以多线程实现.要看场景,比喻redis的实现就是简单,因为都是基于内存操作. 学习下! 基于Reactor Pattern 处理模式中,定义以下三种角色: Reactor 将I/O事件分派给对应的Handler Acceptor 处理客户端新连接,并分派请求到处理器链中 Handlers 执行非阻塞读/写 任务 在 Douglas Schmidt 的大作<POSA2>中有关于事