动手造轮子:实现一个简单的 EventBus

动手造轮子:实现一个简单的 EventBus

Intro

EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

EventBus 处理流程:

微服务间使用 EventBus 实现系统间解耦:

借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用MQ来实现 EventBus.

为什么要使用 EventBus

  1. 解耦合(轻松的实现系统间解耦)
  2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)
  3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)
  4. ...

EventBus 整体架构:

  • IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

  • IEventHandler:定义了一个 Handle 方法来处理相应的事件

  • IEventStore:所有的事件的处理存储,保存事件的IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

  • IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

使用示例

来看一个使用示例,完整代码示例

internal class EventTest
{
    public static void MainTest()
    {
        var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();
        eventBus.Subscribe<CounterEvent, CounterEventHandler1>();
        eventBus.Subscribe<CounterEvent, CounterEventHandler2>();
        eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
        eventBus.Publish(new CounterEvent { Counter = 1 });

        eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();
        eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
        eventBus.Publish(new CounterEvent { Counter = 2 });
    }
}

internal class CounterEvent : EventBase
{
    public int Counter { get; set; }
}

internal class CounterEventHandler1 : IEventHandler<CounterEvent>
{
    public Task Handle(CounterEvent @event)
    {
        LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
        return Task.CompletedTask;
    }
}

internal class CounterEventHandler2 : IEventHandler<CounterEvent>
{
    public Task Handle(CounterEvent @event)
    {
        LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
        return Task.CompletedTask;
    }
}

具体实现

EventStoreInMemory 实现:

EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

public class EventStoreInMemory : IEventStore
{
    private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();

    public bool AddSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.ContainsKey(eventKey))
        {
            return _eventHandlers[eventKey].Add(typeof(TEventHandler));
        }
        else
        {
            return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()
            {
                typeof(TEventHandler)
            });
        }
    }

    public bool Clear()
    {
        _eventHandlers.Clear();
        return true;
    }

    public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
    {
        if(_eventHandlers.Count == 0)
            return  new Type[0];
        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.TryGetValue(eventKey, out var handlers))
        {
            return handlers;
        }
        return new Type[0];
    }

    public string GetEventKey<TEvent>()
    {
        return typeof(TEvent).FullName;
    }

    public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
    {
        if(_eventHandlers.Count == 0)
            return false;

        var eventKey = GetEventKey<TEvent>();
        return _eventHandlers.ContainsKey(eventKey);
    }

    public bool RemoveSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        if(_eventHandlers.Count == 0)
            return false;

        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.ContainsKey(eventKey))
        {
            return _eventHandlers[eventKey].Remove(typeof(TEventHandler));
        }
        return false;
    }
}

EventBus 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

/// <summary>
/// EventBus in process
/// </summary>
public class EventBus : IEventBus
{
    private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();

    private readonly IEventStore _eventStore;
    private readonly IServiceProvider _serviceProvider;

    public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)
    {
        _eventStore = eventStore;
        _serviceProvider = serviceProvider ?? DependencyResolver.Current;
    }

    public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
    {
        if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
        {
            return false;
        }
        var handlers = _eventStore.GetEventHandlerTypes<TEvent>();
        if (handlers.Count > 0)
        {
            var handlerTasks = new List<Task>();
            foreach (var handlerType in handlers)
            {
                try
                {
                    if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)
                    {
                        handlerTasks.Add(handler.Handle(@event));
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");
                }
            }
            handlerTasks.WhenAll().ConfigureAwait(false);

            return true;
        }
        return false;
    }

    public bool Subscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        return _eventStore.AddSubscription<TEvent, TEventHandler>();
    }

    public bool Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        return _eventStore.RemoveSubscription<TEvent, TEventHandler>();
    }
}

项目实例

来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

  1. 定义 Event 以及 EventHandler
public class NoticeViewEvent : EventBase
{
    public Guid NoticeId { get; set; }

    // UserId
    // IP
    // ...
}

public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
{
    public async Task Handle(NoticeViewEvent @event)
    {
        await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
        {
            var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
            notice.NoticeVisitCount += 1;
            await dbContext.SaveChangesAsync();
        });
    }
}

这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

  1. 注册 EventBus 相关服务以及 EventHandlers
services.AddSingleton<IEventBus, EventBus>();
services.AddSingleton<IEventStore, EventStoreInMemory>();
//register EventHandlers
services.AddSingleton<NoticeViewEventHandler>();
  1. 订阅事件
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)
{
    eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();
    // ...
}
  1. 发布事件
eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });

Reference

原文地址:https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html

时间: 2024-10-09 20:08:26

动手造轮子:实现一个简单的 EventBus的相关文章

动手造轮子:实现一个简单的依赖注入(一)

