redis源码分析(2)——事件循环

redis作为服务器程序,网络IO处理是关键。redis不像memcached使用libevent,它实现了自己的IO事件框架,并且很简单、小巧。可以选择select、epoll、kqueue等实现。

作为 IO事件框架,需要抽象多种IO模型的共性,将整个过程主要抽象为:

1)初始化

2)添加、删除事件

3)等待事件发生

下面也按照这个步骤分析代码。

(1)初始化

回忆一下redis的初始化过程中,initServer函数会调用aeCreateEventLoop创建event loop对象,对事件循环进行初始化。下面看一下aeEventLoop结构,存储事件循环相关的属性。

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    // <MM>
    // 存放的是上次触发定时器事件的时间
    // </MM>
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    // <MM>
    // 所有定时器事件组织成链表
    // </MM>
    aeTimeEvent *timeEventHead;
    // <MM>
    // 是否停止eventLoop
    // </MM>
    int stop;
    void *apidata; /* This is used for polling API specific data */
    // <MM>
    // 事件循环每一次迭代都会调用beforesleep
    // </MM>
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

setsize:指定事件循环要监听的文件描述符集合的大小。这个值与配置文件中得maxclients有关。

events:存放所有注册的读写事件,是大小为setsize的数组。内核会保证新建连接的fd是当前可用描述符的最小值,所以最多监听setsize个描述符,那么最大的fd就是setsize - 1。这种组织方式的好处是,可以以fd为下标,索引到对应的事件,在事件触发后根据fd快速查找到对应的事件。

fired:存放触发的读写事件。同样是setsize大小的数组。

timeEventHead:redis将定时器事件组织成链表,这个属性指向表头。

apidata:存放epoll、select等实现相关的数据。

beforesleep:事件循环在每次迭代前会调用beforesleep执行一些异步处理。

io模型初始化的抽象函数为aeApiCreate。aeCreateEventLoop函数创建并初始化全局事件循环结构,并调用aeApiCreate初始化具体实现依赖的数据结构。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    // <MM>
    // setsize指定事件循环监听的fd的数目
    // 由于内核保证新创建的fd是最小的正整数,所以直接创建setsize大小
    // 的数组,存放对应的event
    // </MM>
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let‘s initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

以epoll为例,aeApiCreate主要是创建epoll的fd,以及要监听的epoll_event,这些数据定义在:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

这里,监听到的事件组织方式与event_loop中监听事件一样,同样是setsize大小的数据,以fd为下标。

aeApiCreate会初始化这些属性,并将aeApiState结构存放到eventLoop->apidata。

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

(2)添加、删除事件

redis支持两类事件,网络io事件和定时器事件。定时器事件的添加、删除相对简单些,主要是维护定时器事件列表。首先看一下表示定时器事件的结构:

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

when_sec和when_ms:表示定时器触发的事件戳,在事件循环迭代返回后,如果当前时间戳大于这个值就会回调事件处理函数。

timeProc:事件处理函数。

finalizerProc:清理函数,在删除定时器时调用。

clientData:需要传入事件处理函数的参数。

next:定时器事件组织成链表,next指向下一个事件。

aeCreateTimeEvent函数用于添加定时器事件,逻辑很简单,根据当前时间计算下一次触发的事件,对事件属性赋值,并插入到定时器链表表头之前。删除通过aeDeleteTimeEvent函数,根据id找到事件并从链表删除该节点,回调清理函数。具体定时器事件的处理见后文,下面看一下io事件。

io事件的添加通过aeCreateFileEvent,逻辑很简单,根据要注册的fd,获取其event,设置属性,会调用aeApiAddEvent函数添加到底层的io模型。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

mask:指定注册的事件类型,可以是读或写。

proc:事件处理函数。

下面是io事件的结构,包括注册的事件类型mask,读写事件处理函数,以及对应的参数。

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

下面看一下epoll添加事件的实现,主要是调用epoll_ctl。

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

struct epll_event用于指定要监听的事件,以及该文件描述符绑定的data,在事件触发时可以返回。这里将data直接存为fd,通过这个数据,便可以找到对应的事件,然后调用其处理函数。

      epoll的删除与添加类似,不再赘述。

(3)等待事件触发

通过调用aeMain函数进入事件循环:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

函数内部就是一个while循环,不断的调用aeProcessEvents函数,等待事件发生。在每次迭代前会调用会调用beforesleep函数,处理异步任务,后续会和serverCron一起介绍。

aeProcessEvents函数首先会处理定时器事件,然后是io事件,下面介绍这个函数的实现。

首先,声明变量记录处理的事件个数,以及触发的事件。flags表示此轮需要处理的事件类型,如果不需要处理定时器事件和io事件直接返回。

    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

redis中的定时器事件是通过epoll实现的。大体思路是,在每次事件迭代调用epoll_wait时需要指定此轮sleep的时间。如果没有io事件发生,则在sleep时间到了之后会返回。通过算出下一次最先发生的事件,到当前时间的间隔,用这个值设为sleep,这样就可以保证在事件到达后回调其处理函数。但是,由于每次返回后,还有处理io事件,所以定时器的触发事件是不精确的,一定是比预定的触发时间晚的。下面看下具体实现。

