对于redis框架的理解(三)

上一篇讲完了initServer的大体流程,其中aeCreateEventLoop(),这个函数

没有详细说明,我们在这一篇里讲述Ae.h和Ae.c, 这里面的api阐述了如何创建

eventLoop和添加文件读写事件等等。

ae.h中的解释

//文件读写事件回调函数
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

//定时器回调函数
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
//事件结束回调函数,析构一些资源
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
//不是很清楚,应该是进程结束前做的回调函数
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

//文件事件回调函数
 typedef struct aeFileEvent {
     int mask; /* one of AE_(READABLE|WRITABLE) */ //文件事件类型 读/写
     aeFileProc *rfileProc;
     aeFileProc *wfileProc;
     void *clientData;
 } aeFileEvent;

 /* A fired event */
 typedef struct aeFiredEvent {
     int fd;    ////已出现的事件的文件号对应的事件描述在aeEventLoop.events[]中的下标
     int mask;  //文件事件类型 AE_WRITABLE||AE_READABLE
 } aeFiredEvent;

 typedef struct aeTimeEvent {
     long long id; /* time event identifier. */  //由aeEventLoop.timeEventNextId进行管理
     long when_sec; /* seconds */
     long when_ms; /* milliseconds */
     aeTimeProc *timeProc;
     aeEventFinalizerProc *finalizerProc;
     void *clientData;
     struct aeTimeEvent *next;
 } aeTimeEvent;

 /* State of an event based program */
 typedef struct aeEventLoop {
     int maxfd;   //监听的最大文件号
     int setsize; //跟踪的文件描述符最大数量
     long long timeEventNextId;     //定时器事件的ID编号管理(分配ID号所用)
     time_t lastTime;     /* Used to detect system clock skew */
     aeFileEvent *events; //注册的文件事件,这些是需要进程关注的文件
     aeFiredEvent *fired;  //poll结果,待处理的文件事件的文件号和事件类型
     aeTimeEvent *timeEventHead; //定时器时间链表
     int stop;   //时间轮询是否结束?
     void *apidata;  //polling API 特殊的数据
     aeBeforeSleepProc *beforesleep;  //休眠前的程序
 } aeEventLoop;

 /* Prototypes */
 //创建eventLoop结构
 aeEventLoop *aeCreateEventLoop(int setsize);
 //删除eventloop
 void aeDeleteEventLoop(aeEventLoop *eventLoop);
 //事件派发停止
 void aeStop(aeEventLoop *eventLoop);
 //添加文件读写事件
 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
     aeFileProc *proc, void *clientData);
 //删除文件读写事件
 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
 //获取文件事件对应类型(读或写)
 int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
//创建定时器事件
 long long aeCreateTimeEvent(aeEventLoop *eventLoop,
     long long milliseconds,aeTimeProc *proc, void *clientData,
     aeEventFinalizerProc *finalizerProc);
 //删除定时器事件
 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
 //派发事件
 int aeProcessEvents(aeEventLoop *eventLoop, int flags);
 //等待millionseconds直到文件描述符可读或者可写
 int aeWait(int fd, int mask, long long milliseconds);
 //ae事件轮询主函数
 void aeMain(aeEventLoop *eventLoop);
 //获取当前网络模型
 char *aeGetApiName(void);
 //进程休眠前回调函数
 void aeSetBeforeSleepProc(aeEventLoop *eventLoop,
     aeBeforeSleepProc *beforesleep);
 //获取eventloop所有的事件个数
 int aeGetSetSize(aeEventLoop *eventLoop);
 //重新设置eventloop事件个数
 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

ae.cpp中,一个函数一个函数解析

//定义了几个宏,根据不同的宏加载
//不同的网络模型
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

aeCreateEventLoop,主要负责eventloop结构的创建和初始化,以及模型的初始化

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    //创建eventloop
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    //为进程要注册的文件开辟空间
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    //为激活的要处理的文件开辟空间
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    //开辟失败报错
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    //设置监听事件总数
    eventLoop->setsize = setsize;
    //更新为当前时间
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    //将不同模式的api注册到eventloop里
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let‘s initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        //将所有文件事件类型初始为空
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

//事件队列大小和重置

//获取eventloop事件队列大小
int aeGetSetSize(aeEventLoop *eventLoop) {
    return eventLoop->setsize;
}

