EventBus3.0 组件通信框架源码学习总结

一、初始化

EventBus的初始化虽然用了单例模式,但是构造方法居然是public修饰符,可能是应对项目中的多线操作。

//单例模式,针对并发情况进行了双层判断
 public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
 }
 public EventBus() {
        this(DEFAULT_BUILDER);
 }

 EventBus(EventBusBuilder builder) {
        //通过订阅事件的类型来查找订阅者方法的集合。
        //Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType
        subscriptionsByEventType = new HashMap<>();
        //通过订阅者来查找所有订阅事件类型的集合
        //Map<Object, List<Class<?>>> typesBySubscriber;
        typesBySubscriber = new HashMap<>();
        //用于存放指定的粘性事件
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        //这个参数本次分析用不到,编译时解析才会使用。
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        ....
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
 }
  1. subscriptionsByEventType 和 typesBySubscriber 主要用来存放已经找到的订阅者以及订阅方法。
  2. stickyEvents 粘性事件使用方法(sticky = true),标注了粘性事件的方法,在注册时会根据情况向订阅方法发送之前已经推送的消息。
@Subscribe(priority = 1,sticky = true,threadMode = ...)
    public void onMainStoreChange(MainStore.MainStoreChangeEvent event){}

一般情况下,EventBus的注册和解绑是在一下两个方法中,处于后台或者未启动的activity收不到消息,但是注释了粘性事件,会在每次界面启动时收到消息。类似 onActivityResult()功能。

    @Override
    protected void onResume() {
        super.onResume();
        eventBus.register(this);
    }

    @Override
    protected void onPause() {
        super.onPause();
        eventBus.unregister(this);
    }
  1. mainThreadPoster 、backgroundPoster 、asyncPoster 三种推送者,发送事件时会根据 threadMode 来确定使用哪一种Poster,后面会具体分析。
  2. subscriberMethodFinder 这个类就是查找订阅者中所有的订阅方法的类,后面主要分析。
  3. EventBusBuilder 这个建造类,将所有需要定义一些参数全部封装在一个类中,值得学习。

二、注册

 public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        //查找
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                //订阅
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

注册方法的代码不多,做了两件事。

1. 查找订阅事件,任务全部交给 subscriberMethodFinder 来处理,而 subscriberMethods 集合保存了所有订阅事件的封装类,只是得到的集合并没有经过排序、分类等处理,只负责找出来。

 public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
     //每个属性对应着 @ subscribe(...)注释上的参数
        this.method = method;
        this.threadMode = threadMode;
        //事件类型
        this.eventType = eventType;
        //优先级
        this.priority = priority;
        //是否是粘性事件
        this.sticky = sticky;
    }

//Subscribe 注释器参数和默认值如下。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    boolean sticky() default false;

    int priority() default 0;
}

2.订阅事件,将已经得到的订阅事件集合,通过排序、分类等手段赋值给 subscriptionsByEventType 和 typesBySubscriber 这两个集合,并且对粘性事件进行处理。

============

先分析如何查找,下面到了 subscriberMethodFinder 类中的方法了,思路很清晰,就是先从缓存中获取,没有就通过下面的方法查找,最后将找到的结果返回并加入的缓存中,以便下次查找。

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //先从METHOD_CACHE缓存中取出列表,没有的话就通过对象参数找出对应的订阅方法列表。
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //通过默认的方式实例化EventBus,ignoreGeneratedIndex为false
        if (ignoreGeneratedIndex) {
            //利用反射方法得到订阅方法列表
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //这个比较难,暂时还没学习。利用编译时查找订阅事件的方式完成,高端
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

这里一些用于判断的参数都是EventBus初始化变量时通过建造类确定的。ignoreGeneratedIndex 默认为false,所以基本走的就是findUsingInfo()这个方法,接下来看下这个方法具体逻辑。

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //初始化 FindState 对象,利用复用池缓存用过的 FindState 对象,叼
        //FindState 用于检查和保存合格的订阅方法。
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            //这个判断目前肯定是空的,由于缺少组件,只能走findUsingReflectionInSingleClass方法。
            if (findState.subscriberInfo != null) {
            //编译时就已经将所有订阅方法找到,这里只需要获取即可。
               ...
            } else {
             //运行时具体查找逻辑,效率比较低下
                findUsingReflectionInSingleClass(findState);
            }
            //移到父类
            findState.moveToSuperclass();
        }
        //这个方法将findState中的订阅方法集合全部提取出来,并释放findState对象。
        return getMethodsAndRelease(findState);
    }