首先是,查找下一次最先发生的定时器事件,以确定sleep的事件。如果没有定时器事件,则根据传入的flags,选择是一直阻塞指导io事件发生,或者是不阻塞,检查完立即返回。通过调用aeSearchNearestTimer函数查找最先发生的事件,采用的是线性查找的方式,复杂度是O(n),可以将定时器事件组织成堆,加快查找。不过,redis中只有一个serverCron定时器事件,所以暂时不需优化。

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // <MM>
    // 在两种情况下进入poll,阻塞等待事件发生:
    // 1)在有需要监听的描述符时(maxfd != -1)
    // 2)需要处理定时器事件,并且DONT_WAIT开关关闭的情况下
    // </MM>
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // <MM>
        // 根据最快发生的定时器事件的发生时间,确定此次poll阻塞的时间
        // </MM>
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // <MM>
            // 线性查找最快发生的定时器事件
            // </MM>
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // <MM>
            // 如果有定时器事件,则根据它触发的时间,计算sleep的时间(ms单位)
            // </MM>
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // <MM>
            // 如果没有定时器事件,则根据情况是立即返回,或者永远阻塞
            // </MM>
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

接着,调用aeApiPoll函数,传入前面计算的sleep时间,等待io事件放生。在函数返回后,触发的事件已经填充到eventLoop的fired数组中。epoll的实现如下,就是调用epoll_wait,函数返回后,会将触发的事件存放到state->events数组中的前numevents个元素。接下来,填充fired数组,设置每个触发事件的fd,以及事件类型。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // <MM>
    // 调用epoll_wait,state->events存放返回的发生事件的fd
    // </MM>
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        // <MM>
        // 有事件发生,将发生的事件存放于fired数组
        // </MM>
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

在事件返回后,需要处理事件。遍历fired数组,取得fd对应的事件,并根据触发的事件类型,回调其处理函数。

        for (j = 0; j < numevents; j++) {
            // <MM>
            // poll返回后,会将所有触发的时间存放于fired数组
            // </MM>
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn‘t
             * processed, so we check if the event is still valid. */
            // <MM>
            // 回调发生事件的fd,注册的事件处理函数
            // </MM>
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

以上便是,io事件的处理,下面看一下定时器事件的处理。会调用processTimeEvents函数处理定时器事件。

首先会校验是否发生系统时钟偏差(system clock skew,修改系统事件会发生?把事件调到过去),如果发生就将所有事件的发生时间置为0,立即触发。

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

接下来遍历所有定时器事件,查找触发的事件,然后回调处理函数。定时器事件处理函数的返回值,决定这个事件是一次性的,还是周期性的。如果返回AE_NOMORE,则是一次性事件,在调用完后会删除该事件。否则的话,返回值指定的是下一次触发的时间。

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            // <MM>
            // 定时器事件的触发时间已过,则回调注册的事件处理函数
            // </MM>
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* After an event is processed our time event list may
             * no longer be the same, so we restart from head.
             * Still we make sure to don‘t process events registered
             * by event handlers itself in order to don‘t loop forever.
             * To do so we saved the max ID we want to handle.
             *
             * FUTURE OPTIMIZATIONS:
             * Note that this is NOT great algorithmically. Redis uses
             * a single time event so it‘s not a problem but the right
             * way to do this is to add the new elements on head, and
             * to flag deleted elements in a special way for later
             * deletion (putting references to the nodes to delete into
             * another linked list). */
            // <MM>
            // 根据定时器事件处理函数的返回值,决定是否将该定时器删除。
            // 如果retval不等于-1(AE_NOMORE),则更改定时器的触发时间为
            // now + retval(ms)
            // </MM>
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // <MM>
                // 如果返回AE_NOMORE,则删除该定时器
                // </MM>
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }

在回调处理函数时,有可能会添加新的定时器事件,如果不断加入,存在死循环的风险,所以需要避免这种情况,每次循环不处理新添加的事件,这是通过下面的代码实现的。

        if (te->id > maxId) {
            te = te->next;
            continue;
        }

事件循环部分分析到此结束,感觉比较直观、清晰,完全可以抽出来,作为一个独立的库使用。下面一节,会介绍请求的处理。

时间: 2024-08-25 23:16:22

redis源码分析(2)——事件循环的相关文章

redis源码分析3---结构体---字典

redis源码分析3---结构体---字典 字典,简单来说就是一种用于保存键值对的抽象数据结构: 注意,字典中每个键都是独一无二的:在redis中,内部的redis的数据库就是使用字典作为底层实现的: 1 字典的实现 在redis中,字典是使用哈希表作为底层实现的,一个hash表里面可以有多个hash表节点,而每个hash表节点就保存了字典中的一个键值对: hash表定义 table属性是一个数组,数组中的每个元素都是一个指向dictEntry结构的指针,每个dictEntry结构保存着一个键值

redis源码分析4---结构体---跳跃表

