使用aicp实现事件回调模式

相对于asio事件等待池aiop的reactor模式,基于proactor模式的事件回调池aicp封转的更加的上层。

  1. 在类unix系统上

    底层是基于aiop的实现,在一个线程里面进行事件等待,然后分发所有事件,在worker中处理完后调用回调返回到上层。
    并对不同系统的aiop支持力度,进行针对性优化。

    • 如果aiop支持边缘触发(例如:epoll、kqueue),尽量启用边缘触发,以减少系统api的频繁调用。
    • 使用sendfile对发送文件进行优化
    • 针对linux系统,启用native file io,实现file的dma读写,节省cpu的时间。(目前还没实现)
  2. 在windows系统上

    直接基于iocp进行封装,并针对windows的一些扩展socket api进行针对性优化。

    • 使用DisconnectEx对socket的进行重用,实现socket池来管理,优化整体性能
    • 使用TransmitFile对发送文件进行优化
    • 在高版本系统上,优先使用GetQueuedCompletionStatusEx来批量获取事件,减少系统调用

不管在哪个平台上,只需要两个线程(worker线程+timer线程),就能实现高性能并发io读写。

其中timer线程主要处理投递事件的超时维护,如果io事件长时间不响应,则会超时取消,超时事件是可以根据不同事件,自己设置。
如果确实想要快速取消,也有接口安全的强制kill掉,并在对应的回调里面监听到killed事件。

针对timer,aicp中了两种定时器:

  1. tb_ltimer_t:低精度定时器,精度为1s,主要用于超时维护,效率很高,采用简化的timing-wheel算法
  2. tb_timer_t:通用高精度定时器,精度为1ms,使用最小堆维护,主要用于维护所有定时任务,以及各种io限速操作

并且整个aicp参考nginx对于gettimeofday的优化,也对其进行了缓存,只在master loop中进行更新,其他地方直接使用缓存的值。

虽然aicp只需要一个worker线程进行loop就足够了,但是同时它也支持多个线程同时开启loop,来提高分发处理效率。

具体的aicp架构,参见:asio架构

下面看看aicp是如何使用的:

所有aicp事件投递,都是由aico对象完成,每个aico对象一个事件对象,目前支持三种事件对象:

  1. socket
  2. file
  3. task(用于投递定时任务)

具体aico的事件投递接口,参见:asio的事件投递接口说明

/* //////////////////////////////////////////////////////////////////////////////////////
 * includes
 */
#include "tbox/tbox.h"

/* //////////////////////////////////////////////////////////////////////////////////////
 * types
 */
typedef struct __tb_demo_context_t
{
    // the file
    tb_file_ref_t       file;

    // the aico
    tb_aico_ref_t       aico;

    // the size
    tb_hize_t           size;

}tb_demo_context_t;

/* //////////////////////////////////////////////////////////////////////////////////////
 * implementation
 */
