EventBus框架源码分析

开源项目

上周又手动撸了一遍EventBus实现,同时上传EventBus的中文注释源码到Github上,欢迎大家fork&star.

EventBusAnalysis

EventBus

基础概念

EventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化Android事件传递.事件传递既可以用于Android四大组件间的通讯,也可以用于用户异步线程和主线程间通讯等.

传统的事件传递方法包括:Handler,BroadCastReceiver,interface回调,相比于EventBus,EventBus的代码更加简洁,代码简单,而且事件发布和订阅充分解耦.

基本概念如下:

  • 事件(Event): 可以称为消息,其实就是一个对象.事件类型(EventType)指事件所属的Class.
  • 订阅者(Subscriber): 订阅某种事件类型的对象.当有发布者发布这类事件后,EventBus会执行订阅者的被Subscribe注解修饰的函数,这个函数叫事件响应函数.订阅者通过register接口订阅某个事件类型,unregister接口退订.订阅者存在优先级,优先级高的订阅者可以取消事件继续向优先级低的订阅者分发,默认所有订阅者优先级都为0.
  • 发布者(Publisher): 发布某事件的对象,通过EventBus.getDefault.post方法发布事件.

构造EventBus

EventBus的默认构造方法如下:

EventBus.getDefault();

源码如下:

/** 通过volatile保证每个线程获取的都是最新的EventBus. */
static volatile EventBus defaultInstance;

/** 懒汉的单例模式构造EventBus. */
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBusBuilder.java

再去了解EventBus具体构造函数之前,需要先看一下EventBusBuilder的具体内容,中文注释源码如下:

/**
 * 构建器模式
 * Effective Java : 遇到多个构造器参数时要考虑用构造器.
 */
@SuppressWarnings("unused")
public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    /** 是否监听异常日志. */
    boolean logSubscriberExceptions = true;

    /** 如果没有订阅者,显示log信息. */
    boolean logNoSubscriberMessages = true;

    /** 是否发送监听到的异常. */
    boolean sendSubscriberExceptionEvent = true;

    /** 如果没有订阅者,就发布一条默认事件. */
    boolean sendNoSubscriberEvent = true;

    /** 如果失败,则抛出异常. */
    boolean throwSubscriberException;

    /** 是否响应订阅事件的父类事件. */
    boolean eventInheritance = true;
    boolean ignoreGeneratedIndex;

    /** 是否为严格模式.值为true时,当Subscribe注解描述的响应函数不符合要求时,会抛出相应的异常. */
    boolean strictMethodVerification;

    /** 线程池. */
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;

    /** 从命名来看,含义是不遍历的Method响应函数集合,但是没啥软用,EventBus3.0版本也没有遍历这个集合. */
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;

    EventBusBuilder() {
    }

    /** Default: true. */
    public EventBusBuilder logSubscriberExceptions(boolean logSubscriberExceptions) {
        this.logSubscriberExceptions = logSubscriberExceptions;
        return this;
    }

    /** Default: true. */
    public EventBusBuilder logNoSubscriberMessages(boolean logNoSubscriberMessages) {
        this.logNoSubscriberMessages = logNoSubscriberMessages;
        return this;
    }

    /** Default: true. */
    public EventBusBuilder sendSubscriberExceptionEvent(boolean sendSubscriberExceptionEvent) {
        this.sendSubscriberExceptionEvent = sendSubscriberExceptionEvent;
        return this;
    }

    /** Default: true. */
    public EventBusBuilder sendNoSubsciberEvent(boolean sendNoSubscriberEvent) {
        this.sendNoSubscriberEvent = sendNoSubscriberEvent;
        return this;
    }

    /**
     * Fails if an subscriber throws an exception (default: false).
     */
    public EventBusBuilder throwSubscriberException(boolean throwSubscriberException) {
        this.throwSubscriberException = throwSubscriberException;
        return this;
    }

    /**
     * By default, EventBus considers the event class hierarchy
     * (subscribers to super classes will be notified).
     */
    public EventBusBuilder eventInheritance(boolean eventInheritance) {
        this.eventInheritance = eventInheritance;
        return this;
    }

    /**
     * Provide a custom thread pool to EventBus used for async and background event delivery.
     */
    public EventBusBuilder executorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }

    public EventBusBuilder skipMethodVerificationFor(Class<?> clazz) {
        if (skipMethodVerificationForClasses == null) {
            skipMethodVerificationForClasses = new ArrayList<>();
        }
        skipMethodVerificationForClasses.add(clazz);
        return this;
    }

    /**
     * Forces the use of reflection even if there‘s a generated index.(default: false)
     */
    public EventBusBuilder ignoreGeneratedIndex(boolean ignoreGeneratedIndex) {
        this.ignoreGeneratedIndex = ignoreGeneratedIndex;
        return this;
    }

    /** Default: false. */
    public EventBusBuilder strictMethodVerification(boolean strictMethodVerification) {
        this.strictMethodVerification = strictMethodVerification;
        return this;
    }

    /**
     * Adds an index generated by EventBus‘ annotation preprocessor.
     */
    public EventBusBuilder addIndex(SubscriberInfoIndex index) {
        if (subscriberInfoIndexes == null) {
            subscriberInfoIndexes = new ArrayList<>();
        }
        subscriberInfoIndexes.add(index);
        return this;
    }

    /**
     * Installs the default EventBus returned by {@link EventBus#getDefault()}
     * using this builder‘s values.
     */
    public EventBus installDefaultEventBus() {
        synchronized (EventBus.class) {
            if (EventBus.defaultInstance != null) {
                throw  new EventBusException("Default instance already exists." +
                        "It may be only set once before it‘s used the first time to " +
                        "ensure consistent behavior.");
            }
            EventBus.defaultInstance = build();
            return EventBus.defaultInstance;
        }
    }

    /**
     * Builds an EventBus based on the current configuration.
     */
    public EventBus build() {
        return new EventBus(this);
    }
}

