多线程实现简单的事件异步处理框架

老实说,多线程在web开发里面非常常见,很多web容器本身就支持多线程,所以很多时候我们在进行web开发的时候并不需要考虑多线程相关的负责问题,而只需要实现相关的业务功能即可。所以,可以概括地讲,很多时候的web开发,并没有多线程方面的考虑,因为web应用本身就是在多线程基础上的了。

但是,有些时候为了提高程序性能,在用户的一个请求中中如果包含过多的业务操作或者包含耗时比较长的业务操作,我们就需要考虑使用异步的方式来提高程序响应的速度了。这篇博客简单介绍了在java中如何使用多线程实现一个简单的异步框架。

这个事件异步处理框架主要的工作过程是这样的:通过producer类对事件实体类序列化后,存储在redis的list队列中,而comsumer则负责读取事件队列中的事件模型对象并反序列化后调用相应的handler实现类对象进行事件处理,这些handler实现类的对象,通过spring完成handler具体类的注册操作,存在在一个map结构中,更具体的请读者往下看,欢迎指正不足的地方!

一、同步、异步的概念

  在学习多线程的时候,我们接触最多的概念估计是同步的概念了,多线程中同步的意思大概是这样:线程访问资源时一直在等待,知道资源访问结束。所以,有同步的概念,我们可以大概理解与之相对的异步的概念:线程在访问资源(或者处理耗时较长的数据)时,不必一直等待资源访问完成或者数据处理完,在等待期间线程可以做其他事情,而当资源访问完成之后,会采取回调的方式执行相应的代码。

  例如,在IO读写中,同步的方式就是在IO 操作的阻塞过程一直阻塞,直到IO操作完成;而异步的意思就是在io操作阻塞过程线程去做其他事情,当IO操作完成后,采取回调的方式执行相应的操作。

二、异步框架的模型原理

  1、生产者--消费者模式

    有了解过设计模式的读者应该听过这个大名鼎鼎的设计模式了,它大概的思路如下示例图:

大概意思就是:生产者负责数据的产生,它将数据放到内存中去(一般是一个队列),而消费者则负责处理内存中的数据,处理完成后,可以通过回调的方式进行响应。上面的图比较粗略,下面是具体的实现示意图:

上面示意图具体说明了生产者消费者的具体实现方式:

eventProducer(当然,也可以是dataProducer等)是生产者,它会将前端传输过来的数据或者说需要处理的事件封装好,然后将这些封装好的数据放进一个队列里面去;

而eventConsumer是消费者,它会读取队列里面的数据,然后进行处理。

在这个过程,程序是以异步的方式运行的:生产者无需等待消费者处理完成,它的职责只是将数据推到内存里面去,然后就可以进行响应;而消费者只需要处理数据即可,它不用管数据是哪来的。显然,这样的方式可以提高响应的速度,同时使得异步的实现方式变得简单起来。

  2、web开发中的异步框架思路

  上面的生产者--消费者为我们实现web的异步框架提供了一种很好的思路:在复杂的业务操作或者耗时比较长的业务中,我们可以采用异步的方式提高程序的响应速度,而生产者消费者的模式正是我们实现异步框架的参考模型--复杂业务的service层使对应的生产者,它只需要将要处理的数据放进一个队列里面,然后即可相应用户;而相应的handler类则负责具体的数据处理。

  3、为什么用异步?

  显然,在上面描述的思路中,我们大概可以知道什么时候应该使用异步框架:对相应速度要求比较高请求,但是该请求的相关业务操作允许一定的延迟。

  举个具体的例子:在一个社交网站中,很多时候会有点赞的操作,A给B点赞,一般来说会包含两个操作,第一个操作是告诉A点赞成功了,第二个操作是告诉B他被A点赞了;如果不采用异步的方式,那就需要在在这两个操作都完成后,才响应A说点赞成功,但是第二个操作显然会耗时很长(例如需要发邮件通知),所以不采用异步方式时A就会有这样一种感觉:怎么点个赞要等半天才响应的,什么垃圾系统!所以,这时候为了提高对A的相应速度,我们可以采用异步的方式:A点赞请求发出之后,程序不需要等到B收到A的点赞通知了,才告诉A说你点赞成功了,因为B收到A的点赞通知相对于A知道自己点赞成功来说,是允许延迟的。

  好吧,上面的解说可能有点绕,不过如果你理解了上面的这个例子,大概也就知道异步的适用场景了。