动手造轮子:实现一个简单的依赖注入(一) Intro 在上一篇文章中主要介绍了一下要做的依赖注入的整体设计和大概编程体验,这篇文章要开始写代码了,开始实现自己的依赖注入框架. 类图 首先来温习一下上次提到的类图 服务生命周期 服务生命周期定义: public enum ServiceLifetime : sbyte { /// <summary> /// Specifies that a single instance of the service will be created. /// &

自己动手造“轮子”---python常用的几个方法

前言:由于工作内容的原因,经常需要些python脚本,久而久之,发现有一些方法经常用到,于是就自己动手编辑了一些常用的.大众的.通用的方法.小弟不才,但也希望能为开源做做贡献. 最后再附上代码哈: 一.目前该文件中只总结了五个方法: 1.ping_network(network_ip)? ?---? ping测试的方法 2.socket_port(network_ip, port)? ---? 检测目标网络端口是否开放(正常)的方法 3.file_format_analysis(file_pat

【控件】给地图添加一个简单的比例尺条

ArcGIS API for Android 10.2.3暂时没有直接提供显示比例尺条的功能,但是又必须要用肿么办呢?自己动手丰衣足食!以一个简单的例子介绍一下如何给地图添加一个简单的比例尺条. 一.目标 在屏幕下方添加一个长2cm的比例尺条,实时显示地图当前比例尺,随底图颜色的改变而改变,并动态调整其位置. 二.逻辑 1.两个TextView:一个用于显示获取到比例尺,一个用于显示一个2cm的比例尺条 2.2cm比例尺条:1dp=0.00625英寸=0.015875 厘米,理论上不同品牌的手机

重复造轮子,编写一个轻量级的异步写日志的实用工具类(LogAsyncWriter)

一说到写日志,大家可能推荐一堆的开源日志框架,如:Log4Net.NLog,这些日志框架确实也不错,比较强大也比较灵活,但也正因为又强大又灵活,导致我们使用他们时需要引用一些DLL,同时还要学习各种用法及配置文件,这对于有些小工具.小程序.小网站来说,有点“杀鸡焉俺用牛刀”的感觉,而且如果对这些日志框架不了解,可能输出来的日志性能或效果未毕是与自己所想的,鉴于这几个原因,我自己重复造轮子,编写了一个轻量级的异步写日志的实用工具类(LogAsyncWriter),这个类还是比较简单的,实现思路也很

自己动手实现一个简单的JSON解析器

1. 背景 JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式.相对于另一种数据交换格式 XML,JSON 有着诸多优点.比如易读性更好,占用空间更少等.在 web 应用开发领域内,得益于 JavaScript 对 JSON 提供的良好支持,JSON 要比 XML 更受开发人员青睐.所以作为开发人员,如果有兴趣的话,还是应该深入了解一下 JSON 相关的知识.本着探究 JSON 原理的目的,我将会在这篇文章中详细向大家介绍一个简单的JSON解析器的解析流

造轮子:新建一个属于自己的String类

练习造轮子,新建一个属于自己的MyString类 首先来开启检测内存泄漏的函数 在main里添加 _CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF); 开启内存泄漏检测 int main() { _CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF); int *p = new int; return

自己动手模拟开发一个简单的Web服务器

开篇:每当我们将开发好的ASP.NET网站部署到IIS服务器中,在浏览器正常浏览页面时,可曾想过Web服务器是怎么工作的,其原理是什么?“纸上得来终觉浅,绝知此事要躬行”,于是我们自己模拟一个简单的Web服务器来体会一下. 一.请求-处理-响应模型 1.1 基本过程介绍 每一个HTTP请求都会经历三个步凑:请求-处理-响应:每当我们在浏览器中输入一个URL时都会被封装为一个HTTP请求报文发送到Web服务器,而Web服务器则接收并解析HTTP请求报文,然后针对请求进行处理(返回指定的HTML页面

教你自己实现一个事件总线EventBus

EventBus是一个Github上著名的开源事件总线框架,想必很多人都使用过它.它实现了事件订阅者和事件发布者的解耦,让我们更加容易在actvity等组件间传递信息. 我们虽然不喜欢重复造轮子,但是不代表我们不需要了解轮子是怎么造的. 这篇文章通过这个简单的实例,给大家说明EventBus实现的原理,一起来打造一个简单的事件总线框架.如果你明白了这个框架的设计原理,那么EventBus也就相差不大,两者比起来只是后者更加完善和高效. 这也给看源码的朋友带来便利,大家可以先尝试看我这篇文章,再去

GitHub Android 最火开源项目Top20 GitHub 上的开源项目不胜枚举,越来越多的开源项目正在迁移到GitHub平台上。基于不要重复造轮子的原则,了解当下比较流行的Android与iOS开源项目很是必要。利用这些项目,有时能够让你达到事半功倍的效果。

1. ActionBarSherlock(推荐) ActionBarSherlock应该算得上是GitHub上最火的Android开源项目了,它是一个独立的库,通过一个API和主题,开发者就可以很方便地使用所有版本的Android动作栏的设计模式. 对于Android 4.0及更高版本,ActionBarSherlock可以自动使用本地ActionBar实现,而对于之前没有ActionBar功能的版本,基于Ice Cream Sandwich的自定义动作栏实现将自动围绕布局.能够让开发者轻松开发