EventBus的构造函数

了解了EventBusBuilder的构造器模式之后,我们就可以去看一下EventBus的默认构造函数了.

/** Map<订阅事件, 订阅该事件的订阅者集合> */
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

/** Map<订阅者, 订阅事件集合> */
private final Map<Object, List<Class<?>>> typesBySubscriber;

/** Map<订阅事件类类型,订阅事件实例对象>. */
private final Map<Class<?>, Object> stickyEvents;

/** 主线程Handler实现类. */
private final HandlerPoster mainThreadPoster;

/** 继承Runnable的异步线程处理类,将订阅函数的执行通过Executor和队列机制在后台一个一个的执行. */
private final BackgroundPoster backgroundPoster;

/** 继承Runnable的异步线程处理类, 与BackgroundPoster不同的是,订阅函数的执行是并发进行的. */
private final AsyncPoster asyncPoster;

private final int indexCount;

/** 订阅者响应函数信息存储和查找类. */
private final SubscriberMethodFinder subscriberMethodFinder;

/** 用于订阅函数后台执行的线程池. */
private final ExecutorService executorService;

/** EventBusBuilder构造器中的同一成员. */
private final boolean throwSubscriberException;
private final boolean logSubscriberExceptions;
private final boolean logNoSubscriberMessages;
private final boolean sendSubscriberExceptionEvent;
private final boolean sendNoSubscriberEvent;
private final boolean eventInheritance;

EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    mainThreadPoster = new HandlerPoster(this, Looper.myLooper(), 10);
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
    indexCount = builder.subscriberInfoIndexes != null ?
            builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

了解了EventBus的构造函数,那接下来,我们就要进入EventBus的注册流程和发送事件流程了.

注册流程

订阅者向EventBus注册时,自身首先需要完成两件事情:

1. 订阅者本身需要声明只有一个参数的public方法,并且使用Subscribe进行注解.

2. 订阅者需要调用EventBus的register方法进行注册.

示例代码如下:

public class MessageEvent {}

class Subscriber extents Activity{
    /** 声明订阅函数. */
    @Subscibe(threadMode = ThreadMode.MAIN)
    public void showToast(MessageEvent event) {
        Toast.makeText(this, "show toast", Toast.LENGTH_LONG).show();
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        /** 注册当前订阅者类. */
        EvnetBus.getDefault().register(this);
    }
}

针对上述两个必须完成的事情,我们分别进行讲解.

Subscribe.java