// aico对象关闭回调函数
static tb_bool_t tb_demo_aico_clos(tb_aice_t const* aice)
{
    // check
    tb_assert_and_check_return_val(aice && aice->aico && aice->code == TB_AICE_CODE_CLOS, tb_false);

    // trace
    tb_trace_d("aico[%p]: clos: %s", aice->aico, tb_state_cstr(aice->state));

    // exit aico
    tb_aico_exit(aice->aico);

    // ok
    return tb_true;
}
static tb_void_t tb_demo_context_exit(tb_demo_context_t* context)
{
    if (context)
    {
        // clos aico
        if (context->aico) tb_aico_clos(context->aico, tb_demo_aico_clos, tb_null);
        context->aico = tb_null;

        // exit
        tb_free(context);
    }
}
// 发送文件事件回调函数
static tb_bool_t tb_demo_sock_sendf_func(tb_aice_t const* aice)
{
    // check
    tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_SENDF, tb_false);

    // the context
    tb_demo_context_t* context = (tb_demo_context_t*)aice->priv;
    tb_assert_and_check_return_val(context, tb_false);

    // ok?
    if (aice->state == TB_STATE_OK)
    {
        // trace
        tb_trace_d("sendf[%p]: real: %lu, size: %llu", aice->aico, aice->u.sendf.real, aice->u.sendf.size);

        // save size
        context->size += aice->u.sendf.real;

        // 如果还没有发送完,则继续发送剩余文件数据
        if (aice->u.sendf.real < aice->u.sendf.size)
        {
            // post sendf from file
            if (!tb_aico_sendf(aice->aico, context->file, context->size, aice->u.sendf.size - aice->u.sendf.real, tb_demo_sock_sendf_func, context)) return tb_false;
        }
        else
        {
            tb_trace_i("sendf[%p]: finished", aice->aico);
            tb_demo_context_exit(context);
        }
    }
    // closed or failed?
    else
    {
        tb_trace_i("sendf[%p]: state: %s", aice->aico, tb_state_cstr(aice->state));
        tb_demo_context_exit(context);
    }

    // ok
    return tb_true;
}
// accept事件回调函数
static tb_bool_t tb_demo_sock_acpt_func(tb_aice_t const* aice)
{
    // check
    tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_ACPT, tb_false);

    // the file path
    tb_char_t const* path = (tb_char_t const*)aice->priv;
    tb_assert_and_check_return_val(path, tb_false);

    // the aicp
    tb_aicp_ref_t aicp = tb_aico_aicp(aice->aico);
    tb_assert_and_check_return_val(aicp, tb_false);

    // acpt ok?
    if (aice->state == TB_STATE_OK)
    {
        // trace
        tb_trace_i("acpt[%p]: %p", aice->aico, aice->u.acpt.aico);

        // done
        tb_bool_t           ok = tb_false;
        tb_demo_context_t*  context = tb_null;
        do
        {
            // make context
            context = tb_malloc0_type(tb_demo_context_t);
            tb_assert_and_check_break(context);

            // init file
            context->file = tb_file_init(path, TB_FILE_MODE_RO | TB_FILE_MODE_ASIO);
            tb_assert_and_check_break(context->file);

            // 获取客户端连接的aico对象,用于发送数据
            context->aico = aice->u.acpt.aico;
            tb_assert_and_check_break(context->aico);

            // 投递一个发送文件事件
            if (!tb_aico_sendf(context->aico, context->file, 0ULL, tb_file_size(context->file), tb_demo_sock_sendf_func, context)) break;

            // ok
            ok = tb_true;

        } while (0);

        // failed?
        if (!ok)
        {
            // exit context
            if (context) tb_demo_context_exit(context);
        }
    }
    // failed?
    else
    {
        // exit loop
        tb_trace_i("acpt[%p]: state: %s", aice->aico, tb_state_cstr(aice->state));

        // accept失败,关闭用于监听的aico对象
        if (aice->aico) tb_aico_clos(aice->aico, tb_demo_aico_clos, tb_null);
    }

    // ok
    return tb_true;
}

/* //////////////////////////////////////////////////////////////////////////////////////
 * main
 */
tb_int_t main(tb_int_t argc, tb_char_t** argv)
{
    // check
    tb_assert_and_check_return_val(argv[1], 0);

    // done
    tb_aico_ref_t aico = tb_null;
    do
    {
        // 初始化tbox库
        if (!tb_init(tb_null, tb_null, 0)) break;

        /* 初始化一个用于监听的aico对象
         *
         * 注:这里为了简化代码,直接使用了全局的tb_aicp()对象,
         * 全局的aicp对象,用起来更加方便,内部回去自己开一个线程运行loop来驱动
         * 一般用于客户端应用。
         *
         * 如果想要更高的并发性能,启用更多的线程去驱动loop,需要手动调用tb_aicp_init,指定需要的并发量,来创建。
         * 并且需要手动运行tb_aicp_loop
         */
        aico = tb_aico_init(tb_aicp());
        tb_assert_and_check_break(aico);

        // 打开这个aico对象,并为其创建一个tcp socket
        if (!tb_aico_open_sock_from_type(aico, TB_SOCKET_TYPE_TCP)) break;

        // 绑定监听端口
        if (!tb_socket_bind(tb_aico_sock(aico), tb_null, 9090)) break;

        // 设置监听
        if (!tb_socket_listen(tb_aico_sock(aico), 20)) break;

        /* 投递accept事件,仅需一次投递
         *
         * 注:
         * 只有accept事件是一次投递,长期有效,只要有事件过来,就回去调用对应的回调函数
         * 不需要重复投递,这样设计也是为了尽可能的接受更多地并发连接
         */
        if (!tb_aico_acpt(aico, tb_demo_sock_acpt_func, argv[1])) break;

        // 等待
        getchar();

    } while (0);

    // exit tbox
    tb_exit();
    return 0;
}

使用aicp实现事件回调模式

时间: 2024-07-30 06:19:50

使用aicp实现事件回调模式的相关文章

Java设计模式补充:回调模式、事件监听器模式、观察者模式(转)