三、简单的事件处理异步框架

  前面啰啰嗦嗦铺垫了那么多,下面就用一个比较简单的例子来说明web开发中异步框架的应用场景以及如何实现一个简单的异步框架吧。

  首先说明的是,在下面的代码中,我是将最近做的一个项目中的部分业务功能抽取出来的,所以会用到spring的框架以及redis(用于存储生产者产生的数据)相关知识,同时为了提高程序的扩展性,我采用了面向接口编程的方式,利用spring的内置功能实现消费者的自动注册,看不懂可以稍微百度下(其实只是用到了redis的一点皮毛功能,毕竟我也是刚接触redis的菜鸟而已,所以不用担心看不懂)

  1、框架的大体模型

  主要是包括三个部分:生产者producer类,消费者comsumer类,事件处理的handler接口以及对应的实现类,具体的事件eventModel类(对应数据)。

  在这里,producer类会将前端传输过来的eventModel对象进行序列化,将它加入到一个异步队列中,这里采用redis的list数据结构实现。

  消费者comsumer则负责将redis中队列的数据读取出来,反序列化后,根据eventModel中的eventType来调用相应的handler具体实现类(handler实现类存储在一个map结构里面,key对应的是eventType,value对应的是具体handler实现类)进行业务处理。

  handler实现类负责具体事件的处理,它需要实现一个handler接口(该接口是通过spring进行自动注册的关键,具体后面会讲)。

  eventModel是事件模型,它主要存储与事件有关的数据,包括事件类型,时间触发者,事件所属者等数据。具体的后面会讲解。

  下面就各个模块进行具体的讲解以及给出相应的代码实现。

  2、eventModel事件模型

    在讲解其他部分之前,我觉得首先应该简单讲解下我们应该如何组织一个事件模型。直接上代码吧,请注意看注释理解如何组织事件模型:

  

/**
 * 事件模型:用于表示个事件
 */
public class EventModel {
    /**
     * 事件类型,用于标识事件,同时在comsumer中根据这个值确定handler的具体实现类,一般可用一个枚举类型实现
     * 例如点赞通知对应的事件类型和注册发邮件进行激活的事件就应该属于不同的eventType,应该对应不同的handler实现类
     */
    private EventType eventType;
    /**
     * 事件触发者,例如用户A给用户B点赞,A就是时间触发者
     */
    private int actionId;/**
     * 事件发生对应的关联者,例如A给B点赞,A对应actionId,actionOwnerId
     */
    private int actionOwnerId;
    /**时间处理需要的额外的数据,采用map的方式可以保证程序的扩展性
     * 例如注册发送邮件的操作需要的数据和点赞通知需要的数据并不一样,所以用map存储最大程度地保证程序的灵活性
     */
    private Map<String,String> exts = new HashMap<>();

    /**
     * 注意序列化需要显式有一个无参构造函数
     */
    public EventModel(){

    }

    /**
     * getter 和setter,这部分省略
     */

}

  在组织eventModel时,我们应该保证灵活性,将必须的变量抽取出来之余,用一个map结构来存储具体业务可能需要的额外数据。

  3、producer类

    producer的功能较为加单,只是将eventModel进行序列化,然后将它添加进相应的时间队列,具体代码如下:

    

/**
 * 事件生产者
 */
@Component
public class EventProducer {
    @Autowired
    private JedisEventHandlerAdaptor jedisEventHandlerAdaptor;
    @Autowired
    private JedisKeyUtil jedisKeyUtil;

