Google-Guava-EventBus源码解读

Guava是Google开源的一个Java基础类库,它在Google内部被广泛使用。Guava提供了很多功能模块比如:集合、并发库、缓存等,EventBus是其中的一个module,本篇结合EventBus源码来谈谈它的设计与实现。

概要

首先,我们先来预览一下EventBus模块的全部类图:

类并不是多而且几乎没有太多继承关系。

下面,我们来看一下各个类的职责:

  • EventBus:核心类,代表了一个事件总线。Publish事件也由它发起。
  • AsyncEventBus:在分发事件的时候,将其压入一个全局队列的异步分发模式。
  • Subscriber:对某个事件的处理器抽象,封装了事件的订阅者以及处理器,并负责事件处理(该类的类名及其语义有些不明确,后续会谈到)。
  • SubscriberRegistry:订阅注册表,它用于存储Subscriber跟Event的对应关系,以便于EventBus在publish一个事件时,可以找到它对应的Subscriber。
  • Dispatcher:事件分发器,它定义了事件的分发策略。
  • @Subscribe:用于标识事件处理器的注解,当EventBus publish一个事件后,相应的Subscriber将会得到通知并执行事件处理器。
  • @AllowConcurrentEvents:该注解跟@Subscribe一同使用,标识该订阅者的处理方法为线程安全的,该注解还用于标识该方法将可能会被EventBus在多线程环境下执行。
  • DeadEvent:死信(没有订阅者关注的事件)对象。
  • SubscribeExceptionHandler:订阅者抛出异常的处理器。
  • SubscribeExceptionContext:订阅者抛出异常的上下文对象。

在对每个类进行分解之前,我们再来看一下各个类之间的关联关系:

分“类”解读

EventBus

它有这么几个字段:

  • identifier:事件总线的标识,这说明在一个应用里是可以有多个EventBus的。如果不指明它的值,它将以“default”作为其默认名称。
  • executor:它是Executor接口的实例,用于对订阅者处理事件方法的执行。这里需要注意的是,该字段的实例化是在EventBus内部构造器中,并不是从外部注入进来的,另外真正的执行订阅者方法的时机也不由EventBus负责,而是由Subscriber负责,因此该字段会被公开给外部访问。
  • exceptionHandler:它是SubscribeExceptionHandler的实例,用于处理订阅者在执行事件处理方法时抛出的异常。EventBus可以接收一个外部定义的异常处理器,也可以采用内部缺省的日志记录处理器。
  • subscribers:订阅者注册表,用于存储所有的事件以及事件处理器、订阅对象的对应关系。
  • dispatcher:事件分发器,用于分发事件给订阅对象的事件处理器,该对象在EventBus构造方法内部初始化,默认的实现是PerThreadQueuedDispatcher,该分发器将事件存入队列,并保证在同一个线程上发送的事件能够按照他们发布的顺序被分发给所有的订阅者。

EventBus提供了几个核心方法:

  • register:注册subscriber;
  • unregister:移除注册过的subscriber;
  • post:发布事件;

你可以将EventBus看做是一个代理,这些方法真正的实现者都是上面的这些对象。

AsyncEventBus

一个支持异步发布模式的EventBus,它覆盖了EventBus的默认构造方法,指定了一个异步的分发器:LegacyAsyncDispatcher,这个分发器基于一个全局的队列来暂存未发布的事件。

Subscriber

之前也提到Subscriber的名称是比较容易混淆的。这个类的名称看似表示一个订阅者对象,但其实是用来封装“一个订阅者的一个事件处理器”对象。因为当一个订阅者存在多个处理方法被标注为@Subscribe的时候,那么每个处理方法都对应于一个独立的Subscriber对象的实例。我个人觉得这个名称与其具体的实现语义有些混淆。当然也许实现者认为:一个对象以及一个事件处理器就是一个Subscriber的话,那是没有问题的。因此这里为了理解方便,你可以将其看做是一个封装了订阅者对象以及一个订阅者处理器方法的实体类。

Subscriber的访问级别是package的,它还承担了执行事件处理的责任。通过一个create静态工厂方法创建它:

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }

它接收三个参数:

  • bus:EventBus的实例,通过它来获取事件的执行器(executor)
  • listener:真实的订阅者对象
  • method:订阅对象的事件处理方法的Method实例

在实现中,它会先判断该处理器方法上是否被标注有@AllowConcurrentEvents注解,如果有,则实例化Subscriber类的一个实例;如果没有,则不允许eventbus在多线程的环境中调用处理器方法,所以这里专门为此提供了一个同步的订阅者对象:SynchronizedSubscriber来保证线程安全。

该类的两个关键方法之一:

dispatchEvent:

final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }

它调用一个多线程执行器来执行事件处理器方法。

另一个方法:invokeSubscriberMethod以反射的方式调用事件处理器方法。

另外,该类对Object的equals方法进行了override并标识为final。主要是为了避免同一个对象对某个事件进行重复订阅,在SubscriberRegistry中会有相应的判等操作。当然这里Subscriber也override并final了hashCode方法。这是最佳实践,不必多谈,如果不了解的可以去看看《Effective Java》。

