1. 发布-订阅模式
发布-订阅模式(publish-subscribe)是一种编程范式,发布方不发布消息给特定的接收方,而是由订阅方选择性接收。这使得发布方和订阅方相对独立,减少了耦合性。
在发布-订阅模式中,有以下几个难点:
1)如何区分或分配订阅者关注的消息;
2)发布者如何将消息提交给对应订阅者;
下图描述Guava EventBus对发布-订阅模式的实现。
2. 订阅者注册
下面是简单的订阅者实现:
// 订阅者 public class Subscriber { @Subscribe public void process(String event) { System.out.print(event); } }
注册订阅者:
// 消息订阅 EventBus eventBus = new EventBus(); eventBus.register(new Subscriber());
Guava EventBus中有关注册的流程:
Guava EventBus中有关注册的代码:
// SubscriberRegistry.register() void register(Object listener) { // 查找订阅者中有Subscribe注解的方法 // findAllSubscribers返回值: 函数参数类型-{EventBus,listener,有Subscribe注解的方法} Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); // 所有订阅者 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull( subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
3. 事件提交&处理
eventBus.post("Hello, EventBus");
Guava EventBus中消息提交和处理流程:
Guava EventBus中消息提交和处理代码:
// EventBus.post(..) public void post(Object event) { // 查找相关订阅者 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 事件处理类型 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
Dispatcher有以下几种:
1. PerThreadQueuedDispatcher : 按照事件提交顺序进行处理(a breadth-first dispatch)。 // EventBus默认分配类型
2. LegacyAsyncDispatcher:若订阅者处理函数上有AllowConcurrentEvents注解,则使用线程中对象进行多线程并行处理;否则将使用同步处理。// AsyncEventBus默认分配类型
class Subscriber { static Subscriber create(EventBus bus, Object listener, Method method) { return method.getAnnotation(AllowConcurrentEvents.class) != null ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); } }
Subscriber中dispatchEvent中处理方法:
final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { method.invoke(target, checkNotNull(event)); }); }
3. ImmediateDispatcher:立即将事件提交给对应的订阅者(a depth-first dispatch)
时间: 2024-11-05 12:27:38