    public void add(EventModel model){
        String modelJson = JSONObject.toJSONString(model);
        jedisEventHandlerAdaptor.add(jedisKeyUtil.getEventHandlerKey(),modelJson);
    }
}

没有接触过redis的读者可以认为上面的jedisEventHandlerAdaptor其实就是一个可以操作某个队列的类,在java中其实也可以用阻塞队列来实现的,更具体的读者可以自己尝试。

  4、comsumer类

    在这个异步事件处理框架中,comsumer主要负责以下的职责:

    读取事件队列中的eventModel对象,将它反序列化后,根据eventType负责调用具体的handler实现类;

    在初始化的时候利用spring框架自动对handler具体实现类进行注册操作,并将之存储在一个map的数据结构中,key是eventType,valuee是handler具体实现类的对象。

    具体的实现方式请读者注意看代码中的注释:

  

/**
 * 事件处理类,该类负责调用handler,对事件进行处理,需要实现spring的两个接口,InitializingBean接口是初始化时自动注册handler要用;
 *ApplicationContextAware则是调用spring的applicationContext(该applicationContext中存储着handler具体实现类的bean对象)需要实现
 * 的接口,通过applicationContext获取handler对应的beans,然后就可以将handler自动注册到下面的config对象中了(是一个map)
 */
@Component
public class EventComsumer implements InitializingBean, ApplicationContextAware{
    private static final Logger logger = LoggerFactory.getLogger(MessageController.class);
    /**
     * threadPoolUtil封装了线程池的线程相关操作
     */
    @Autowired
    private ThreadPoolUtil threadPool;
    @Autowired
    private JedisEventHandlerAdaptor adaptor;
    /**
     * 这是一个与redis交互相关的工具类,用于获取特定的redis key,避免key冲突用,读者可以忽略
     */
    @Autowired
    private JedisKeyUtil jedisKeyUtil;
    /**
     * spring上下文对象,该对象存储着handler bean对象,必须通过setApplicationContext(ApplicationContextAware接口的实现方法)
     * 进行初始化,这样才能获取spring中的handler具体实现类的beans
     */
    private ApplicationContext applicationContext;
    /**设置消费函数阻塞时间,暂定为一天,redis阻塞list中必须要的参数,读者可以忽略
     */
    private static int COMSUME_TIMEOUT = 24*3600;
    /**
     * congif:该变量用于存储type和eventType的映射关系,在消费时,可以直接根据config中你的映射关系进行handler调用
     * 注意,这里为了保证程序的灵活性,eventHandler用一个list进行存储,因为有可能一个EventType事件类型可能对应多个
     * handler事件处理对象,例如点赞通知这个事件类型可能需要通知被点赞的人以及通知系统管理员,所以应该对应两个事件handler
     * 更具体的可以参考handler接口设计时的注释
     */
    Map<EventType,List<EventHandler>> config = new HashMap<>();