总体来说这个方法就是循环遍历订阅者和其超类,找出所有的订阅方法,然后将方法封装进SubscriberMethod这个类,最后将所有的 SubscriberMethod 类(对应一个订阅方法)全部添加到 findState.subscriberMethods 集合中。这里分析的findUsingInfo() 完全等同 findUsingReflection()方法。

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {

            //通过反射得到类的所有方法(public 类型),为啥不直接使用getMethods(),上面英文有注释(Activity这种庞大的类,用getDeclaredMethods方法效率更快?)
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {

            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            //获取方法类型
            int modifiers = method.getModifiers();

            //只需要public修饰符的方法,MODIFIERS_IGNORE 包含除public以外所有的修饰符
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {

                //得到方法的参数类型
                Class<?>[] parameterTypes = method.getParameterTypes();

                //订阅者的订阅方法只允许拥有一个订阅事件的参数
                if (parameterTypes.length == 1) {

                    // 返回改程序元素上存在的、指定类型的注解,如果该类型注解不存在,则返回null
                    // 判断方法是否为订阅方法
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];

                        //就是过滤掉了超类中的完全相同的订阅事件
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                   ....
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
               ....
            }
        }
    }

上面代码就是一步步过滤掉不符合要求的方法,最后剩下的 method 就是需要订阅的方法。接下来具体看看findState.checkAdd()方法,这个方法中一些逻辑有点难度。

 boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn‘t have methods listening to the same event type.
            //通常情况下不能存在多个同样参数的订阅方法,不过也能收到同样的消息。
            Object existing = anyMethodByEventType.put(eventType, method);
            //existing 为空代表这个订阅者没有相同参数的订阅方法
            if (existing == null) {
                return true;
            } else {
                //这里处理的就是有多个相同参数的订阅方法的情况,将上一个方法用来比对
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                //方法不同事件相同的Method都会返回true,Method完全一样的只添加一次。
                //从代码上理解,不同方法名但是订阅事件相同的方法都会收到消息,但是超类有相同的方法,只有子类会收到消息。
                return checkAddWithMethodSignature(method, eventType);
            }
        }

anyMethodByEventType集合只是用于排查是否存在相同订阅事件的方法(从子类开始往超类排查)。如果不存在就将这个订阅方法正常返回,如果存在,就需要通过事件的签名来检查(订阅方法名称>订阅事件名称),使用得是二级检查。意思就是同时对之前anyMethodByEventType已经存在的方法和具有相同事件类型的方法进行检查。

可以确定的是如果方法名一样,前者肯定是子类(不可能一个类中有两个相同的方法)。可以看下checkAddWithMethodSignature()具体逻辑。


        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append(‘>‘).append(eventType.getName());
            String methodKey = methodKeyBuilder.toString();

           // 得到目标属性所在类对应的Class对象
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);

            //1.如果methodClassOld为空,或者是methodClass的超类,返回true
            //2.如果methodClassOld 不为空,代表两个方法完全一样,根据排查逻辑 methodClassOld 不可能是超类,只有可能是子类。
            //所以,必须返回false,因为methodClassOld的方法已经加到集合中,不需要将它的父类方法再一次加进去。
            //保证了完全相同的方法只执行一次,子类完全可以使用super.xxx()。
            //解释的很牵强....
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

subscriberClassByMethodKey 集合key对应上面所说到的签名,key值相同代表有两个相同的订阅方法。代码注释很详细。所以checkAdd方法仅仅过滤掉子类和超类中完全相同的订阅方法,仅保留子类中的方法。

