Redis源码-事件库

网上看了很多Redis事件库的解读,自己也研究了好几遍,还是记录下来,虽然水平有限,但是进步总会是有的

网络事件库封装了Epoll的操作(当然是指Linux下的多路复用了),并且实现一个定时器,定时器也是服务端程序的基石,很多问题都需要靠定时器解决

(一)数据结构+算法构成一个完整的程序,要一窥Redis网络库,需要先从数据结构开始学习

1.整个事件循环是用一个全局的数据结构描述的,aeEventLoop

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

maxfd:维护的注册事件的最大fd

setsize:事件数的个数,这也是文件事件数组和就绪事件数组的最大值。对每一个fd进行的所有操作都需要进行边界检查

timeEventNextId:每加入一个时间事件,都需要给它一个ID,时间事件链虽然不是有序的,但是这个ID是一直自增的,这个就是最大的ID

LastTime:用来修正系统时间的

events和fired:文件事件,就绪事件

stop:开关

apidata:不同实现代表不同,epoll里面是这样一个数据结构

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

beforesleep:每次进入主循环都要执行的,这里会做很多的事情,具体后面会遇到

2.文件事件

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

可以看到文件事件维护了回调和相应fd的标志,这里可能就会好奇为什么没有维护fd呢,因为fd是存在就绪事件结构中的

3.就绪事件

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

就绪事件数组的下标就是自己维护的fd,同时通过这个fd,也可以找到对应的回调,这里先贴上这部分代码

aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fe->wfileProc(eventLoop,fd,fe->clientData,mask);

4.时间事件

这里目前我觉得redis的实现不够完美,当然文档的注释中也提到了,使用链表去维护时间事件,查找的复杂度就会0(n),听别人说可以用小根堆实现,目前就简单分析一下

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

数据结构分析完了,接下来就看它的实现了,主循环部分:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

逻辑都在aeProcessEvents:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

代码有点长,其实抽象出来就三个步骤:

1根据flag获取epoll_wait等待的时间,有这样几种情况,

如果有时间事件,那么就从事件事件中找最快超时的时间,并等待这个时间,这个策略很巧妙

如果设置为不等待,那么就立马返回

如果设置为其它标志,就永久阻塞直到触发事件

2.等到事件发生,并根据回调处理事件

3.处理时间事件

其中aeApiPoll的实现也就是封装了epoll_wait

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

既然这里有封装epoll_wait,必然想去看看epoll_ctl和epoll_create的封装了,如下封装了创建Epoll句柄

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

而epoll_ctl的封装就是系统需要暴露给外界的接口,即创建文件事件和时间事件的接口

例如:创建一个文件事件

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

这里首先判断fd的值,前面有说过,然后取出fe,根据mask设置fe的值,并且调用aeApiEvent,里面才是调用了epoll_ctl

最后还要记得擦屁股,可能要修改一下maxfd

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

首先判断fd是否已近注册,没有就增加,有就需要修改,然后进行注册,这里底层接口的调用都是在直接上层,即ae_epoll进行处理的,上上层,即ae层只是对本层数据结构的维护,这种代码逻辑很严密,可见作者水平

再比如:删除一个文件事件

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}

首先判断fd是否未注册,未注册就直接返回了,注册了就删除fd上的事件,然后对fe进行处理,最后也有可能要修改maxfd的值

至于aeApiDelEvent的实现

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

可以看到,只是对fd进行修改注册或者删除上面的事件

处理时间事件

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;

            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}

其实就是搜索,然后处理,这里处理的时候把这个节点删除了,在aeDeleteTimeEvent中如下

int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te, *prev = NULL;

    te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            return AE_OK;
        }
        prev = te;
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}

目前就分析到这里,带有几个问题后面再去阅读源码:

beforesleep到底干了什么?

真个程序的流程,包括网络连接那部分又是如何组织到Epoll中的?

时间: 2024-12-26 09:30:15

Redis源码-事件库的相关文章

Redis源码阅读(一)事件机制

Redis源码阅读(一)事件机制 Redis作为一款NoSQL非关系内存数据库,具有很高的读写性能,且原生支持的数据类型丰富,被广泛的作为缓存.分布式数据库.消息队列等应用.此外Redis还有许多高可用特性,包括数据持久化,主从模式备份等等,可以满足对数据完整有一定要求的场景. 而且Redis的源码结构简单清晰,有大量材料可以参阅:通过阅读Redis源码,掌握一些常用技术在Redis中的实现,相信会对个人编程水平有很大帮助.这里记录下我阅读Redis源码的心得.从我自己比较关心的几个技术点出发,

如何阅读 Redis 源码?ZZ

