1.使用
注册:
public class MyBaseFragment extends Fragment { @Overridepublic void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);//注册EventBusEventBus.getDefault().register(this);} @Overridepublic void onDestroy() {super.onDestroy();//反注册EventBusEventBus.getDefault().unregister(this);} public void onEvent(MyBaseEvent event){//接收消息-默认类型,必须定义,不然程序会报错!!!但是没卵用}public void onEventMainThread(MyBaseEvent event){//接收消息-主线程} public void onEventBackgroundThread(MyBaseEvent event){//接收消息-后台线程}
发布:
EventBus.getDefault().post(new MyBaseEvent(MyEventEnum.AddPkEnd));
订阅:
@Overridepublic void onEventMainThread(MyBaseEvent ev) {super.onEventMainThread(ev); if (ev.getAction() == MyEventEnum.AddPkEnd) {tvPkNumer.setText("" + MyConfig.pkNumber);}
2.源码解析
(1)注册
/**
 * Returns the shared default EventBus, creating it on first use.
 *
 * <p>The instance is checked once without the lock and again while holding the
 * {@code EventBus.class} monitor, so at most one instance is created.
 * NOTE(review): this pattern is only safe if {@code defaultInstance} is declared
 * {@code volatile} — the field declaration is not visible here; confirm.
 *
 * @return the default EventBus instance
 */
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}
/**
 * Registers the subscriber as non-sticky with priority 0.
 *
 * @param subscriber the object whose event handler methods should be subscribed
 */
public void register(Object subscriber) {
    final boolean sticky = false;
    final int priority = 0;
    register(subscriber, sticky, priority);
}
/**
 * Looks up every subscriber method of the subscriber's class and subscribes each one.
 *
 * @param subscriber the object being registered
 * @param sticky     passed through to {@code subscribe} for each method
 * @param priority   passed through to {@code subscribe} for each method
 */