Subscribe注解是EventBus3.0版本之后添加的,用来标识当前订阅者类中的订阅函数.

之前EventBus的版本是遍历onEvent事件开头的函数来作为订阅函数,有很多局限性(例如函数命名等),使用注解更加规范而且更加灵活一些.

Subscribe注解的中文注释源码如下:

/** 注解的生命周期是:RUNTIME,注解对象是:Method,并且可以被javadoc等工具文档化. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    /** 标记订阅方法所在线程的模型. */
    ThreadMode threadMode() default ThreadMode.POSTING;

    /** 标记订阅方法是否为sticky类型. */
    boolean sticky() default false;

    /** 标记订阅方法的优先级. */
    int priority() default 0;
}

注册函数实现

注册函数的源码实现如下:

/** 订阅事件. */
public void register(Object subscriber) {
    // 获取订阅者类的类类型.
    Class<?> subscriberClass = subscriber.getClass();
    // 通过反射机制获取订阅者全部的响应函数信息.
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.
            findSubscriberMethods(subscriberClass);
    // 构造订阅函数-订阅事件集合 与 订阅事件-订阅函数集合
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

订阅函数的实现稍微复杂一些,这也是EventBus的核心所在,我们逐步分析每一个函数的具体实现.

第一步,获取订阅者的类类型,没啥好说的,获取类类型之后,就能通过Java的反射机制来充分获取这个类的具体信息.

第二步,通过SubscriberMethodFinder类来解析订阅者类,获取所有的订阅函数集合.

第三步,遍历订阅函数,构造订阅函数与订阅事件的双重映射.

流程虽然很简单,但是为了深入剖析其EventBus的注册具体实现,接下来我们针对二和三步进行详细讲解,包括其涉及的自定义类实现.

SubscriberMethod.java

第二步中,通过SubscriberMethodFinder类的findSubscriberMethods方法,获取了当前订阅者的所有订阅函数信息集合.

而SubscriberMethod类就是对订阅函数的抽象,中文注释源码如下:

/**
 * 订阅者事件响应函数信息.
 */
public class SubscriberMethod {
    /** 响应函数方法的类类型,可通过invoke方法对该方法进行调用. */
    final Method method;

    /** 函数运行所在的线程的线程类型. */
    final ThreadMode threadMode;

    /** 订阅事件的类型,也是订阅函数第一个形参的参数类型. */
    final Class<?> eventType;

    /** 响应函数的优先级. */
    final int priority;

    /** 是否为sticky响应函数. */
    final boolean sticky;

    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority,
                            boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        } else if (other instanceof SubscriberMethod) {
            checkMethodString();
            SubscriberMethod otherSubscriberMethod = (SubscriberMethod) other;
            otherSubscriberMethod.checkMethodString();
            return methodString.equals(otherSubscriberMethod.methodString);
        } else {
            return false;
        }
    }

    /**
     * 构造methodString,构造方法:${methodClassName}#${methodName}.
     */
    @SuppressWarnings("StringBufferReplaceableByString")
    private synchronized void checkMethodString() {
        if (methodString == null) {
            StringBuilder builder = new StringBuilder(64);
            builder.append(method.getDeclaringClass().getName());
            builder.append("#").append(method.getName());
            builder.append("(").append(eventType.getName());
            methodString = builder.toString();
        }
    }

    @Override
    public int hashCode() {
        return method.hashCode();
    }
}

这个抽象很简单,而且和Subscriber注解其实是一一对应的,只是多了一个eventType成员属性和equals比较方法.

其中:

  • eventType:是该订阅函数唯一的参数的类类型,也就是订阅函数对应的订阅事件的类型.
  • equals: 该方法通过构造{methodClassName}#{methodName}字符串来比较两个订阅函数是否相同.

补充一个ThreadMode的中文注解,ThreadMode就是当前订阅函数执行所在的线程类型,包括:

package org.greenrobot.eventbus;

/** 响应函数执行时所在线程的类型. */
public enum ThreadMode {
    /** 响应函数需要运行的线程和发送响应事件的线程为同一线程. */
    POSTING,

    /** 响应函数需要运行在主线程. */
    MAIN,

    /** 响应函数需要运行的线程为后台线程,且根据优先级等进行排队,后台顺序执行. */
    BACKGROUND,

