EventBus源码阅读(三)—— 订阅

  经过订阅之后的类,才会接受到EventBus,post出来的消息。所以今天我们来看一下订阅的流程。

EventBus.getDefault().register(this);
    /**
     * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
     * are no longer interested in receiving events.
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link
     * ThreadMode} and priority.
     */
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

  这个方法比较简单就是从类中寻找到订阅的方法,然后执行 subscribe(subscriber, subscriberMethod);订阅操作。

  subscribe方法的实现比较长,我们分块儿来看:  

        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

  首先,subscribe中新建了一个Subscription,用来存放订阅者和订阅者的方法。然后,通过订阅者的订阅模式,寻找到该模式下已经存在订阅者队列——Subscriptions。如果是新建的Subscriptions就直接加入,如果已经存在,则抛出异常。接着,以订阅方法的priority为依据,将Subscription加入Subscriptions。

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

  这一段代码确定了当前订阅源有没有已经存在的事件列表,如果已经存在,直接加入,如果没有,新建一个加入。typesBySubscriber会用来判断订阅源是否已注册。

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

  最后是一段关于粘性事件的处理,在此不做深究。

  可以看到,EventBus的订阅与发送事件最终的桥梁,就是subscriptionsByEventType。发送事件中

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

  订阅过程中,就是将订阅源放入subscriptionsByEventType中。

   

  初步的阅读过程,到这里,下一次来看一看粘性事件的实现。

  Done~

时间: 2024-10-23 12:03:36

EventBus源码阅读(三)—— 订阅的相关文章

EventBus源码阅读(一)

EventBus虽然有诸多缺点,但是作为一个经典的事件总线框架,其代码还是有一些学习价值,从他的代码中可以获得一些启发,运用于开发. EventBus有两个入口,一个是订阅,一个是发送事件.今天从发送事件开始阅读. EventBus.getDefault().post("11"); 进入post方法 /** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadS

SDWebImage源码阅读(三)UIImage+GIF

UIImage+GIF 是UIImage 类的一个GIF 分类,在之前的版本里面这个分类是用了处理GIF 动态图片的但是会有内存暴增的bug.在当前 '4.0.0-beta2' 的版本里GIF 动态图片处理放在了UIImage+MultiFormat  这个分类里面,而当前这个GIF 的分类的功能只是将动态图片作为静态图片来处理,如果是静态图片的NSData 数据则转化为静态UIImage 直接返回,如果是动态图片的NSData 数据,则把图像的第1帧图像转换化为静态UIImage 返回. 首先

EventBus源码阅读(二)——Poster

上一节在阅读了EventBus的消息发送后,停在了postToSubscription方法上: private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event);

dubbo源码阅读-服务订阅(八)之远程订阅(dubbo)

说明 从<服务订阅主流程>可以看到根据协议来注册 我们默认没有根据url直接配置所以url是registry SPI扩展就是走的RegistryProtocol RegistryProtocol <1>refer /** * type为订阅接口 * @param type Service class * @param url URL address for the remote service * @param <T> * @return * @throws RpcEx

SparkConf加载与SparkContext创建(源码阅读三)

sparkContext创建还没完呢,紧接着前两天,我们继续探索..作死... 紧接着前几天我们继续SparkContext的创建: 接下来从这里我们可以看到,spark开始加载hadoop的配置信息,第二张图中 new出来的Configuration正是hadoop的Configuration.同时,将所有sparkConf中所有以spark.hadoop.开头的属性都复制到了Hadoop的Configuration.同时又将spark.buffer.size复制为Hadoop的Configu

SpringMVC源码阅读(三)

先理一下Bean的初始化路线 org.springframework.beans.factory.support.AbstractBeanDefinitionReader public int loadBeanDefinitions(Resource... resources) throws BeanDefinitionStoreException { Assert.notNull(resources, "Resource array must not be null"); int c

Struts2源码阅读(一)_Struts2框架流程概述

1. Struts2架构图  当外部的httpservletrequest到来时 ,初始到了servlet容器(所以虽然Servlet和Action是解耦合的,但是Action依旧能够通过httpservletrequest取得请求参数), 然后通过Filter chain,Filter主要包括ActionContextCleanUp,它主要清理当前线程的ActionContext和 Dispatcher:FilterDispatcher主要通过AcionMapper来决定需要调用哪个Actio

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

【原】SDWebImage源码阅读(三)

[原]SDWebImage源码阅读(三) 本文转载请注明出处 —— polobymulberry-博客园 1.SDWebImageDownloader中的downloadImageWithURL 我们来到SDWebImageDownloader.m文件中,找到downloadImageWithURL函数.发现代码不是很长,那就一行行读.毕竟这个函数大概做什么我们是知道的.这个函数大概就是创建了一个SDWebImageSownloader的异步下载器,根据给定的URL下载image. 先映入眼帘的