private synchronized void register(Object subscriber, boolean sticky, int priority) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> handlerMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    for (SubscriberMethod handlerMethod : handlerMethods) {
        subscribe(subscriber, handlerMethod, sticky, priority);
    }
}
(2)根据订阅类,找到订阅类中的所有订阅方法(以onEvent开头的方法……)
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { String key = subscriberClass.getName();List<SubscriberMethod> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);}if (subscriberMethods != null) {return subscriberMethods;} subscriberMethods = new ArrayList<SubscriberMethod>();Class<?> clazz = subscriberClass;HashSet<String> eventTypesFound = new HashSet<String>();StringBuilder methodKeyBuilder = new StringBuilder(); while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {// Skip system classes, this just degrades performancebreak;} // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { String methodName = method.getName(); if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length());ThreadMode threadMode; if (modifierString.length() == 0) { threadMode = ThreadMode.PostThread;} else if (modifierString.equals("MainThread")) { threadMode = ThreadMode.MainThread;} else if (modifierString.equals("BackgroundThread")) { threadMode = ThreadMode.BackgroundThread;} else if (modifierString.equals("Async")) { threadMode = ThreadMode.Async;} else {if (skipMethodVerificationForClasses.containsKey(clazz)) {continue;} else {throw new EventBusException("Illegal onEvent method, check for typos: " + method);} } Class<?> eventType = parameterTypes[0];methodKeyBuilder.setLength(0);methodKeyBuilder.append(methodName);methodKeyBuilder.append(‘>‘).append(eventType.getName());String methodKey = methodKeyBuilder.toString(); if 
(eventTypesFound.add(methodKey)) {// Only add if not already found in a sub classsubscriberMethods.add(new SubscriberMethod(method, threadMode, eventType));} } } else if (!skipMethodVerificationForClasses.containsKey(clazz)) { Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + clazz + "."+ methodName);} } } clazz = clazz.getSuperclass();}if (subscriberMethods.isEmpty()) {throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called "+ ON_EVENT_METHOD_NAME);} else {synchronized (methodCache) {methodCache.put(key, subscriberMethods);}return subscriberMethods;}}
(3)遍历订阅类中的订阅方法,存进一个全局的Map,key为eventType, value为订阅方法的包装类(持有订阅者、订阅方法(方法、线程模式、事件类型)、优先级)
这里:Value 的类型为 CopyOnWriteArrayList——一个线程安全、用法类似 ArrayList 的列表(它并不继承自 ArrayList)
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) { Class<?> eventType = subscriberMethod.eventType;CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>();subscriptionsByEventType.put(eventType, subscriptions);} else {if (subscriptions.contains(newSubscription)) {throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "+ eventType);} } // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again) // subscriberMethod.method.setAccessible(true); int size = subscriptions.size(); for (int i = 0; i <= size; i++) {if (i == size || newSubscription.priority > subscriptions.get(i).priority) { subscriptions.add(i, newSubscription); break;} } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>();typesBySubscriber.put(subscriber, subscribedEvents);} subscribedEvents.add(eventType); if (sticky) { Object stickyEvent; synchronized (stickyEvents) { stickyEvent = stickyEvents.get(eventType);}if (stickyEvent != null) {// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don‘t take care of here.postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());} }}
(4)发布事件,使用ThreadLocal,为每个线程分别获取一个事件发布状态的副本,遍历事件队列,循环发布事件
这里:isMainThread存储了当前是否是主线程,后面会用到
/**
 * Appends the event to the queue held by the state from {@code currentPostingThreadState}
 * and, unless that state is already marked as posting, delivers every queued event in order.
 *
 * @param event the event to publish
 * @throws EventBusException if the posting state is still flagged as canceled on entry
 */
public void post(Object event) {
    EventBus.PostingThreadState state = (EventBus.PostingThreadState) this.currentPostingThreadState.get();
    List pending = state.eventQueue;
    pending.add(event);

    // A post() already in progress for this state will pick the new event up from the queue.
    if (state.isPosting) {
        return;
    }

    state.isMainThread = Looper.getMainLooper() == Looper.myLooper();
    state.isPosting = true;
    if (state.canceled) {
        throw new EventBusException("Internal error. Abort state was not reset");
    }

    try {
        while (!pending.isEmpty()) {
            Object next = pending.remove(0);
            this.postSingleEvent(next, state);
        }
    } finally {
        state.isPosting = false;
        state.isMainThread = false;
    }
}
(5)遍历所有的事件类型
private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error { Class eventClass = event.getClass(); boolean subscriptionFound = false; if(this.eventInheritance) { List eventTypes = this.lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for(int h = 0; h < countTypes; ++h) { Class clazz = (Class)eventTypes.get(h);subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz);} } else { subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);} if(!subscriptionFound) {if(this.logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass);} if(this.sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {this.post(new NoSubscriberEvent(this, event));} } }
(6)根据事件类型,到全局的Map中去查找订阅方法的包装类集合,再遍历包装类集合,发布事件给订阅者
/**
 * Posts the event to every subscription registered for {@code eventClass}, stopping early
 * if the posting state was flagged as canceled during a delivery.
 *
 * @param event        the event being delivered
 * @param postingState state updated with the current event/subscription during each delivery
 * @param eventClass   the event type whose subscriptions are looked up
 * @return {@code true} if at least one subscription exists for the type, {@code false} otherwise
 */
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> targets;
    synchronized (this) {
        targets = subscriptionsByEventType.get(eventClass);
    }

    if (targets == null || targets.isEmpty()) {
        return false;
    }

    for (Subscription target : targets) {
        postingState.event = event;
        postingState.subscription = target;
        boolean canceledByHandler = false;
        try {
            postToSubscription(target, event, postingState.isMainThread);
            canceledByHandler = postingState.canceled;
        } finally {
            // Always reset the per-delivery fields, even when the handler threw.
            postingState.event = null;
            postingState.subscription = null;
            postingState.canceled = false;
        }
        if (canceledByHandler) {
            break;
        }
    }
    return true;
}
(7)根据订阅方法的线程模式,以及当前的线程类型,分发执行订阅方法
/**
 * Routes an event to a subscription according to the subscription's thread mode and whether
 * the caller is on the main thread: either invoke the handler directly or enqueue it on the
 * main-thread, background or async poster.
 *
 * @param subscription the target subscription
 * @param event        the event to deliver
 * @param isMainThread whether the posting thread is the main thread
 * @throws IllegalStateException if the thread mode is not one of the handled values
 */
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case PostThread:
            invokeSubscriber(subscription, event);
            return;

        case MainThread:
            if (!isMainThread) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            return;

        case BackgroundThread:
            if (!isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                backgroundPoster.enqueue(subscription, event);
            }
            return;

        case Async:
            asyncPoster.enqueue(subscription, event);
            return;

        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