    /** 响应函数需要运行的线程为后台线程,可并发执行. */
    ASYNC
}

SubscriberMethodFinder.java

在介绍SubscriberMethodFinder源码之前,需要说明一下,EventBus3.0版本提供了EventBusAnnotationProcessor这个类,用于在编译期获取并缓存@Subscribe注解的方法.这里由于篇幅和介绍EventBus主要功能的关系,省略这部分代码的讲解.

只介绍SubscriberMethodFinder在运行时是如何通过反射获取订阅者类中的订阅函数信息的.

SubscriberMethodFinder类的中文注解如下:

/**
 * 订阅者响应函数发现类.
 */
@SuppressWarnings({"unused", "FieldCanBeLocal"})
class SubscriberMethodFinder {
    /** 这两种是编译器添加的方法,因为需要忽略. */
    private static final int BRIDGE = 0X40;
    private static final int SYNTHETIC = 0X1000;

    /** 定义需要忽略方法的修饰符. */
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC
            | BRIDGE | SYNTHETIC;

    /** 线程安全的缓存Map,存储的键值对为<订阅者类类型,订阅者方法信息集合>. */
    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE =
            new ConcurrentHashMap<>();

    /** 这里我们只介绍ignoreGeneratedIndex为true的情况,即只在运行时分析订阅者类的订阅函数信息. */
    private final boolean ignoreGeneratedIndex;

    /** ignoreGeneratedIndex为true时,subscriberInfoIndexes为空集合. */
    private List<SubscriberInfoIndex> subscriberInfoIndexes;
    private final boolean strictMethodVerification;

    /** 对象缓冲池的默认大小. */
    private static final int POOL_SIZE = 4;

    /** FindState对象缓冲池. */
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

    SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes,
                           boolean strictMethodVerification, boolean ignoreGeneratedIndex) {
        this.subscriberInfoIndexes = subscriberInfoIndexes;
        this.strictMethodVerification = strictMethodVerification;
        this.ignoreGeneratedIndex = ignoreGeneratedIndex;
    }

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 先从METHOD_CACHE中看是否有缓存.key:订阅者的类类型.value:订阅者的订阅方法信息集合.
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        // 通过反射来获取订阅者的订阅方法信息集合.
        subscriberMethods = findUsingReflection(subscriberClass);

        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber" + subscriberClass + " and its super classes have no " +
                    "public methods with the @Subscribe annotation");
        } else {
            // 在METHOD_CACHE中缓存订阅者类类型-订阅方法信息集合.
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 通过反射获取订阅函数集合.
            findUsingReflectionInSingleClass(findState);
            // 检查订阅者类的父类中是否有符合条件的订阅函数.
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

    /** 对象池,复用FindState对象,防止对象被多次new或者gc. */
    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i ++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }

        // 说明对象池中没有被new出的FindState对象,所以需要手动new一个出来,返回给调用者.
        return new FindState();
    }

    /** 通过反射来解析订阅者类获取订阅函数信息. */
    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 获取订阅者所有声明的方法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }

        for (Method method : methods) {
            // 获取当前方法的语言修饰符
            int modifiers = method.getModifiers();
            // 订阅方法必须是public而且不能在被忽略的修饰符集合中[abstract,static,bridge,synthetic].
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                // 获取指定方法的形参类型集合
                Class<?>[] parameterTypes = method.getParameterTypes();
                // EventBus从3.0版本之后,订阅函数的标准改为有Subscribe注解修饰
                // 并且只有一个参数,参数类型作为订阅事件的参数类型.
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            // 封装订阅函数到FindState的subscriberMethods数组中.
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(
                                    method, eventType, threadMode, subscribeAnnotation.priority(),
                                    subscribeAnnotation.sticky()
                            ));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    // 订阅函数的参数必须只有一个,也就说一个订阅函数只能订阅一个事件.
                    String methodName = method.getDeclaringClass().getName() +
                            "." + method.getName();
                    throw new EventBusException("@Subscribe method" + methodName +
                            " must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                // 订阅函数必须是public的,而且不能有[abstract,static,bridge,synthetic]修饰符.
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName + " is a illegal @Subscribe method: " +
                        "must be public, non-static, and non-abstract");
            }
        }
    }

    /** 返回订阅函数List,并释放FindState到对象缓冲池中. */
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i ++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

    /** 用来对订阅方法进行校验和保存. */
    static class FindState {
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        final Map<Class, Object> anyMethodByEventType = new HashMap<>();
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
        final StringBuilder methodKeyBuilder = new StringBuilder(128);
        Class<?> subscriberClass;
        Class<?> clazz;
        boolean skipSuperClasses;
        SubscriberInfo subscriberInfo;

        void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

        void recycle() {
            subscriberMethods.clear();
            anyMethodByEventType.clear();
            subscriberClassByMethodKey.clear();
            methodKeyBuilder.setLength(0);
            subscriberClass = null;
            clazz = null;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

        boolean checkAdd(Method method, Class<?> eventType) {
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method)existing, eventType)) {
                        throw new IllegalStateException();
                    }
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }

        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append(‘>‘).append(eventType.getName());
            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                return true;
            } else {
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

        /** 获取clazz父类的类类型. */
        void moveToSuperclass() {
            if (skipSuperClasses) {
                clazz = null;
            } else {
                clazz = clazz.getSuperclass();
                String clazzName = clazz.getName();
                if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") ||
                        clazzName.startsWith("android.")) {
                    clazz = null;
                }
            }
        }
    }
}