原文链接 在这篇文章中, 我将向大家介绍一种我认为比较合理的 Redis 源码阅读顺序, 希望可以给对 Redis 有兴趣并打算阅读 Redis 源码的朋友带来一点帮助. 第 1 步:阅读数据结构实现 刚开始阅读 Redis 源码的时候, 最好从数据结构的相关文件开始读起, 因为这些文件和 Redis 中的其他部分耦合最少, 并且这些文件所实现的数据结构在大部分算法书上都可以了解到, 所以从这些文件开始读是最轻松的.难度也是最低的. 下表列出了 Redis 源码中, 各个数据结构的实现文件: 文

Redis源码学习:字符串

Redis源码学习:字符串 1.初识SDS 1.1 SDS定义 Redis定义了一个叫做sdshdr(SDS or simple dynamic string)的数据结构.SDS不仅用于 保存字符串,还用来当做缓冲区,例如AOF缓冲区或输入缓冲区等.如下所示,整数len和free分别表示buf数组中已使用的长度和剩余可用的长度,buf是一个原生C字符串,以\0结尾. sds就是sdshdr中char buf[]的别名,后面能看到,各种操作函数的入参和返回值都是sds而非sdshdr.那sdshd

redis源码分析(1)--makefile和目录结构分析

一.redis源码编译 redis可以直接在官网下载(本文使用版本 3.0.7):https://redis.io/download 安装: $ tar xzf redis-3.0.7.tar.gz $ cd redis-3.0.7 $ make make执行以后主要编译产物在src/redis-server src/redis-cli 如果想把redis-server直接install到可执行目录/usr/local/bin,还需要执行: $ make install Run Redis wi

如何阅读 Redis 源码?(转)

在这篇文章中, 我将向大家介绍一种我认为比较合理的 Redis 源码阅读顺序, 希望可以给对 Redis 有兴趣并打算阅读 Redis 源码的朋友带来一点帮助. 第 1 步:阅读数据结构实现刚开始阅读 Redis 源码的时候, 最好从数据结构的相关文件开始读起, 因为这些文件和 Redis 中的其他部分耦合最少, 并且这些文件所实现的数据结构在大部分算法书上都可以了解到, 所以从这些文件开始读是最轻松的.难度也是最低的.下表列出了 Redis 源码中, 各个数据结构的实现文件: 文件内容 sds

Redis源码解析:13Redis中的事件驱动机制

Redis中,处理网络IO时,采用的是事件驱动机制.但它没有使用libevent或者libev这样的库,而是自己实现了一个非常简单明了的事件驱动库ae_event,主要代码仅仅400行左右. 没有选择libevent或libev的原因大概在于,这些库为了迎合通用性造成代码庞大,而且其中的很多功能,比如监控子进程,复杂的定时器等,这些都不是Redis所需要的. Redis中的事件驱动库只关注网络IO,以及定时器.该事件库处理下面两类事件: a:文件事件(file  event):用于处理Redis

为什么C/C++程序员都要阅读Redis源码之:Redis学习事件驱动设计

为什么我说C/C++程序员都要阅读Redis源码 主要原因就是『简洁』.如果你用源码编译过Redis,你会发现十分轻快,一步到位.其他语言的开发者可能不会了解这种痛,作为C/C++程序员,如果你源码编译安装过Nginx/Grpc/Thrift/Boost等开源产品,你会发现有很多依赖,而依赖本身又有依赖,十分痛苦.通常半天一天就耗进去了.由衷地羡慕 npm/maven/pip/composer/...这些包管理器.而Redis则给人惊喜,一行make了此残生. 除了安装过程简洁,代码也十分简洁.

redis 源码分析(一) 内存管理

一,redis内存管理介绍 redis是一个基于内存的key-value的数据库,其内存管理是非常重要的,为了屏蔽不同平台之间的差异,以及统计内存占用量等,redis对内存分配函数进行了一层封装,程序中统一使用zmalloc,zfree一系列函数,其对应的源码在src/zmalloc.h和src/zmalloc.c两个文件中,源码点这里. 二,redis内存管理源码分析 redis封装是为了屏蔽底层平台的差异,同时方便自己实现相关的函数,我们可以通过src/zmalloc.h 文件中的相关宏定义

Redis源码解析:15Resis主从复制之从节点流程

Redis的主从复制功能,可以实现Redis实例的高可用,避免单个Redis 服务器的单点故障,并且可以实现负载均衡. 一:主从复制过程 Redis的复制功能分为同步(sync)和命令传播(commandpropagate)两个操作: 同步操作用于将从节点的数据库状态更新至主节点当前所处的数据库状态: 命令传播操作则用于在主节点的数据库状态被修改,导致主从节点的数据库状态不一致时,让主从节点的数据库重新回到一致状态: 1:同步 当客户端向从节点发送SLAYEOF命令,或者从节点的配置文件中配置了