hiredis aeStop仅在redis命令的回调函数中生效 分析

hiredis 是 redis 的client端C语言 lib,  hiredis拥有同步和异步的API, 异步API的实现有多种方法,分别依赖libev, libevent, libuv, ae等等,其中ae是redis内部实现的一个异步事件处理模块。

稍微修改了hiredis的example-ae.c代码:在一个线程里面循环10次执行命令ping, 检查redisserver, 如下所示, 线程发完10次ping后,调用disconnect, 发现aeMain函数并未退出,程序一直阻塞住.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <thread>
#include<functional>

#ifdef __cplusplus
    extern "C"{
#endif

#include <hiredis.h>
#include <async.h>
#include <adapters/ae.h>

#ifdef __cplusplus
           }
#endif

/* Put event loop in the global scope, so it can be explicitly stopped */
static aeEventLoop *loop;

void getCallback(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    if (reply == NULL) return;
    printf("argv[%s]: %s\n", (char*)privdata, reply->str);
}

void connectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        aeStop(loop);
        return;
    }

    printf("Connected...\n");
}

void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
        printf("Error: %s\n", c->errstr);
        aeStop(loop);
        return;
    }

    printf("Disconnected...\n");
    aeStop(loop);
}

void quitConnCallBack(redisAsyncContext *c, void *r, void *privdata) {
    printf("quit");
    redisAsyncDisconnect(c);
}

void testThreadLoop(void * p) {
    static int num = 10;
    char c11[64];
    strcpy(c11, "test");
    while(1) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1500));
        num--;
        if (num < 0) {

            //在这里调用disconnect, 并不能使aeMain退出
            redisAsyncDisconnect((redisAsyncContext *)p);
            //正确做法,应该调用如下
       //redisAsyncCommand((redisAsyncContext *)p, quitConnCallBack, c11, "quit");
            printf("exit\n");
            return;
        }

        redisAsyncCommand((redisAsyncContext *)p, getCallback, c11, "ping");
    }
}

int main (int argc, char **argv) {
    signal(SIGPIPE, SIG_IGN);

    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
    if (c->err) {
        /* Let *c leak for now... */
        printf("Error: %s\n", c->errstr);
        return 1;
    }

    loop = aeCreateEventLoop(64);
    redisAeAttach(loop, c);
    redisAsyncSetConnectCallback(c,connectCallback);
    redisAsyncSetDisconnectCallback(c,disconnectCallback);

    std::thread t(testThreadLoop, c);
    t.detach();
    aeMain(loop);
    return 0;
} 

首先检查下两个主要函数aeStop, aeMain的逻辑:

aeStop, aeMain函数代码如下:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}

1. stop分析

aeStop仅设置stop标志为true, aeMain里面在一直循环处理事件,第一印象是,直接设了stop为true后,aeMain在处理完事件后,跳出aeProcessEvents函数后,检查stop为true就会跳出while循环。但是事实是aeMain并未跳出循环,难道因为是不同线程间操作,要将stop设置为volatile类型?尝试修改了stop为volatile int类型,测试结果:aeMain 仍然未推出,程序阻塞,无法推出。

2.aeProcessEvents分析

这时就只能推测由于aeProcessEvents没有退出,导致aeMain执行无法检测stop值,分析该函数,推测可能阻塞在aeApiPoll函数,同时发现tvp变量是个NULL, 查看aeApiPoll代码(ae_epoll.c),如下

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
     //...
     //tvp 会是NULL, 推测阻塞在aeApiPoll, 查看aeApiPoll代码进行证实
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn‘t
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

  //真相在这边,epoll_wait, 第三个参数为-1, epoll_wait将一直等待下去!
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

整理下aeMain的流程如下图所示,

                      

我们的disconnect回调, 内部调用 aeStop函数,如果刚好发生在processEvents之后,aeMain检查stop值之前,那么就没问题,当然这种概率极其小,如果这都中了,那可以买彩票了~~,现在我们知道aestop调用是有立即生效的限制范围,我们最好在processEvents的时候,判断是否应该退出aeMain, 如果是就调用aeStop. processEvents内部会调用到我们外部定义的各种命令的回调函数, 刚好redis有个quit的命令(让redisserver关闭连接), 我们就增加一个quit命令回调函数调用aeStop:

redisAsyncCommand((redisAsyncContext *)p, quitConnCallBack, c11, "quit");

void quitConnCallBack(redisAsyncContext *c, void *r, void *privdata) {
    printf("quit");
    redisAsyncDisconnect(c);
}
时间: 2024-11-08 07:23:51

hiredis aeStop仅在redis命令的回调函数中生效 分析的相关文章