subscribe()方法

回到EventBus类的register方法中,当通过EventBusMethodFinder获取到订阅函数集合后,下一步就是通过subscribe方法对订阅函数和订阅事件做双重映射.

在介绍subscribe方法之前,先讲代码中使用到的抽象类讲解一下.

Subscription.java

一个订阅者可能会有多个订阅函数,每个订阅函数对应着订阅事件.Subscription类是对<订阅者-其中一个订阅函数>的抽象.

Subscription中文注释代码如下:

/**
 * 订阅者信息,包含订阅者对象,订阅者中一个订阅函数.
 */
final class Subscription {
    /** 订阅者的实例化对象. */
    final Object subscriber;

    /** 订阅者的订阅函数信息. */
    final SubscriberMethod subscriberMethod;

    volatile boolean active;

    Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
        this.subscriber = subscriber;
        this.subscriberMethod = subscriberMethod;
        active = true;
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            return subscriber == otherSubscription.subscriber &&
                    subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
    }
}

介绍完Subscription类,我们来看一下subscribe的具体流程.

subscribe()方法

中文注释代码如下:

/**
 * 构造两个集合.
 * 1. 订阅事件->订阅者集合.
 * 2. 订阅者->订阅事件集合.
 * @param subscriber 订阅者
 * @param subscriberMethod 订阅者中的响应函数
 */
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // 一个Event事件可能会被多个订阅者订阅,因此这里使用Map结构,存储Event事件对应的订阅者集合.
    // 此外,一个订阅者类中可能会有多个订阅函数,有几个订阅函数这里就解析成有几个订阅者.
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass()
                    + " already registered to event " + eventType);
        }
    }

    // 按照方法优先级从高到低的顺序将订阅者加入到订阅者集合中.
    int size = subscriptions.size();
    for (int i = 0; i <= size; i ++) {
        if (i == size ||
                subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // 当前订阅者订阅了哪些事件集合.
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    // 将订阅事件加入到当前订阅者的订阅事件集合中.
    subscribedEvents.add(eventType);

    // 如果订阅方法为sticky类型,则立即分发sticky事件.
    if (subscriberMethod.sticky) {
        // eventInheritance的作用:是否响应订阅事件的父类事件.
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            postToSubscription(newSubscription, stickyEvent,
                    Looper.getMainLooper() == Looper.myLooper());
        }
    }
}

注册流程图

以往写代码,都是先画流程图再考虑具体实现.但是这里我们是分析开源项目,我认为更好的方法是先了解代码实现,然后再自己画流程图进行巩固.

EventBus的注册流程图如下(ps,如果大家认真的看完上述分析,相信对EventBus的注册流程图应该会很容易理解):

取消注册流程

学习了EventBus的订阅者注册流程,我们趁热打铁,来看一下EventBus的取消注册流程是怎样的.

其实,从前面注册流程的学习,我们应该已经了解到,取消注册其实就是为了从订阅者-订阅事件这两个双向集合[subscriptionsByEventType,typesBySubscriber]中删除相应的对象,可以避免内存泄露.