(8-1)使用反射,在当前线程执行订阅者的订阅方法
/**
 * Reflectively invokes the subscription's handler method on its subscriber with the event.
 *
 * <p>An exception thrown by the handler is unwrapped and passed to
 * {@code handleSubscriberException}; an access failure is rethrown as
 * {@link IllegalStateException}.
 *
 * @param subscription the subscription whose handler is invoked
 * @param event        the single argument passed to the handler
 */
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        Object receiver = subscription.subscriber;
        subscription.subscriberMethod.method.invoke(receiver, event);
    } catch (InvocationTargetException e) {
        Throwable thrownByHandler = e.getCause();
        handleSubscriberException(subscription, event, thrownByHandler);
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}
(8-2)切换到主线程执行订阅方法,其实就是用了Handler
/**
 * Queues a pending post and, if the handler is not already active, marks it active and sends
 * a message to start processing.
 *
 * @param subscription the target subscription
 * @param event        the event to deliver
 * @throws EventBusException if the handler message could not be sent
 */
void enqueue(Subscription subscription, Object event) {
    PendingPost queued = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(queued);
        if (handlerActive) {
            // handleMessage is already scheduled or running and will drain the queue.
            return;
        }
        handlerActive = true;
        boolean sent = sendMessage(obtainMessage());
        if (!sent) {
            throw new EventBusException("Could not send handler message");
        }
    }
}
@Overridepublic void handleMessage(Message msg) {boolean rescheduled = false; try {long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll(); if (pendingPost == null) {handlerActive = false; return;} } }eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) {if (!sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");} rescheduled = true; return;} } } finally {handlerActive = rescheduled;}}
(8-3)切换到后台线程执行订阅方法,其实就是用了Executors.newCachedThreadPool(),并通过executorRunning标志保证同一时刻只提交一个任务,队列中的事件逐个(串行)执行
/**
 * Queues a pending post and, if no run is currently flagged as executing, submits this
 * poster to the event bus's executor service.
 *
 * @param subscription the target subscription
 * @param event        the event to deliver
 */
public void enqueue(Subscription subscription, Object event) {
    PendingPost queued = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(queued);
        if (executorRunning) {
            // The already-submitted run() will pick this post up from the queue.
            return;
        }
        executorRunning = true;
        eventBus.getExecutorService().execute(this);
    }
}
@Overridepublic void run() {try {try {while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll(); if (pendingPost == null) {executorRunning = false; return;} } }eventBus.invokeSubscriber(pendingPost);} } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);} } finally {executorRunning = false;}}
(8-4)切换到异步线程执行订阅方法,其实就是用了Executors.newCachedThreadPool()动态控制并发
/**
 * Queues a pending post and submits this poster to the event bus's executor service once
 * per enqueued post.
 *
 * @param subscription the target subscription
 * @param event        the event to deliver
 */
public void enqueue(Subscription subscription, Object event) {
    PendingPost queued = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(queued);
    // One execute() per post: each run() call polls exactly one entry.
    eventBus.getExecutorService().execute(this);
}
/**
 * Takes one pending post from the queue and invokes its subscriber through the event bus.
 *
 * @throws IllegalStateException if the queue is empty when this task runs
 */
@Override
public void run() {
    PendingPost pendingPost = queue.poll();
    if (pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}
/**
 * Unpacks a pending post, releases it, and invokes the subscriber if the subscription is
 * still active.
 *
 * @param pendingPost the queued event/subscription pair; released before the handler runs
 */
void invokeSubscriber(PendingPost pendingPost) {
    // Copy the fields out first: the pending post is released before the handler is invoked.
    Subscription target = pendingPost.subscription;
    Object payload = pendingPost.event;
    PendingPost.releasePendingPost(pendingPost);

    if (!target.active) {
        return;
    }
    invokeSubscriber(target, payload);
}
类图:
参考:
http://blog.csdn.net/lmj623565791/article/details/40920453
http://a.codekk.com/detail/Android/Trinea/EventBus%20源码解析
时间: 2024-07-29 22:30:19