该类还有个内部类,就是我们上面谈到的SynchronizedSubscriber,它继承了Subscriber,与Subscriber唯一的不同就是在invokeSubscriberMethod的执行上做了同步。

SubscriberRegistry

针对单个EventBus的订阅与事件的关系维护。在内部用来存储订阅者关系的对象是java并发包下的并发Map:ConcurrentMap,该map以Class对象为键,值的类型是CopyOnWriteArraySet<Subscriber>集合类型。

SubscriberRegistry直接依赖EventBus对象,所以在构造器中需要注入EventBus的实例。

SubscriberRegistry里有两个关键的实例方法:register/unregister。

register

接收订阅者对象作为参数并建立Event跟Subscriber的关联关系。

我们来看看它的实现:

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers = MoreObjects.firstNonNull(
            subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

它首先获得一个Multimap实例(它是Google Guava集合框架提供的一个多值Map类型,也就是说一个key可以对应多个value),该Multimap用于存储事件类型对应的该订阅者内所有关于该事件的处理器方法集合,其key为事件的Class类型。这里在for循环的中通过asMap获取其map视图,即可将Multimap对应的多个值存储到一个Collection中。

也就是说这里for循环的每个entry,表示的是一个事件的Class实例对应的一组Subscriber的集合,即eventMethodsInListener。

然后根据该事件的Class对象从注册表中获取对应的存储Subscriber实例的集合,如果不存在则创建该集合,然后将该订阅者内所有的事件处理器方法都加入到注册表中去。

unregister

unregister的实现跟register有些类似,先查找该订阅者所有的事件类型与处理器的对应关系。然后,遍历所有的事件类型,移除针对当前订阅者的所有Subscriber实例。

findAllSubscribers

register/unregister方法都调用了findAllSubscribers方法,它有一些特别之处,这里需要单独拎出来提一下。

findAllSubscribers用于查找事件类型以及事件处理器的对应关系。查找注解需要涉及到反射,通过反射来获取标注在方法上的注解。因为Guava针对EventBus的注册采取的是“隐式契约”而非接口这种“显式契约”。而类与接口是存在继承关系的,所有很有可能某个订阅者其父类(或者父类实现的某个接口)也订阅了某个事件。因此这里的查找需要顺着继承链向上查找父类的方法是否也被注解标注,代码实现:

  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

同样涉及这个问题的,还有根据事件类型获取Subscriber实例的方法:getSubscribers。

getSubscribers

  Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

Dispatcher

dispatcher用于分发事件给Subscriber。它内部实现了多个分发器用于提供在不同场景下不同的事件顺序性。Dispatcher是一个抽象类,定义了一个核心抽象方法:

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

该方法用于将一个指定的事件分发给所有的订阅者。

另外在Dispatcher提供了三个不同的分发器实现:

PerThreadQueuedDispatcher

它比较常用,针对每个线程构建一个队列用于暂存事件对象。保证所有的事件都按照他们publish的顺序从单一的线程上发出。保证从单一线程上发出,没什么特别的地方,主要是在内部定义了一个队列,将其放在ThreadLocal中,用以跟特定的线程关联。

LegacyAsyncDispatcher

另一个异步分发器的实现:LegacyAsyncDispatcher,之前在介绍AsyncEventBus的时候提到,它就是用这种实现来分发事件。

它在内部通过一个ConcurrentLinkedQueue<EventWithSubscriber>的全局队列来存储事件。从关键方法:dispatch的实现来看,它跟PerThreadQueuedDispatcher的区别主要是两个循环上的差异(这里基于队列的缓存事件的方式,肯定会存在两个循环:循环取队列里的事件以及循环发送给Subscriber)。

PerThreadQueuedDispatcher:是两层嵌套循环,外层是遍历队列取事件,内存是遍历事件的订阅处理器。

LegacyAsyncDispatcher:是一前一后两个循环。前面一个是遍历事件订阅处理器,并构建一个事件实体对象存入队列。后一个循环是遍历该事件实体对象队列,取出事件实体对象中的事件进行分发。

ImmediateDispatcher

其实以上两个基于中间队列的分发实现都可以看做是异步模式,而ImmediateDispatcher则是同步模式:只要有事件发生就会立即分发并被立即得到处理。ImmediateDispatcher从感官上看类似于线性并顺序执行,而采用队列的方式有多线程汇聚到一个公共队列的由发散到聚合的模型。因此,ImmediateDispatcher的分发方式是一种深度优先的方式,而使用队列是一种广度优先的方式。

DeadEvent

它是一个实体对象,封装了没有订阅者的事件。DeadEvent由两个属性组成:

  • source:事件源(通常指发布事件的EventBus对象)
  • event:事件对象

DeadEvent对象的产生:当通过某个EventBus的实例发布一个事件的时候,没有找到事件订阅者并且它本身又不是一个DeadEvent的实例时,将由EventBus构建一个DeadEvent类的实例。

总结

Guava的EventBus源码还是比较简单、清晰的。从源码来看,它一番常用的Observer的设计方式,放弃采用统一的接口、统一的事件对象类型。转而采用基于注解扫描的绑定方式。

其实无论是强制实现统一的接口,还是基于注解的实现方式都是在构建一种关联关系(或者说满足某种契约)。很明显接口的方式是编译层面上强制的显式契约,而注解的方式则是运行时动态绑定的隐式契约关系。接口的方式是传统的方式,编译时确定观察者关系,清晰明了,但通常要求有一致的事件类型、方法签名。而基于注解实现的机制,刚好相反,编译时因为没有接口的语法层面上的依赖关系,显得不那么清晰,至少静态分析工具很难展示观察者关系,但无需一致的方法签名、事件参数,至于多个订阅者类之间的继承关系,可以继承接收事件的通知,可以看作既是其优点也是其缺点。

时间: 2024-12-21 00:15:48

Google-Guava-EventBus源码解读的相关文章

第二章 Google guava cache源码解析1--构建缓存器

1.guava cache 当下最常用最简单的本地缓存 线程安全的本地缓存 类似于ConcurrentHashMap(或者说成就是一个ConcurrentHashMap,只是在其上多添加了一些功能) 2.使用实例 具体在实际中使用的例子,去查看<第七章 企业项目开发--本地缓存guava cache>,下面只列出测试实例: import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;

Google guava cache源码解析1--构建缓存器(3)

此文已由作者赵计刚授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 下面介绍在LocalCache(CacheBuilder, CacheLoader)中调用的一些方法: CacheBuilder-->getConcurrencyLevel() int getConcurrencyLevel() {         return (concurrencyLevel == UNSET_INT) ? //是否设置了concurrencyLevel               

解读EventBus源码

Event 其实就是一个对象,可以是网络请求返回的字符串,也可以是某个开关状态等等.事件类型(EventType)指事件所属的 Class. 事件分为一般事件和 Sticky 事件,相对于一般事件,Sticky 事件不同之处在于,当事件发布后,再有订阅者开始订阅该类型事件,依然能收到该类型事件最近一个 Sticky 事件 ThreadMode PostThread-> onEvent(Object e) 表示此方法在事件发布的线程中执行,如果是在主线程中发布则在主线程中执行,并且不能进行耗时操作

Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上.完整项目Github源码 负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来. 1.Apache Beam编程实战–前言,Apache B

Gson 源码解读

开源库地址:https://github.com/google/gson 解读版本:2.7 Gson是一个可以用来将Java对象转换为JSON字符串的Java库.当然,它也可以把JSON字符串转换为等价的Java对象.网上已经有了不少可将Java对象转换成JSON的开源项目.但是,大多数都要求你在Java类中加入注解,如果你无法修改源码的话就比较坑爹了,此外大多数开源库并没有对泛型提供完全的支持.于是,Gson在这两个重要的设计目标下诞生了.Gson可以作用于任意的Java对象(包括接触不到源码

【Spark】SparkContext源码解读

SparkContext的初始化 SparkContext是应用启动时创建的Spark上下文对象,是进行Spark应用开发的主要接口,是Spark上层应用与底层实现的中转站(SparkContext负责给executors发送task). SparkContext在初始化过程中,主要涉及一下内容: SparkEnv DAGScheduler TaskScheduler SchedulerBackend SparkUI 生成SparkConf SparkContext的构造函数中最重要的入参是Sp

【转】Retrofit 源码解读之离线缓存策略的实现

Retrofit 源码解读之离线缓存策略的实现 Retrofit 是square公司开发的一款网络框架,也是至今Android网络请求中最火的一个,配合OkHttp+RxJava+Retrofit三剑客更是如鱼得水,公司项目重构时,我也在第一时间使用了RxJava+Retrofit,使用过程中遇到的一些问题,也会在后续的博客中,一点点分享出来,供大家参考! 在项目的过程中,项目需求需要在离线的情况下能够继续浏览app内容,第一时间想到缓存,于是经过各种google搜索,得出以下结论(使用Retr

YYModel 源码解读(二)之NSObject+YYModel.h (1)

本篇文章主要介绍 _YYModelPropertyMeta 前边的内容 首先先解释一下前边的辅助函数和枚举变量,在写一个功能的时候,这些辅助的东西可能不是一开始就能想出来的,应该是在后续的编码过程中 逐步添加的. #define force_inline __inline__ __attribute__((always_inline)) 这行代码用到了C语言的内联函数 内联函数: 是用inline修饰的函数,内联函数在代码层次看和普通的函数结构一样,却不具备函数的性质,内联函数不是在调用时发生控

EventBus3 源码解读

基本概念 EventBus是一款针对Android优化的发布/订阅事件总线库.简便了Activities, Fragments, 以及background threads之间的通信,使发送者与订阅者之间有效解耦. 基本使用 EventBus的使用也极其简单,只需三步即可. 定义一个事件类型. public class MessageEvent { public final String message; public MessageEvent(String message) { this.mes