取消注册示例代码

我们还是从用法入手,然后逐步深入源码.取消注册的示例代码非常简单:

EventBus.getDefault().unregister(this);

unregister()方法

unregister方法代码很简单,就是从两个集合中删除指定的对象,中文注释源码如下:

/** 取消订阅. */
public synchronized void unregister(Object subscriber) {
    // 获取该订阅者所有的订阅事件类类型集合.
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        // 从typesBySubscriber删除该<订阅者对象,订阅事件类类型集合>
        typesBySubscriber.remove(subscriber);
    } else {
        Log.e("EventBus", "Subscriber to unregister was not registered before: "
                + subscriber.getClass());
    }
}

/** 从订阅事件对应的订阅者集合中删除取消注册的订阅者. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    // 获取订阅事件对应的订阅者信息集合.
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i ++) {
            Subscription subscription = subscriptions.get(i);
            // 从订阅者集合中删除特定的订阅者.
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i --;
                size --;
            }
        }
    }
}

事件发布post流程

EventBus发布一个事件非常简单,示例代码如下:

EventBus.getDefault().post(new MessageEvent());

当调用上述代码之后,所有订阅MessageEvent的方法均会在其声明的线程中得到执行.乍看起来很神奇,但是有了之前注册流程的分析,我们已经知道:

EventBus的单例中已经存储了订阅事件-订阅者信息集合的映射关系,我们只要遍历其订阅者信息集合,再其规定的线程中执行即可.

知道了上述原理,我们先来看一下post方法的具体实现.

post()方法

post()中文注释源码如下:

/** 事件分发. */
public void post(Object event) {
    // 获取当前线程的Posting状态.
    PostingThreadState postingState = currentPostingThreadState.get();
    // 获取当前线程的事件队列.
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // 循环处理当前线程eventQueue中的每一个event对象.
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            // 处理完知乎重置postingState一些标识信息.
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

post()方法首先从currentPostingThreadState对象中获取当前线程的PostingThreadState对象.为什么说是当前线程的PostingThreadState对象呢,这就需要看一下currentPostingThreadState对象的构造函数了.

/** 存储当前线程的PostingThreadState对象. */
private final ThreadLocal<PostingThreadState> currentPostingThreadState =
        new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

可以看到,currentPostingThreadState是通过ThreadLocal来实现对PostingThreadState对象的存储.ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,而这段数据是不会与其他线程共享的.

ThreadLocal的内部原理是:通过生成一个包裹的泛型对象的数组,在不同的线程会有不同的数组索引值.通过这样就可以做到每个线程通过get()方法获取的时候,取到的是自己线程对应的数据.

PostingThreadState类的定义如下:

/** 当前线程的事件分发类. */
final static class PostingThreadState {
    /** 当前线程的发布事件队列. */
    final List<Object> eventQueue = new ArrayList<>();

    /** 当前线程是否处于发送事件的过程中. */
    boolean isPosting;

    /** 当前线程是否是主线程. */
    boolean isMainThread;

    /** 处理当前分发的订阅事件的订阅者. */
    Subscription subscription;

    /** 当前准备分发的订阅事件. */
    Object event;

    /** 当前线程分发是否被取消. */
    boolean canceled;
}

回到Post方法,Post方法取出当前线程的PostingThreadState对象之后,将需要入队的Event事件入队,然后调用了postSingleEvent方法.接下来,我们去看一下这个方法的具体实现.

postSingleEvent()方法

postSingleEvent的中文注释源码如下:

private void postSingleEvent(Object event, PostingThreadState postingState) {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h ++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }

    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d("EventBus", "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

/** 找出当前订阅事件类类型eventClass的所有父类的类类型和其实现的接口的类类型. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
    synchronized (eventTypesCache) {
        List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            Class<?> clazz = eventClass;
            while (clazz != null) {
                eventTypes.add(clazz);
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
            eventTypesCache.put(eventClass, eventTypes);
        }
        return eventTypes;
    }
}

/** 递归获取指定接口的所有父类接口. */
private static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
    for (Class<?> interfaceClass : interfaces) {
        if (!eventTypes.contains(interfaceClass)) {
            eventTypes.add(interfaceClass);
            addInterfaces(eventTypes, interfaceClass.getInterfaces());
        }
    }
}

从源码中可以看出,postSingleEvent方法主要是调用了postSingleEventForEventType来对订阅事件进行分发.区别是,当EventBus的eventInheritance成员属性为true时,订阅了当前事件父类事件或者实现接口的事件的订阅函数也会响应这个订阅事件.

postSingleEventForEventType()方法

postSingleEventForEventType()中文注释源码如下:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
                                            Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 获取订阅事件类类型对应的订阅者信息集合.(register函数时构造的集合)
        subscriptions = subscriptionsByEventType.get(eventClass);
    }

    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                // 发布订阅事件给订阅函数
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