    /**
     * spring对该对象进行初始化的时候,将所有的handler具体对象注册到config对象中
     */
    @Override
    public void afterPropertiesSet() throws Exception {    //获取所有handler具体对象
        Map<String,EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);    //迭代注册handler对象
        for(Map.Entry<String,EventHandler> entry:beans.entrySet()){
            EventHandler handler = entry.getValue();       //由于一个handler也可能对应多个事件类型,所以一个handler要注册到所有的eventType中去,这里如果看不懂可以结合后面的解释handler接口代码的注释进行理解
            for(EventType type:handler.getHandlerType()){
                if(config.get(type)==null){
                    config.put(type,new ArrayList<EventHandler>());
                }
                config.get(type).add(handler);
            }
        }
        //开线程调用消费函数,注意不能直接调用,否则会导致主线程阻塞
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                doConsume();
            }
        });
    }

    /**
     * 消费函数,用于执行handler
     */
    public void doConsume() {
        while(true){
            List<String> list = adaptor.pop(String.valueOf(COMSUME_TIMEOUT), jedisKeyUtil.getEventHandlerKey());
            //反序列化
            EventModel model = JSON.parseObject(list.get(1),EventModel.class);
            EventType type = model.getEventType();
            //获取事件的handler
            List<EventHandler> handlers = config.get(type);
            //执行handler
            for(EventHandler handler:handlers){
                handler.doHandler(model);
            }
        }
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

  这里需要重点解释下comsumer中handler自动注册的过程(afterPropertitiesSet方法以及config对象):

  首先我们的config对象是一个map,key是一个eventType,表示某个事件;而为什么用list来表示eventHandler呢?这是因为一个事件有可能对应多个eventHandler,所以为了为了保证灵活性,用list形式存储handler最适合了;而关于handler方法中的getHandlerType方法,下面的handler接口设计讲解时会进行详细解释。

  另外,关于 InitializingBean, ApplicationContextAware 两个接口的作用具体不讲,读者入股不知道为什么必须实现这两个接口才能借助spring实现自动注册的话,可以进行谷歌或者百度下,相信很容易找到答案的。

  5、handler接口设计

   直接上代码吧,请注意看注释:

  

/**
 * 事件处理器:用于处理事件队列里面的事件,被eventConsumer调用
 * doHandler:model是具体的事件模型,它需要由调用者(一般是comsumer)传进来
 */
public interface EventHandler {
    public void doHandler(EventModel model);

    /**
     *
     * @return 表明该接口是什么类型的handler,list表明handler可以支持多个业务,也就是说,一个handler可以对应多个eventType   *例如说,sendEmailHandler,邮件发送handler,具体业务例如注册激活的事件类型,点赞的发邮件通知时间类型都会需要这个handler,   *所以一个handler是有必要对应多个eventType的,这里请读者务必理解,当初我也理解了挺久的。所以,具体handler实现类中必须有一个list变量来存储它对应的事件类型
     */
    public List<EventType> getHandlerType();

}

这里的handler为什么要对应多个eventType请读者参考注释理解,我觉得理解这个挺重要的,当你理解这个之后,回头看上面的自动注册过程(在comsumer类中)才不会感到懵逼。

  最后,我们只需要实现eventHandler接口就可以了,comsumer会在spring启动时自动帮你注册该类,我们只需要在service中声明eventType,comsumer便会自动找到相应的接口执行具体操作。

  6、总结

    这个简单的异步事假处理框架例子就大概解析到这里了,其实我觉得最主要的是通过这个事件处理框架设计的过程体会和领悟生产者消费者设计模型以及异步框架的工作原理;当然,这个过程其实还有很多其他需要领悟的:例如如何设计接口才能保证灵活性;对象的注册又是什么意思,我们应该如何实现自动注册等等。

  

  用了一个早上终于把这篇博客写完了,其实这个异步事件处理框架还是有点粗糙的,但是在我开来再复杂的异步框架工作原理大体上也是这样的,也希望这篇博客能给读者带来那么一点点收获,不足的地方请各位大佬指正!

  

  

 

时间: 2024-10-24 12:43:39

多线程实现简单的事件异步处理框架的相关文章

Python 开源异步并发框架的未来(转)

Python 开源异步并发框架的未来 fantix 1.1k 2014年04月16日 发布 推荐 4 推荐 收藏 31 收藏,8.9k 浏览 呵呵,这个标题有点大,其实只是想从零开始介绍一下异步的基础,以及 Python 开源异步并发框架的发展和互操作性. 另外,这是我在 OSTC 2014 做的一个 20140330-OSTC-分论坛1王川 http://v.youku.com/v_show/id_XNjk2ODI0ODQ4.html ,幻灯片在这里,欢迎拍砖. 开源 Python 是开源的,

简单好用的第三方框架(砖)

原文  http://blog.csdn.net/caoyouxing/article/details/42418591 主题 开源安卓开发 Android开源库 自己一直很喜欢Android开发,就如博客签名一样, 我是程序猿,我为自己代言 . 在摸索过程中,GitHub上搜集了很多很棒的Android第三方库,推荐给在苦苦寻找的开发者,而且我会 不定期的更新 这篇文章. Android下的优秀开发库数不胜数,在本文中,我列举的多是开发流程中最常用的一些.如果你还想了解更多的Android开源

Python 开源异步并发框架的未来

http://segmentfault.com/a/1190000000471602 开源 Python 是开源的,介绍的这几个框架 Twisted.Tornado.Gevent 和 tulip 也都是开源的,最后这个演讲是在开源大会弄的,所以标题里肯定少不了开源.另外,我的 gevent3 项目也是开源的——貌似不少同学被我起的极品名字给搞混了,特别说明一下,gevent3 虽然有跟 Gevent 一样的接口外貌,但底层却是 tulip 驱动的(考虑把名字改回 gulip 之类的):请区别于将

基于redis AE的异步网络框架

最近一直在研究redis的源码,redis的高效率令人佩服. 在我们的linux机器上,cpu型号为, Intel(R) Pentium(R) CPU G630 @ 2.70GHz Intel(R) Pentium(R) CPU G630 @ 2.70GHz 上 set,get 都能达到每秒钟15W的请求处理量,真是佩服这代码的效率. 前几篇文章,主要是介绍了基本的代码,比如字符串处理,链表处理,hash等.这篇文章介绍网络的核心,基于事件反映的异步网络框架. 异步网络处理,是基于epoll的.

Java异步NIO框架Netty实现高性能高并发

1. 背景 1.1. 惊人的性能数据 近期一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步堵塞IO)的通信框架.性能提升了8倍多. 其实,我对这个数据并不感到吃惊,依据我5年多的NIO编程经验.通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是全然有可能的. 以下我们就一起来看下Ne