//重新设置大小
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
    int i;

    if (setsize == eventLoop->setsize) return AE_OK;
    if (eventLoop->maxfd >= setsize) return AE_ERR;
    //不同的网络模型调用不同的resize
    if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
    //重新开辟空间
    eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
    eventLoop->setsize = setsize;

    /* Make sure that if we created new slots, they are initialized with
     * an AE_NONE mask. */
    //重新初始化事件类型
    for (i = eventLoop->maxfd+1; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return AE_OK;
}

删除eventloop和stop事件轮询

//删除eventloop结构
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}

//设置eventloop停止标记
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}

创建监听事件

//创建监听事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData)
{
    //判断fd大于eventloop设置的事件队列大小
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    //取出对应的aeFileEvent事件
    aeFileEvent *fe = &eventLoop->events[fd];

    //添加读写事件到不同的模型
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    //文件类型按位或
    fe->mask |= mask;
    //根据最终的类型设置读写回调函数
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    //fe中读写操作的clientdata
    fe->clientData = clientData;
    //如果fd大于当前最大的eventLoop maxfdfd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

删除监听事件

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;
    //网络模型里删除对应的事件
    aeApiDelEvent(eventLoop, fd, mask);
    //清除对应的类型标记
    fe->mask = fe->mask & (~mask);
    //如果删除的fd是maxfd,并且对应的事件为空,那么更新maxfd
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
//获取文件类型
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
    if (fd >= eventLoop->setsize) return 0;
    aeFileEvent *fe = &eventLoop->events[fd];
    //返回对应的类型标记
    return fe->mask;
}

事件派发函数