从源码中可以看出,postSingleEventForEventType的作用是:找出所有订阅event事件的订阅函数集合,然后调用postToSubscription方法进行事件分发.

postToSubscription()方法

postToSubscription()方法注释源码如下:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " +
                    subscription.subscriberMethod.threadMode);
    }
}

从源码中可以看出,postToSubscription方法主要是根据订阅方法指定的ThreadMode进行相应的处理.

虽然ThreadMode的具体含义已经在上面的博文中介绍过了,但是这里还是要结合代码讲一下实现原理.

POSTING

POSTING的含义是订阅函数可以直接运行在发送当前Event事件的线程中.而post方法又是发布订阅事件线程调用的,所以直接执行订阅方法即可.EventBus中订阅方法的执行是通过反射机制.

/** 通过反射来执行订阅函数. */
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

MAIN

MAIN的含义表示订阅函数需要运行在主线程中,例如一些UI的操作.

如何判断当前发布订阅事件的线程是否为UI线程,可以通过如下方法:

Looper.getMainLooper() == Looper.myLooper();

所以,如果当前发布订阅事件的线程是UI线程,则直接反射调用订阅函数即可.如果不是,则通过mainThreadPoster来执行.

public class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    /** 用于表示当前队列中是否有正在发送的任务. */
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    /**
     * 将订阅者和订阅者事件组成PendingPost并入队列.
     * @param subscription 订阅者
     * @param event 订阅者事件
     */
    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                // 如果现在队列中没有正在执行的消息,则发送一条空消息,让当前handler开始轮询执行消息.
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);

                // 如果在规定的时间内没有发送完队列中的所有请求,则先退出当前循环,让出cpu,
                // 同时发送消息再次调度handleMessage方法.
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

mainThreadPoster是HandlerPoster的实例,HandlerPoster关联了主线程的Looper,因此通过handleMessage方法通过反射调用订阅函数将订阅函数在主线程中执行.

BACKGROUND

BACKGROUND的意思是订阅函数必须运行在子线程中,而且是顺序执行.这个实现很简单,通过队列机制+线程池就可以实现该功能.

/**
 * 后台通过线程池去执行事件响应回调.
 */
final class BackgroundPoster implements Runnable{
    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.e("EventBus", Thread.currentThread().getName() + " was interruppted", e);
            }
        }finally {
            executorRunning = false;
        }
    }
}

ASYNC

ASYNC意思是订阅函数运行在子线程中,而且可以并发执行.这个实现就更简单了,直接线程池+Runnable即可.EventBus具体实现如下:

/**
 * 将订阅事件在后台响应执行,并且执行顺序是并发执行.
 */