Python开源异步并发框架

Python开源异步并发框架的未来 2014年3月30日,由全球最大的中文IT社区CSDN主办的“开源技术大会·2014” (Open Source Technology Conference 2014,简称OSTC 2014)在北京丽亭华苑酒店召开. 本次大会以“启蒙·开源”(Open Mind, Open Source)为主题,邀请到了来自全国各地的30多位开源业界资深人士发表主题演讲,数十个开源社区现场参与,到场的开源软件开发者.贡献者和开源爱好 者总人数超过500人.作为一场“接地气”的

python2.0_s12_day10_Twsited异步网络框架

Twsited异步网络框架 Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议.线程.数据库管理.网络操作.电子邮件等. Package application Configuration objects for Twisted Applications Package conch Twisted Conch:The Twisted Shell.Terminal emulation,SSHv2 and telnet Module copyright Copyright i

Android异步处理框架AsyncTask源码解析

一.概述 在Android开发中,我们进行异步处理一般会采用两种方式: 1.Thread +Handler 通常我们在Thread里面发送消息,然后在Handler的handleMessage方法里面去处理对应的任务,因为Android是不允许UI线程去更新UI的,这个时候我们可以采取这种方式 2.AsyncTask AsyncTask是Android为我们封装的一个轻量级的异步处理框架,其实底层也是用了类似Thread+Handler的方式.对外提供了一些方法,我们实现这些方法就可以很方便的进

Android异步任务处理框架AsyncTask源码分析

[转载请注明出处:http://blog.csdn.net/feiduclear_up CSDN 废墟的树] 引言 在平时项目开发中难免会遇到异步耗时的任务(比如最常见的网络请求).遇到这种问题,我们可以自己通过Handler+Message+Thread/ThreadPool来构造一个异步耗时任务框架.当你下次项目中又遇到一个网络请求,你又不得不重写异步耗时任务处理框架.出于避免开发者重复搬砖工作,Google工程师给开发者搭建了一个通用的异步耗时任务处理框架--AsyncTask. Asyn