redis源码分析4---结构体---跳跃表 跳跃表是一种有序的数据结构,他通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的: 跳跃表支持平均O(logN),最坏O(N)复杂度的节点查找,还可以通过顺序性操作来批量处理节点.性能上和平衡树媲美,因为事先简单,常用来代替平衡树. 在redis中,只在两个地方使用了跳跃表,一个是实现有序集合键,另一个是在集群节点中用作内部数据结构. 1 跳跃表节点 1.1 层 层的数量越多,访问其他节点的速度越快: 1.2 前进指针 遍历举例

redis 源码分析(一) 内存管理

一,redis内存管理介绍 redis是一个基于内存的key-value的数据库,其内存管理是非常重要的,为了屏蔽不同平台之间的差异,以及统计内存占用量等,redis对内存分配函数进行了一层封装,程序中统一使用zmalloc,zfree一系列函数,其对应的源码在src/zmalloc.h和src/zmalloc.c两个文件中,源码点这里. 二,redis内存管理源码分析 redis封装是为了屏蔽底层平台的差异,同时方便自己实现相关的函数,我们可以通过src/zmalloc.h 文件中的相关宏定义

monkey源码分析之事件注入方法变化

在上一篇文章<Monkey源码分析之事件注入>中,我们看到了monkey在注入事件的时候用到了<Monkey源码分析番外篇之Android注入事件的三种方法比较>中的第一种方法,通过Internal API的WindowManager的injectKeyEvent之类的方法注入事件.这种方法在android api level 16也就是android4.1.2之后已经发生了变化: 在此之后注入事件的方式变成了使用InputManager的injectInputEvent方法了 而

redis源码分析之事务Transaction(下)

接着上一篇,这篇文章分析一下redis事务操作中multi,exec,discard三个核心命令. 原文地址:http://www.jianshu.com/p/e22615586595 看本篇文章前需要先对上面文章有所了解: redis源码分析之事务Transaction(上) 一.redis事务核心命令简介 redis事务操作核心命令: //用于开启事务 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0}, //用来执行事

redis源码分析之内存布局

redis源码分析之内存布局 1. 介绍 众所周知,redis是一个开源.短小.高效的key-value存储系统,相对于memcached,redis能够支持更加丰富的数据结构,包括: 字符串(string) 哈希表(map) 列表(list) 集合(set) 有序集(zset) 主流的key-value存储系统,都是在系统内部维护一个hash表,因为对hash表的操作时间复杂度为O(1).如果数据增加以后,导致冲突严重,时间复杂度增加,则可以对hash表进行rehash,以此来保证操作的常量时

Redis源码分析(二十三)--- CRC循环冗余算法和RAND随机数算法

今天开始研究Redis源码中的一些工具类的代码实现,工具类在任何语言中,实现的算法原理应该都是一样的,所以可以借此机会学习一下一些比较经典的算法.比如说我今天看的Crc循环冗余校验算法和rand随机数产生算法. CRC算法全称循环冗余校验算法.CRC校验的基本思想是利用线性编码理论,在发送端根据要传送的k位二进制码序列,以一定的规则产生一个校验用的监督码(既CRC码)r位,并附在信息后边,构成一个新的二进制码序列数共(k+r)位,最后发送出去.在接收端, 则根据信息码和CRC码之间所遵循的规则进

Redis源码分析(一)--Redis结构解析

从今天起,本人将会展开对Redis源码的学习,Redis的代码规模比较小,非常适合学习,是一份非常不错的学习资料,数了一下大概100个文件左右的样子,用的是C语言写的.希望最终能把他啃完吧,C语言好久不用,快忘光了.分析源码的第一步,先别急着想着从哪开始看起,先浏览一下源码结构,可以模块式的渐入,不过比较坑爹的是,Redis的源码全部放在在里面的src目录里,一下90多个文件统统在里面了,所以我选择了拆分,按功能拆分,有些文件你看名字就知道那是干什么的.我拆分好后的而结果如下: 11个包,这样每

redis源码分析(1)--makefile和目录结构分析

一.redis源码编译 redis可以直接在官网下载(本文使用版本 3.0.7):https://redis.io/download 安装: $ tar xzf redis-3.0.7.tar.gz $ cd redis-3.0.7 $ make make执行以后主要编译产物在src/redis-server src/redis-cli 如果想把redis-server直接install到可执行目录/usr/local/bin,还需要执行: $ make install Run Redis wi

Monkey源码分析之事件注入

本系列的上一篇文章<Monkey源码分析之事件源>中我们描述了monkey是怎么从事件源取得命令,然后将命令转换成事件放到事件队列里面的,但是到现在位置我们还没有了解monkey里面的事件是怎么一回事,本篇文章就以这个问题作为切入点,尝试去搞清楚monkey的event架构是怎么样的,然后为什么是这样架构的,以及它又是怎么注入事件来触发点击等动作的. 在看这篇文章之前,希望大家最好先去看下另外几篇博文,这样理解起来就会更容易更清晰了: <Monkey源码分析番外篇之Android注入事件