class AsyncPoster implements Runnable{
    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if (pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

post流程图

接下来,我们总结一下post的流程图.

总结

经过上述EventBus的源码分析,我们应该了解到EventBus通过反射机制实现了订阅者和发布者的解耦和订阅发布功能.与传统的观察者模型相比,不需要写冗余的interface接口,而且支持自定义要执行的线程,感觉还是很不错的.

时间: 2024-10-29 10:46:33

EventBus框架源码分析的相关文章

YII框架源码分析(百度PHP大牛创作-原版-无广告无水印)

                        YII 框架源码分析             百度联盟事业部--黄银锋   目 录 1. 引言 3 1.1.Yii 简介 3 1.2.本文内容与结构 3 2.组件化与模块化 4 2.1.框架加载和运行流程 4 2.2.YiiBase 静态类 5 2.3.组件 6 2.4.模块 9 2.5 .App 应用   10 2.6 .WebApp 应用   11 3.系统组件 13 3.1.日志路由组件  13 3.2.Url 管理组件  15 3.3.异常

android 网络框架 源码分析

android 网络框架 源码分析 导语: 最近想开发一个协议分析工具,来监控android app 所有的网络操作行为, 由于android 开发分为Java层,和Native层, 对于Native层我们只要对linux下所有网络I/O接口进行拦截即可,对于java 层,笔者对android 网络框架不是很了解,所以这个工具开发之前,笔者需要对android 的网络框架进行一个简单的分析. 分析结论: 1. android 的网络框架都是基于Socket类实现的 2. java 层Socket

携程DynamicAPK插件化框架源码分析

携程DynamicAPK插件化框架源码分析 Author:莫川 插件核心思想 1.aapt的改造 分别对不同的插件项目分配不同的packageId,然后对各个插件的资源进行编译,生成R文件,然后与宿主项目的R文件进行id的合并. 要求:由于最终会将所有的资源文件id进行合并,因此,所有的资源名称均不能相同. 2.运行ClassLoader加载各Bundle 和MultiDex的思路是一样的,所有的插件都被加载到同一个ClassLoader当中,因此,不同插件中的Class必须保持包名和类名的唯一

介绍开源的.net通信框架NetworkComms框架 源码分析

原文网址: http://www.cnblogs.com/csdev Networkcomms 是一款C# 语言编写的TCP/UDP通信框架  作者是英国人  以前是收费的 售价249英镑 我曾经花了2千多购买过此通讯框架, 目前作者已经开源  许可是:Apache License v2 开源地址是:https://github.com/MarcFletcher/NetworkComms.Net 这个框架给我的感觉是,代码很优美,运行很稳定,我有一个项目使用此框架已经稳定运行1年多.这个框架能够

CodeIgniter框架——源码分析之入口文件index.php

CodeIgniter框架的入口文件主要是配置开发环境,定义目录常量,加载CI的核心类core/CodeIgniter.php. 源码分析如下: <?php //这个文件是入口,后期所有的文件都要在这里执行. /*----------------------------------------------- * 系统环境配置常量 * 能够配置错误显示级别 * ----------------------------------------------- * 默认情况下: * developmen

CodeIgniter框架——源码分析之CodeIgniter.php

CodeIgniter.php中加载了很多外部文件,完成CI的一次完整流程. <?php /** * 详见 http://www.phpddt.com/tag/codeIgniter/ */ //如果入口文件系统目录常量BASEPATH没定义,就挂了 if ( ! defined('BASEPATH')) exit('No direct script access allowed'); //定义常量:CI_VERSION,CI_CORE define('CI_VERSION', '2.1.4')

CodeIgniter框架——源码分析之Config.php

CI框架的配置信息被存储在$config数组中,我们可以添加自己的配置信息或配置文件到$config中: $this->config->load('filename'); //加载配置文件 $this->config->item('xxx'); //获取配置信息 当然也可以在autoload.php中设置默认加载! <?php if ( ! defined('BASEPATH')) exit('No direct script access allowed');   clas

Android Small插件化框架源码分析

Android Small插件化框架源码分析 目录 概述 Small如何使用 插件加载流程 待改进的地方 一.概述 Small是一个写得非常简洁的插件化框架,工程源码位置:https://github.com/wequick/Small 插件化的方案,说到底要解决的核心问题只有三个: 1.1 插件类的加载 这个问题的解决和其它插件化框架的解决方法差不多.Android的类是由DexClassLoader加载的,通过反射可以将插件包动态加载进去.Small的gradle插件生成的是.so包,在初始

iOS常用框架源码分析

SDWebImage NSCache 类似可变字典,线程安全,使用可变字典自定义实现缓存时需要考虑加锁和释放锁 在内存不足时NSCache会自动释放存储的对象,不需要手动干预 NSCache的key不会被复制,所以key不需要实现NSCopying协议 第三方框架 网络 1.PPNetworkHelper 对AFNetworking 3.x 与YYCache的二次封装 简单易用,包含了缓存机制,控制台可以直接打印json中文字符 2..YTKNetwork 猿题库研发团队基于AFNetworki