回调函数中调用类中的非静态成员变量或非静态成员函数

有关这方面的问题,首先说一点: 回调函数必须是静态成员函数或者全局函数来实现回调函数,大概原因是普通的C++成员函数都隐含了一个函数参数,即this指针,C++通过传递this指针给成员函数从而实现函数可以访问类的特定对象的数据成员.由于this指针的原因,使得一个普通成员函数作为回调函数时就会因为隐含的this指针问题使得函数参数个数不匹配,从而导致回调函数编译失败. 基于上面的理论,如何在类中封装回调函数呢? 回调函数只能是全局函数或者静态成员函数,但是由于全局函数会破坏封装性,所以只能用静

ajax回调函数中使用$(this)取不到对象的解决方法

如果在ajax的回调函数内使用$(this)的话,实践证明,是取不到任何对象的,需要的朋友可以参考下 $(".derek").each(function(){ $(this).click(function(){ var params = $(this).parent().serialize(); var obj=$(this).parent().siblings("div#caskContent"); var form=$(this).parent(); $.aja

理解 JS 回调函数中的 this

理解 JS 回调函数中的 this:https://www.cnblogs.com/gavinyyb/p/6286750.html 原文链接:http://www.tuicool.com/articles/z2Yvaq 任何变量或对象都有其赖以生存的上下文.如果简单地将对象理解为一段代码,那么对象处在不同的上下文,这段代码也会执行出不同的结果. 例如,我们定义一个函数 getUrl 和一个对象 pseudoWindow . function getUrl() { console.log(this

wx: wx.showModal 回调函数中调用自定义方法

一.在回调函数中调用自定义方法: 回调函数中不能直接使用this,需要在外面定义 var that = this 然后 that.自定义的方法.如下: //删除 onDelete: function (e) { var that = this; wx.showModal({ title: '提示', content: '确定要删除?', success: function (res) { if (res.confirm) { that.onEdit(e); } } }) }, //编辑 onEd

vue中this在回调函数中的使用

this在各类回调中使用: 如果是普通函数是没法使用的 所以需要先将this.变量赋值给新的变量,然后才能在回调函数中使用 例如: refund: function (id) { if (!this.url.refund) { this.$message.error("请设置url.refund属性!") return } var that = this; this.$confirm({ title: "确认退款", content: "是否要进行退款?&

在require回调函数中执行tooltipvalidator.init不需要另外再写逻辑

尽管每个人学习开发的过程会不一样,然而无论如何,系统的学习方法对每个学习者来说都是至关重要的.对于初学者,应该经常向资深的游戏开发者学习,通过他们的直播和视频,学习游戏开发的技巧.你从这些专家们身上学到的东西越多,你就可以越快成为优秀的开发者. 通过proceed()方法可以调用目标对象的相应方法,从而实现对目标方法的完全控制! angular2 的依赖注入包含了太多的内容,其中的一个重点就是注入器,而注入器又非常难理解,今天我们不深入介绍注入器的内容,可以参考官方文档,我们今天来说注入器的层级

=&gt; 应用在js回调函数中

=> 可以简化以前的回调函数的调用,具体来说: 今后,几乎所有的回调函数都可用箭头函数简化 比如: 1. 所有回调函数都可: 去function改=> 2. 如果函数体只有一句话: 可省略{} 如果这一句话还是return,可省略return 3. 如果只有一个参数: 可省略() 但是,如果没有参数,必须保留空() 更大用途: 箭头函数内外共用同一个this--取代bind 特殊: 如果不希望内外共用this,就不能用箭头函数 比如事件处理函数: elem.addEventListener(&

Swoole 关于reload重启与回调函数中代码的重载

Swoole 的 Server 中可以通过 PHP 来执行 reload 很方便的热重载, 但也有很多限制 需要注意的是, 直接写在 server.php 即 你的服务器启动脚本文件中的PHP代码即便是写在 WorkerStart 的事件回调中的代码 reload 也不会重载的, 必须是通过加载另一个文件来执行这样 reload 才会有意义 下面是测试代码和结果说明: <?php /** * Author: ZHOUZ * Blog: http://blog.csdn.net/zhouzme *

前端小组分享会之异步回调函数中的上下文

异步加载:又叫非阻塞加载,浏览器在下载执行js的同时,还会继续进行后续页面的处理.实现如:回调函数 .setTimeout . setInterval  回调函数(callback): 自己理解就是函数A里嵌套函数B B可能用到A中的变量,,B成为回调函数 function a (){ var x = 1; function b(){ console.log(++x) } b() } a() //2 上下文(Execution Context): 执行上下文(简称上下文)决定了Js执行过程中可以