一.回调函数 为什么首先会讲回调函数呢?因为这个是理解监听器.观察者模式的关键. 什么是回调函数 所谓的回调,用于回调的函数. 回调函数只是一个功能片段,由用户按照回调函数调用约定来实现的一个函数. 有这么一句通俗的定义:就是程序员A写了一段程序(程序a),其中预留有回调函数接口,并封装好了该程序.程序员B要让a调用自己的程序b中的一个方法,于是,他通过a中的接口回调自己b中的方法. 举个例子: 这里有两个实体:回调抽象接口.回调者(即程序a) 回调接口(ICallBack ) public i

java设计模式--观察者模式和事件监听器模式

文章转载于:http://www.java2000.net/p9452 复习设计模式,看到observer观察者模式,说法是该模式和iterator迭代器模式类似已经被整合进jdk,但是jdk提供了两种接口: 一.java.util.Observer -- 观察者接口 对应: java.util.Observable --受查者根类 二.java.util.EventListener -- 事件监听/处理接口 对应: java.util.EventObject -- 事件(状态)对象根类 研究了

IOS开发之自定义Button(集成三种回调模式)

前面在做东西的时候都用到了storyboard,在今天的代码中就纯手写代码自己用封装个Button.这个Button继承于UIView类,在封装的时候用上啦OC中的三种回调模式:目标动作回调,委托回调,Block回调.具体的内容请参考之前的博客:“Objective-C中的Block回调模式”,“Target-Action回调模式”,“Objective-C中的委托(代理)模式”.在接下来要封装的button中将要用到上面的知识点.之前在做新浪微博中的Cell的时候用到了Block回调来确定是那

JavaScript:回调模式(Callback Pattern)

函数就是对象,所以他们可以作为一个参数传递给其它函数: 当你将introduceBugs()作为一个参数传递给writeCode(),然后在某个时间点,writeCode()有可能执行(调用)introduceBugs(): 这种情况下,introduceBugs()被称为回调函数(callback function)或简称为回调(callback:): [javascript] view plaincopyprint? function writeCode(callback) { // do 

OC3大回调模式使用总结(一)目标动作回调

OC 3大回调模式使用总结(一)目标动作回调 1.目标动作主要用于 (UIButton ,UIStepper,UISwitch,UISegmentControl,UISlider) ,是用来监听按钮等类似控件状态的编程模式,该模式产生的原因是 某些事件是不确定何时会发生,但是发生的时候就需要这么去处理,所以你需要提前写好处理这个事件的代码,当这个事件发生时,系统会调用你写的预备性质的代码(相当于预防性措施) 2.事件: UIButton对应的事件是:UIControlEventTouchUpIn

OC3大回调模式使用总结(二)委托模式回调

OC 3大回调模式使用总结(二)委托模式回调 1.委托模式回调 主要用于 UITableView(UITableViewController),UICollectionView,UIPickerView,UITextField,UITextField 这几类控件,是使用委托模式封装的,使用方法和按钮类的控件不一样 委托模式,实际上也是一种对自身状态的汇报机制,某个状态或者事件的变化是不确定时间发生的,但是发生的时候就就得有某些应对措施,这些应对措施是提前写在协议中的; 与目标动作回调不同的是,委

OC3大回调模式使用总结(三)block回调

OC 3大回调模式使用总结(三)block回调 block 又称 代码块,闭包等 是一个匿名的函数,它可以当做一个对象来使用,只不过这个对象很特殊,是一段代码,他可以保存你写的一段预备性质代码,待某个不确定的事件发生时再调用;事件发生时,它可能会给你传递一些状态参数(回传),来方便你的使用 block常用类型(可以看做一个匿名函数的类型): typedef int(^Add)(int,int);//定义一个block类型 typedef void(^Logg)(NSString *);//有一个

微信企业号回调模式配置讲解 Java Servlet+Struts2版本 echostr校验失败解决

微信企业号回调模式配置讲解 Java Servlet+Struts2版本 echostr校验失败解决 echostr校验失败,请您检查是否正确解密并输出明文echostr 异常java.security.InvalidKeyException:illegal Key Size 也就是echostr校验失败,请您检查是否正确解密并输出明文echostr这个错误 企业微信登陆地址http://qy.weixin.qq.com/ 配置成功以后 Servlet public void doGet(Htt

node.js 事件发射器模式

目录: 前言 Node.js事件驱动介绍 Node.js事件 注册并发射自定义Node.js事件 EventEmitter介绍 EventEmitter常用的API error事件 继承EventEmitter 前言: 今天事儿太多了,没有发太多的东西.不好意思了各位. 本篇主要介绍Node.js中的事件驱动,至于Node.js事件概念的东西,太多了. 本系列课程主要抱着的理念就是,让大家慢慢的入门,我也尽量写的简单一点. 所以呢,本文事件驱动,大家的目标应该是:理解Node.js的事件驱动.会