//派发事件的函数
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    //为了休眠,直到有时间事件触发,即便是没有文件事件处理,我们也会
    //调用对应的事件时间
    //这部分不是很清楚,知道大体意思是设置时间,
    //为了aeApiPoll设置等待的时间
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        //调用不同的网络模型poll事件
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            //轮询处理就绪事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            //可读就绪事件
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            //可写就绪事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    //处理所有定时器事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
/等待millionseconds,直到有可读或者可写事件触发
int aeWait(int fd, int mask, long long milliseconds) {
    struct pollfd pfd;
    int retmask = 0, retval;

    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = fd;
    if (mask & AE_READABLE) pfd.events |= POLLIN;
    if (mask & AE_WRITABLE) pfd.events |= POLLOUT;

    if ((retval = poll(&pfd, 1, milliseconds))== 1) {
        if (pfd.revents & POLLIN) retmask |= AE_READABLE;
        if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
    if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
        if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
        return retmask;
    } else {
        return retval;
    }
}                                    
//ae主函数
void aeMain(aeEventLoop *eventLoop) {
    //stop初始为0
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        //调用beforesleep函数
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //派发所有的事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

//获取api名字
char *aeGetApiName(void) {
    return aeApiName();
}

//sleep之前的回调函数
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

这就是ae文件里大体的几个api,其他的没理解的还在研究。

我的微信公众号:

时间: 2024-10-19 17:32:29

对于redis框架的理解(三)的相关文章

Redis 小白指南(三)- 事务、过期、消息通知、管道和优化内存空间

Redis 小白指南(三)- 事务.过期.消息通知.管道和优化内存空间 简介 <Redis 小白指南(一)- 简介.安装.GUI 和 C# 驱动介绍> 讲的是 Redis 的介绍,以及如何在 Windows 上安装并使用,一些 GUI 工具和自己简单封装的 RedisHelper. <Redis 小白指南(二)- 聊聊五大类型:字符串.散列.列表.集合和有序集合>讲的是 Redis 中最核心的内容,最常用的就是和数据类型打交道. 目录 事务 过期时间 消息通知 管道 优化内存空间

ServiceStack.Redis之IRedisClient&lt;第三篇&gt;

事实上,IRedisClient里面的很多方法,其实就是Redis的命令名.只要对Redis的命令熟悉一点就能够非常快速地理解和掌握这些方法,趁着现在对Redis不是特别了解,我也对着命令来了解一下这些方法. 一.属性 IRedisClient的属性如下: 属性 说明 ConnectTimeout  连接超时 Db 当前数据库的ID或下标 DbSize  当前数据库的 key 的数量 HadExceptions    Hashes  存储复杂对象,一个value中有几个field  Host 

Redis笔记整理(三):进阶操作与高级部分

[TOC] Redis笔记整理(三):进阶操作与高级部分 Redis发布订阅 Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis客户端可以订阅任意数量的频道. 下图展示了频道channel1,以及订阅这个频道的三个客户端--client1,client2,client5之间的关系. 当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端: 相关操作命令如下: 命令 描述 PSUBS

redis 单线程的理解

1. redis单线程问题 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程. 2. 为什么说redis能够快速执行 (1) 绝大部分请求是纯粹的内存操作(非常快速) (2) 采用单线程,避免了不必要的上下文切换和竞争条件 (3) 非阻塞IO - IO多路复用 3. redis的内部实现 内部实现采用epoll,采用了epoll+自己实现的简单的事件框架.epoll中的读.写.关闭.连接都转化成了事件,然后利用epoll的多路复

BEGINNING SHAREPOINT&amp;#174; 2013 DEVELOPMENT 第2章节--SharePoint 2013 App 模型概览 理解三个SharePoint 部署模型 Apps

BEGINNING SHAREPOINT? 2013 DEVELOPMENT 第2章节--SharePoint 2013 App 模型概览 理解三个SharePoint 部署模型 Apps 由于SharePoint 2013 正逐步移动到云,有三类部署模型可用来帮助你完毕这个目标(关于SharePoint Apps): SharePoint-hosted: Autohosted: Provider-hosted: 每一类部署模型都含有特色,使它成为针对不同类型App开发的理想的(选择).以下的部

spring框架学习(三)junit单元测试

spring框架学习(三)junit单元测试 单元测试不是头一次听说了,但只是听说从来没有用过.一个模块怎么测试呢,是不是得专门为一单元写一个测试程序,然后将测试单元代码拿过来测试? 我是这么想的.学到spring框架这才知道单元测试原来是这么回事儿. 下面以上一篇文章中set注入的第一个实例为测试对象.进行单元测试. 1,拷贝jar包 junit-3.8.2.jar(4.x主要增加注解应用) 2,写业务类 [java] view plaincopyprint? public class Use

对于iOS前端框架的理解

iOS前端的架构设计,我所理解的意思就是,使用什么样的模式或者结构敲代码就是各个类怎么协同工作的.或者文件存放的结构.各个类到底如何分工的. 国外有好多关于iOS端的架构的文章,无奈实在英语水平有限,只看了几篇被翻译过的文章.MVC.MVVM.VIPER等还有几个记不住名字的,但是无论什么架构感觉理解的都不是很深刻,具体写代码的时候也没有分的太清楚,也是怎么方便怎么来.最后导致了来回几次的重构,但是重构完成后,过一段时间再看代码的时候还是感觉结构不够清晰,各个类之间的关系比较混乱.现在的项目也是

BEGINNING SHAREPOINT&#174; 2013 DEVELOPMENT 第2章节--SharePoint 2013 App 模型概览 理解三个SharePoint 部署模型 Apps

BEGINNING SHAREPOINT? 2013 DEVELOPMENT 第2章节--SharePoint 2013 App 模型概览 理解三个SharePoint 部署模型 Apps 因为SharePoint 2013 正逐步移动到云,有三类部署模型可用来帮助你完成这个目标(关于SharePoint Apps): SharePoint-hosted: Autohosted: Provider-hosted: 每一类部署模型都含有特色,使它成为针对不同类型App开发的理想的(选择).下面的部

BI技术框架的理解

如何梳理BI技术框架? 首先,我们需要从各个菜市场(数据源)挑选我们需要的蔬菜.肉类.水果,然后我们开始挑拣.清洗,并根据菜式的要求,将各种原材料切好(ETL),摆放到厨房(数据仓库):然后厨师根据不同的菜式及口味,将原材料进行必要的搭配(OLAP),最后辅以调料,通过炒.焖.炖等不同烹饪的手法制作出不同的菜肴,最后端上桌的红烧鱼.铁板牛肉.凉拌青瓜.水果拼盘.玉米炖排骨则像是报表.仪表盘.柱状图.趋势线等各种各样的BI前端展示界面. 技术实现的过程也和做菜一样: 领导想吃什么菜,就得研究这道菜