这样findState.subscriberMethods集合就保存了所有有效的订阅方法啦。

无论是 findUsingInfo 还是 findUsingReflection方法最后都是调用下面的方法返回集合的。

  private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        //释放findstate对象的变量
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                //findstate复用池还有空间就将对象添加进去
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

========

再回到eventBus的注册方法,这时候subscriberMethods 集合的值就知道怎么来的了。

public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

查找逻辑完了,在看看subscribe()方法吧,每次订阅方法都要执行一次这个方法。这个循环订阅的过程必须受线程保护的。

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;

        //Subscription二次封装了订阅者对象和 subscriberMethod,并且put到 CopyOnWriteArrayList 集合。
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod); 

        //CopyOnWriteArrayList 适用于多线程并发操作,集合中存储的对象是以订阅者(类名.class)划分的
        //每个相同的eventType对应一个CopyOnWriteArrayList 集合
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            //subscriptions 不为null代表有相同订阅事件类型的 Subscription 对象。
            //如果在相同订阅事件类型前提下,订阅对象和订阅方法完全一样,就抛异常。Subscription对象中有具体的 equals()判断方法
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        //根据注释中的 priority 值大小对相同订阅事件类型的 newSubscription 进行排序,
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //typesBySubscriber 每个订阅对象对应一个集合
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        //只有sticky = true的事件,才会执行下面的方法。
        if (subscriberMethod.sticky) {
            //只有在注册之前调用了postSticky()方法,下面的post才会有效
            if (eventInheritance) {

                //如果class A extends B,A 事件为粘性事件,参数为 A 和 B订阅方法都能能收到 A 对象的消息。
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                //eventInheritance = false情况,参数为 B 的注释方法是收不到消息的。
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

1.Subscription 是订阅者和订阅方法(SubscriberMethod)的封装对象。

2.这时候 subscriptionsByEventType 集合终于用上了,以事件类型为key、subscriptions集合为value。

3.CopyOnWriteArrayList subscriptions,事件类型相同的Subscription对象都会添加到这个集合。

查看CopyOnWriteArrayList.contains(obj)方法源码,发现用的是obj.equals(obj1)方法来判断的。

Subscription 类中的equals方法(注释很详细):

@Override
    public boolean equals(Object other) {
        //在EventBus.subscribe方法中用于判断 Subscription 集合中是否已经添加过相同的 Subscription对象
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            return subscriber == otherSubscription.subscriber
                    && subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }

4.typesBySubscriber集合也用上了,以订阅对象为key,事件类型(eventType)为value。

5.stickyEvents 粘性事件,每次发送粘性事件否会保存在这个集合中,没发送出去不会被销毁。

subscribe 方法中首先对 subscriptionsByEventType 集合赋值,然后在对 typesBySubscriber 集合赋值,具体逻辑代码写的很详细,最后就是对粘性事件的处理了(发送粘性事件:postSticky() 不是 post())。

 public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

而checkPostStickyEventToSubscription()方法就是具体发送消息的逻辑了,这块和post逻辑一起分析。

三、发送消息

postSticky()方法已经贴出来了,逻辑非常简单,下面看看post()方法。

  public void post(Object event) {
        //currentPostingThreadState 为 ThreadLocal对象,多线程调用时为每个线程单独维护PostingThreadState 对象
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    //发送同时销毁订阅事件,这点和粘性事件是不一样的。
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

post()方法逻辑也是很清晰的,通过获取ThreadLocal维护的 PostingThreadState 对象,对其进行一些状态的初始化。postSingleEvent()方法应该就是具体的发送逻辑了吧。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //是否支持订阅事件继承关系
        if (eventInheritance) {
            //查找所有订阅事件及其超类的超接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //遍历查找到的class,与subscriptionsByEventType集合中的key进行匹配,并发送
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
           //只发送订阅事件本身
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //如果并没有匹配到相应的订阅方法,就发送一个通知
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

方法中注释比较详细了,直接看postSingleEventForEventType()方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //从subscriptionsByEventType集合中查找是否存在这个事件
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            //将事件发送给所有匹配到的方法
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                   //根据线程模式选择发送
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

通过 eventClass 去 subscriptionsByEventType 集合中查找,如果存在就取出这些方法,依次进行发送。

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //通过 threadMode 确定应该在哪条线程中发送消息
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

POSTING 在什么线程上发送就在什么线程接收。

MAIN 无论在哪条线程发送,都需要在主线程接收。

BACKGROUND 与 MAIN 模式正好相反。

ASYNC 同样是异步发送,但是只发送一条消息。

postToSubscription方法写的很清楚,只有在MAIN 和 BACKGROUND 模式下对当前前程进行判断,比如在MAIN 模式下,如果在主线程发送消息了,直接调用invokeSubscriber()方法就好了,不需要特意用handler来向主线程发送消息。

1、先看看invokeSubscriber方法,后面再分析backgroundPoster 和 mainThreadPoster 。

 void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

invokeSubscriber()就是通过反射调用指定subscriber(订阅者)的method(订阅方法),参数就是发送的事件。

2、 mainThreadPoster.enqueue()方法逻辑分析起来比较困难,但是学到了很多,首先mainThreadPoster继承自Handler,而handler可以通过初始化传参Looper.MainLooper()让消息发送到主线程中去,这里也正是这么做的。


    void enqueue(Subscription subscription, Object event) {
        //将subscription 和 消息事件对象封装到 PendingPost 中。

        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //入队操作
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //从队列中取出 PendingPost 通过eventBus 对象直接发送。
                //每次取出一个 header,会将下一个待发送 PendingPost 赋值给 header,直到发送完
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //在主线程跳用invokeSubscriber()方法,完成对消息的分发。
                eventBus.invokeSubscriber(pendingPost);
                //在规定时间内没发送完,退出本次任务,重新执行handleMessage()接着发送
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

先总体概括下enqueue () 和 handleMessage()的关系。

enqueue () 和 handleMessage()不在一个线程中执行,handleMessage()肯定在主线程了。enqueue ()进行了入队的操作,而handleMessage()从队列中取出事件来发送。这两个方法中通过handlerActive 形成互锁,酷似生产者和消费者模式。

入队的逻辑看了很久才懂,首先是直接将所有参数全部封装进PendingPost 对象中,再入队。PendingPostQueue( 待发送队列 )在这几种模式中除了直接发送外都需要使用,提供了入队和出队的逻辑方法,支持多线程操作。

final class PendingPostQueue {
    private PendingPost head; //头指针
    private PendingPost tail; //尾巴指针

    synchronized void enqueue(PendingPost pendingPost) {
        //入队逻辑,通过单向链表方式添加 PendingPost
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
           //后面依次让tail.next指向下一个tail
           //如此一来每个tail中的next都有下一个元素的引用。
            tail.next = pendingPost;
            //让tail指向下一个堆空间
            tail = pendingPost;
        } else if (head == null) {
        //添加第一个节点,让head 和 tail指向同一个堆空间。后面在tail还没指向别的堆空间时,将下个元素节点的引用赋值tail内的next变量,这样head也就拥有了下一个引用了。以此类推
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }

    synchronized PendingPost poll() {
        //出队逻辑
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }

}

整个类都贴出来了。发送消息任务已经在handleMessage()方法中完成了,反正最后就是调用invokeSubscriber()方法。

3. backgroundPoster 和 asyncPoster 都是利用子线程发送消息的。有啥不同?看看关键代码:

asyncPoster :

@Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

backgroundPoster :

 @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }

入队就不贴了,一个有锁一个无锁,都是继承Runable,backgroundPoster 类似 mainThreadPoster那种处理方式,而asyncPoster 就是来一个发一个。

========================================================================

四、解绑

  public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                //subscribeByEventType集合内的有关信息进行回收
                unsubscribeByEventType(subscriber, eventType);
            }
            //eventBus 基本就这两个集合在维护订阅者信息
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

回收就是维护订阅者信息的两个集合对相关信息进行回收释放。typesBySubscriber 集合到现在没见在哪里用过?除了用于解绑操作,还有就是用于判断当前类是否已经注册过。

 public synchronized boolean isRegistered(Object subscriber) {
        return typesBySubscriber.containsKey(subscriber);
 }

几位大神的分析博客:

1 . http://skykai521.github.io/2016/02/20/EventBus-3-0%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/

2 . http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0417/4152.html

时间: 2024-10-22 16:48:08

EventBus3.0 组件通信框架源码学习总结的相关文章

不包含通信框架源码

源码 (不包含通信框架源码,通信框架源码请另行下载)上一篇文章写了如何通过TCP通信发送图片到客户端,有朋友问如何传送文件,本文将就如何发送文件进行探讨.对于比较小的文件,可以把文件转化成字节形式,用契约类包装一下,服务器收到后,再把字节转化成文件即可,这也是本文中实现的方式,这种方式的优点是比较简... 处理客户端发来的文件 private void IncomingUploadFile(PacketHeader header, Connection connection, FileWrapp

Bottle 框架源码学习 三

def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,         interval=1, reloader=False, quiet=False, plugins=None,         debug=None, **kargs): 今天要学习一下bottle里是怎样打印debug信息的 run函数的倒数第二个参数是debug,默认为None try:     if debug is not None: _debu

Bottle 框架源码学习 二

上一篇简单分析了route的基本用法 本篇分析一下run函数的运行原理 def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,         interval=1, reloader=False, quiet=False, plugins=None,         debug=None, **kargs):          if NORUN: return     if reloader and not os.env

PHP框架CodeIgniter CI框架源码学习笔记-index.php一切的入口

CI框架(CodeIgniter)的基本执行流程图以备参考: index.php作为CI框架的入口文件,源码阅读,自然由此开始.

ABP框架源码学习之修改默认数据库表前缀或表名称

1,源码 1 namespace Abp.Zero.EntityFramework 2 { 3 /// <summary> 4 /// Extension methods for <see cref="DbModelBuilder"/>. 5 /// </summary> 6 public static class AbpZeroDbModelBuilderExtensions 7 { 8 /// <summary> 9 /// Chan

Bottle 框架源码学习 一

# -*- coding=utf-8 -*- from bottle import route, run, template,Bottle app = Bottle() @route("/hello/<name>") def index(name):     return template("<b>Hello, {{name}}</b>", name=name) run(app, host="localhost"

CI框架源码学习笔记6——Config.php

接着上一节往下,我们这一节来看看配置类Config.php,对应手册内容http://codeigniter.org.cn/user_guide/libraries/config.html. class CI_Config { //所有已经加载的配置项组成的数组 public $config = array(); //所有已经加载的配置文件组成的数组 public $is_loaded = array(); //用来搜索配置文件的路径数组 public $_config_paths = arra

mybatis框架源码学习

转:来自https://my.oschina.net/u/1458864/blog/293659 摘要:初始化mybatis,所有的配置都在configuation 对象中使用mybatis,从sqlsessionfactory 工厂中获取sqlsession,从configuation对象中获取mapper对象,并返回结果,mybatis在加载mapper的时候对mapper接口的注解进行解析,重要的几个包:io,session,builder,mapper(annotations,bindi

MVC系列——MVC源码学习:打造自己的MVC框架(四:自定义视图)

前言:通过之前的三篇介绍,我们基本上完成了从请求发出到路由匹配.再到控制器的激活,再到Action的执行这些个过程.今天还是趁热打铁,将我们的View也来完善下,也让整个系列相对完整,博主不希望烂尾.对于这个系列,通过学习源码,博主也学到了很多东西,在此还是把博主知道的先发出来,供大家参考. 本文原创地址:http://www.cnblogs.com/landeanfen/p/6019719.html MVC源码学习系列文章目录: MVC系列——MVC源码学习:打造自己的MVC框架(一) MVC