ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。

事实上,不仅仅是在事件处理器中,我们需要关心对象的生命周期,在整个ASP.NET Core Web API的应用程序里,我们需要理解并仔细推敲被注册到IoC容器中的服务,它们的生命周期应该是个怎样的情形,这也是服务端应用程序设计必须认真考虑的内容。因为如果生命周期管理不合理,程序申请的资源无法合理释放,最后便会带来内存泄漏、程序崩溃等各种问题,然而这样的问题对于服务端应用程序来说,是非常严重的。

记得在上一篇文章的结束部分,我给大家留下一个练习,就是让大家在CustomerCreatedEventHandler事件处理器的HandleAsync方法中,填入自己的代码,以便对获得的事件消息做进一步的处理。作为本文的引子,我们首先将这部分工作做完,然后再进一步分析生命周期的问题。

Event Store

Event Store是CQRS体系结构模式中最为重要的一个组成部分,它的主要职责就是保存发生于领域模型中的领域事件,并对事件数据进行归档。当仓储需要获取领域模型对象时,Event Store也会配合快照数据库一起,根据领域事件的发生顺序,逐步回放并重塑领域模型对象。事实上,Event Store的实现是非常复杂的,虽然从它的职责上来看并不算太复杂,然而它所需要解决的事件同步、快照、性能、消息派发等问题,使得CQRS体系结构的实现变得非常复杂。在实际应用中,已经有一些比较成熟的框架和工具集,能够帮助我们在CQRS中很方便地实现Event Store,比如GetEventStore就是一个很好的开源Event Store框架,它是基于.NET开发的,在微软官方的eShopOnContainers说明文档中,也提到了这个框架,推荐大家上他们的官网(https://eventstore.org/)了解一下。在这里我们就先不深入研究Event Store应该如何实现,我们先做一个简单的Event Store,以便展示我们需要讨论的问题。

延续着上一版的代码库(https://github.com/daxnet/edasample/tree/chapter_1),我们首先在EdaSample.Common.Events命名空间下,定义一个IEventStore的接口,这个接口非常简单,仅仅包含一个保存事件的方法,代码如下:

public interface IEventStore : IDisposable
{
    Task SaveEventAsync<TEvent>(TEvent @event)
        where TEvent : IEvent;
}

SaveEventAsync方法仅有一个参数:由泛型类型TEvent绑定的@event对象。泛型约束表示SaveEventAsync方法仅能接受IEvent接口及其实现类型的对象作为参数传入。接口定义好了,下一步就是实现这个接口,对传入的事件对象进行保存。为了实现过程的简单,我们使用Dapper,将事件数据保存到SQL Server数据库中,来模拟Event Store对事件的保存操作。

Note:为什么IEventStore接口的SaveEventAsync方法签名中,没有CancellationToken参数?严格来说,支持async/await异步编程模型的方法定义上,是需要带上CancellationToken参数的,以便调用方请求取消操作的时候,方法内部可以根据情况对操作进行取消。然而有些情况下取消操作并不是那么合理,或者方法内部所使用的API并没有提供更深层的取消支持,因此也就没有必要在方法定义上增加CancellationToken参数。在此处,为了保证接口的简单,没有引入CancellationToken的参数。

接下来,我们实现这个接口,并用Dapper将事件数据保存到SQL Server中。出于框架设计的考虑,我们新建一个Net Standard Class Library项目,在这个新的项目中实现IEventStore接口,这么做的原因已经在上文中介绍过了。代码如下:

public class DapperEventStore : IEventStore
{
    private readonly string connectionString;

    public DapperEventStore(string connectionString)
    {
        this.connectionString = connectionString;
    }

    public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        const string sql = @"INSERT INTO [dbo].[Events]
([EventId], [EventPayload], [EventTimestamp])
VALUES
(@eventId, @eventPayload, @eventTimestamp)";
        using (var connection = new SqlConnection(this.connectionString))
        {
            await connection.ExecuteAsync(sql, new
            {
                eventId = @event.Id,
                eventPayload = JsonConvert.SerializeObject(@event),
                eventTimestamp = @event.Timestamp
            });
        }
    }

    #region IDisposable Support
    // 此处省略
    #endregion
}

IDisposable接口的实现部分暂且省略,可以看到,实现还是非常简单的:通过构造函数传入数据库的连接字符串,在SaveEventAsyc方法中,基于SqlConnection对象执行Dapper的扩展方法来完成事件数据的保存。

Note: 此处使用了JsonConvert.SerializeObject方法来序列化事件对象,也就意味着DapperEventStore程序集需要依赖Newtonsoft.Json程序集。虽然在我们此处的案例中不会有什么影响,但这样做会造成DapperEventStore对Newtonsoft.Json的强依赖,这样的依赖关系不仅让DapperEventStore变得不可测试,而且Newtonsoft.Json将来未知的变化,也会影响到DapperEventStore,带来一些不确定性和维护性问题。更好的做法是,引入一个IMessageSerializer接口,在另一个新的程序集中使用Newtonsoft.Json来实现这个接口,同时仅让DapperEventStore依赖IMessageSerializer,并在应用程序启动时,将Newtonsoft.Json的实现注册到IoC容器中。此时,IMessageSerializer可以被Mock,DapperEventStore就变得可测试了;另一方面,由于只有那个新的程序集会依赖Newtonsoft.Json,因此,Newtonsoft.Json的变化也仅仅会影响那个新的程序集,不会对框架主体的其它部分造成任何影响。

EventStore实现好了,接下来,我们将其用在CustomerCreatedEventHandler中,以便将订阅的CustomerCreatedEvent保存下来。

事件数据的保存

保存事件数据的第一步,就是在ASP.NET Core Web API的IoC容器中,将DapperEventStore注册进去。这一步是非常简单的,只需要在Startup.cs的ConfigureServices方法中完成即可。代码如下:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"]));
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

注意我们使用的是services.AddTransient方法来注册DapperEventStore,我们希望应用程序在每次请求IEventStore实例时,都能获得一个新的DapperEventStore的实例。

接下来,打开CustomerCreatedEventHandler.cs文件,在构造函数中加入对IEventStore的依赖,然后修改HandleAsync方法,在该方法中使用IEventStore的实例来完成事件数据的保存。代码如下:

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    private readonly IEventStore eventStore;

    public CustomerCreatedEventHandler(IEventStore eventStore)
    {
        this.eventStore = eventStore;
    }

    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));

    public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await this.eventStore.SaveEventAsync(@event);
        return true;
    }

    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

OK,代码修改完毕,测试一下。

看看数据库中客户信息是否已经创建:

看看数据库中事件数据是否已经保存成功:

OK,数据全部保存成功。

然而,事情真的就这么简单么?No。在追踪了IEventStore实例(也就是DapperEventStore)的生命周期后,你会发现,问题没有想象的那么简单。

追踪对象的生命周期

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped这些方法对服务进行注册时,使用不同的方法也就意味着选择了不同的对象生命周期。在此我们也不再深入讨论每种方法之间的差异,微软官方有详细的文档和demo(抱歉我没有贴出中文链接,因为机器翻译的缘故,实在有点不堪入目),如果对ASP.NET Core的IoC容器不熟悉的话,建议先了解一下官网文章的内容。在上面我稍微提了一下,我们是用AddTransient方法来注册DapperEventStore的,因为我们希望在每次使用IEventStore的时候,都会有一个新的DapperEventStore被创建。现在,让我们来验证一下,看情况是否果真如此。

日志的使用

追踪程序执行的最有效的方式就是使用日志。在我们的场景中,使用基于文件的日志会更合适,因为这样我们可以更清楚地看到程序的执行过程以及对象的变化过程。同样,我不打算详细介绍如何在ASP.NET Core Web API中使用日志,微软官网同样有着非常详尽的文档来介绍这些内容。在这里,我简要地将相关代码列出来,以介绍如何启用基于文件的日志系统。

首先,在Web API服务的项目上,添加对Serilog.Extensions.Logging.File的nuget包,使用它能够非常方便地启用基于文件的日志。然后,打开Program.cs文件,添加ConfigureLogging的调用:

public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .ConfigureLogging((context, lb) =>
        {
            lb.AddFile(LogFileName);
        })
        .UseStartup<Startup>()
        .Build();

此处LogFileName为本地文件系统中的日志文件文件名,为了避免权限问题,我将日志写入C:\Users\<user>\appdata\local目录下,因为我的Web API进程是由当前登录用户启动的,所以写在这个目录下不会有权限问题。如果今后我们把Web API host在IIS中,那么启动IIS服务的用户需要对日志所在的目录具有写入的权限,日志文件才能被正确写入,这一点是需要注意的。

好了,现在可以使用日志了,先试试看。在Startup类的构造函数中,加入ILoggerFactory参数,并在构造函数执行时获取ILogger实例,然后在ConfigureServices调用中输出一些内容:

public class Startup
{
    private readonly ILogger logger;

    public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)
    {
        Configuration = configuration;
        this.logger = loggerFactory.CreateLogger<Startup>();
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        this.logger.LogInformation("正在对服务进行配置...");

        services.AddMvc();

        services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
        services.AddTransient<IEventStore>(serviceProvider =>
            new DapperEventStore(Configuration["mssql:connectionString"]));
        services.AddSingleton<IEventBus, PassThroughEventBus>();
        this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
    }

    // 其它方法暂时省略
}

现在重新启动服务,然后查看日志文件,发现日志可以被正确输出:

接下来,使用类似的方式,向PassThroughEventBus的构造函数和Dispose方法中加入一些日志输出,在CustomersController的Create方法中、CustomerCreatedEventHandler的构造函数和HandleAsync方法中、DapperEventStore的构造函数和Dispose方法中也加入一些日志输出,以便能够观察当新的客户信息被创建时,Web API的执行过程。限于文章篇幅,就不在此一一贴出各方法中加入日志输出的代码了,大家可以根据本文最后所提供的源代码链接来获取源代码。简单地举个例子吧,比如对于DapperEventStore,我们通过构造函数注入ILogger的实例:

public class DapperEventStore : IEventStore
{
    private readonly string connectionString;
    private readonly ILogger logger;

    public DapperEventStore(string connectionString,
        ILogger<DapperEventStore> logger)
    {
        this.connectionString = connectionString;
        this.logger = logger;
        logger.LogInformation($"DapperEventStore构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }
    // 其它函数省略
}

这样一来,在DapperEventStore的其它方法中,就可以通过logger来输出日志了。

发现问题

同样,再次运行Web API,并通过Powershell发起一次创建客户信息的请求,然后打开日志文件,整个程序的执行过程基本上就一目了然了:

从上面的日志内容可以得知,当应用程序正常退出时,由IoC容器托管的PassThroughEventBus和DapperEventStore都能够被正常Dispose,目前看来没什么问题,因为资源可以正常释放。现在让我们重新启动Web API,连续发送两次创建客户信息的请求,再次查看日志,我们得到了下面的内容:

从上面的日志内容可以看到,在Web API的整个运行期间,CustomerCreatedEventHandler仅被构造了一次,而且在每次处理CustomerCreatedEvent事件的时候,都是使用同一个DapperEventStore实例来保存事件数据。也就是说,CustomerCreatedEventHandler和DapperEventStore在整个Web API服务的生命周期中,有且仅有一个实例,它们是Singleton的!然而,在进行系统架构的时候,我们应该尽量保证较短的对象生命周期,以免因为状态的不一致性导致不可回滚的错误出现,这也是架构设计中的一种最佳实践。虽然目前我们的DapperEventStore在程序正常退出的时候能够被Dispose掉,但如果DapperEventStore使用了非托管资源,并且非托管资源并没有很好地管理自己的内存呢?久而久之,DapperEventStore就产生了内存泄漏点,慢慢地,Web API就会出现内存泄漏,系统资源将被耗尽。假如Web API被部署在云中,应用程序监控装置(比如AWS的Cloud Watch)就会持续报警,并强制服务断线,整个系统的可用性就无法得到保障。所以,我们更期望DapperEventStore能够正确地实现C#的Dispose模式,在Dispose方法中合理地释放资源,并且仅在需要使用DapperEventStore时候才去构建它,用完就及时Dispose,以保证资源的合理使用。这也就是为什么我们使用services.AddTransient方法来注册CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事实却并非如此。究其原因,就是因为PassThroughEventBus是单例实例,它的生命周期是整个Web API服务。而在PassThroughEventBus的构造函数中,CustomerCreatedEventHandler被作为参数传入,于是,PassThroughEventBus产生了对CustomerCreatedEventHandler的依赖,而连带地也产生了对DapperEventStore的依赖。换句话说,在整个应用程序运行的过程中,IoC框架完全没有理由再去创建新的CustomerCreatedEventHandler以及DapperEventStore的实例,因为事件处理器作为强引用被注册到PassThroughEventBus中,而PassThroughEventBus至始至终没有变过!

Note:为什么PassThroughEventBus可以作为单例注册到IoC容器中?因为它提供了无状态的全局性的基础结构层服务:事件总线。在PassThroughEventBus的实现中,这种全局性体现得不明显,我们当然可以每一次HTTP请求都创建一个新的PassThroughEventBus来转发事件消息并作处理。然而,在今后我们要实现的基于RabbitMQ的事件总线中,如果我们还是每次HTTP请求都创建一个新的消息队列,不仅性能得不到保证,而且消息并不能路由到新创建的channel上。注意:我们将其注册成单例,一个很重要的依据是由于它是无状态的,但即使如此,我们也要注意在应用程序退出的时候,合理Dispose掉它所占用的资源。当然,在这里,ASP.NET Core的IoC机制会帮我们解决这个问题(因为我注册了PassThroughEventBus,但我没有显式调用Dispose方法,我仍然能从日志中看到“PassThroughEventBus已经被Dispose”的字样),然而有些情况下,ASP.NET Core不会帮我们做这些,就需要我们自己手工完成。

OMG!由于构造函数注入,使得对象之间产生了依赖关系,从而影响到了它们的生命周期,这可怎么办?既然问题是由依赖引起的,那么就需要想办法解耦。

解耦!解决事件处理器对象生命周期问题

经过分析,我们需要解除PassThroughEventBus对各种EventHandler的直接依赖。因为PassThroughEventBus是单例的,那么由它引用的所有组件也只可能具有相同的生命周期。然而,这样的解耦又该如何做呢?将EventHandler封装到另一个类中?结果还是一样,PassThroughEventBus总会通过某种对象关系,来间接引用到EventHandler上,造成EventHandler全局唯一。

或许,应该要有另一套生命周期管理体系来管理EventHandler的生命周期,使得每当PassThroughEventBus需要使用EventHandler对所订阅的事件进行处理的时候,都会通过这套体系来请求新的EventHandler实例,这样一来,PassThroughEventBus也就不再依赖于某个特定的实例了,而仅仅是引用了各种EventHandler在新的生命周期管理体系中的注册信息。每当需要的时候,PassThroughEventBus都会将事件处理器的注册信息传给新的管理体系,然后由这套新的体系来维护事件处理器的生命周期。

通过阅读微软官方的eShopOnContainers案例代码后,证实了这一想法。在案例中,有如下代码:

// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    dynamic eventData = JObject.Parse(message);
                    await handler.Handle(eventData);
                }
                else
                {
                    var eventType = _subsManager.GetEventTypeByName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}

可以看到,高亮的这一行,通过Autofac创建了一个新的LifetimeScope,在这个Scope中,通过eventName来获得一个subscription对象(也就是EventHandler的注册信息),进而通过scope的ResolveOptional调用来获得新的EventHandler实例。基本过程就是这样,目前也不需要纠结IDynamicIntegrationEventHandler是干什么用的,也不需要纠结为什么要使用dynamic来保存事件数据。重点是,autofac的BeginLifetimeScope方法调用创建了一个新的IoC Scope,在这个Scope中解析(resolve)了新的EventHandler实例。在eShopOnContainer案例中,EventBusRabbitMQ的设计是特定的,必须依赖于Autofac作为依赖注入框架。或许这部分设计可以进一步改善,使得EventBusRabbitMQ不会强依赖于Autofac。

接下来,我们会引入一个新的概念:事件处理器执行上下文,使用类似的方式来解决对象生命周期问题。

事件处理器执行上下文

事件处理器执行上下文(Event Handler Execution Context, EHEC)为事件处理器提供了一个完整的生命周期管理机制,在这套机制中,事件处理器及其引用的对象资源可以被正常创建和正常销毁。现在让我们一起看看,如何在EdaSample的案例代码中使用事件处理器执行上下文。

事件处理器执行上下文的接口定义如下,当然,这部分接口是放在EdaSample.Common.Events目录下,作为消息系统的框架代码提供给调用方:

public interface IEventHandlerExecutionContext
{
    void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;

    void RegisterHandler(Type eventType, Type handlerType);

    bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;

    bool HandlerRegistered(Type eventType, Type handlerType);

    Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default);
}

这个接口主要包含三种方法:注册事件处理器、判断事件处理器是否已经注册,以及对接收到的事件消息进行处理。整个结构还是非常清晰简单的。现在需要实现这个接口。根据上面的分析,这个接口的实现是需要依赖于IoC容器的,目前简单起见,我们仅使用微软ASP.NET Core标准的Dependency Injection框架来实现,当然,也可以使用Autofac,取决于你怎样去实现上面这个接口。需要注意的是,由于该接口的实现是需要依赖于第三方组件的(在这里是微软的Dependency Injection框架),因此,最佳做法是新建一个类库,并引用EdaSample.Common程序集,并在这个新的类库中,依赖Dependency Injection框架来实现这个接口。

以下是基于Microsoft.Extensions.DependencyInjection框架来实现的事件处理器执行上下文完整代码,这里有个兼容性问题,就是构造函数的第二个参数:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回类型是IServiceProvider,但从2.0开始,它的返回类型已经从IServiceProvider接口,变成了ServiceProvider类。这里引出了框架设计的另一个原则,就是依赖较低版本的.NET Core,以便获得更好的兼容性。如果我们的EdaSample是使用.NET Core 1.1开发的,那么当下面这个类被直接用在ASP.NET Core 2.0的项目中时,如果不通过构造函数参数传入ServiceProvider创建委托,而是直接在代码中使用registry.BuildServiceProvider调用,就会出现异常。

public class EventHandlerExecutionContext : IEventHandlerExecutionContext
{
    private readonly IServiceCollection registry;
    private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;
    private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>();

    public EventHandlerExecutionContext(IServiceCollection registry,
        Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null)
    {
        this.registry = registry;
        this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());
    }

    public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        var eventType = @event.GetType();
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&
            handlerTypes?.Count > 0)
        {
            var serviceProvider = this.serviceProviderFactory(this.registry);
            using (var childScope = serviceProvider.CreateScope())
            {
                foreach(var handlerType in handlerTypes)
                {
                    var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);
                    if (handler.CanHandle(@event))
                    {
                        await handler.HandleAsync(@event, cancellationToken);
                    }
                }
            }
        }
    }

    public bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.HandlerRegistered(typeof(TEvent), typeof(THandler));

    public bool HandlerRegistered(Type eventType, Type handlerType)
    {
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))
        {
            return handlerTypeList != null && handlerTypeList.Contains(handlerType);
        }

        return false;
    }

    public void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.RegisterHandler(typeof(TEvent), typeof(THandler));

    public void RegisterHandler(Type eventType, Type handlerType)
    {
        Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations);
        this.registry.AddTransient(handlerType);
    }
}

好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:

// IEventSubscriber
public interface IEventSubscriber : IDisposable
{
    void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>;
}

// PassThroughEventBus
public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly ILogger logger;
    private readonly IEventHandlerExecutionContext context;

    public PassThroughEventBus(IEventHandlerExecutionContext context,
        ILogger<PassThroughEventBus> logger)
    {
        this.context = context;
        this.logger = logger;
        logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");

        eventQueue.EventPushed += EventQueue_EventPushed;
    }

    private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => await this.context.HandleEventAsync(e.Event);

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        if (!this.context.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.context.RegisterHandler<TEvent, TEventHandler>();
        }
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
                logger.LogInformation($"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
            }

            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);

    #endregion
}

// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    this.logger.LogInformation("正在对服务进行配置...");

    services.AddMvc();

    services.AddTransient<IEventStore>(serviceProvider =>
        new DapperEventStore(Configuration["mssql:connectionString"],
            serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));

    var eventHandlerExecutionContext = new EventHandlerExecutionContext(services,
        sc => sc.BuildServiceProvider());
    services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
    services.AddSingleton<IEventBus, PassThroughEventBus>();

    this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseMvc();
}

代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:

小结

本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_2这个tag,如下:

原文地址:https://www.cnblogs.com/daxnet/p/8270480.html

时间: 2024-12-17 15:49:44

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理的相关文章

ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构.这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便订阅该事件消息的消费者能够对消息数据做进一步处理.让我们回顾一下微服务之间通信的几种方式,分为同步和异步两种.同步通信最常见的就是RESTful API,而且非常简单轻量,一个Request/Resp

ASP.NET Core Web Api之JWT VS Session VS Cookie(二)

前言 本文我们来探讨下JWT VS Session的问题,这个问题本没有过多的去思考,看到评论讨论太激烈,就花了一点时间去研究和总结,顺便说一句,这就是写博客的好处,一篇博客写出有的可能是经验积累,有的可能是学习分享,但都逃不过看到文章的你有更多或更好的想法,往返交流自身能收获更多,何乐而不为呢?希望本文能解惑或者能得到更多的交流.我们可直接抛出问题:使用客户端存储的JWT比服务端维持Session更好吗? 基于JWT和Session认证共同点 既然要比较JWT VS Session,那我们就得

Docker容器环境下ASP.NET Core Web API

Docker容器环境下ASP.NET Core Web API应用程序的调试 本文主要介绍通过Visual Studio 2015 Tools for Docker – Preview插件,在Docker容器环境下,对ASP.NET Core Web API应用程序进行调试.在自己做实验的过程中也碰到了一些问题,经过一些测试和搜索资料,基本解决了这些问题,本文也会对这些问题进行介绍,以免有相同需求的朋友多走弯路. 插件的下载与安装 至撰写本文为止,Visual Studio 2015 Tools

在Mac下创建ASP.NET Core Web API

在Mac下创建ASP.NET Core Web API 在Mac下创建ASP.NET Core Web API 这系列文章是参考了.NET Core文档和源码,可能有人要问,直接看官方的英文文档不就可以了吗,为什么还要写这些文章呢? 原因如下: 官方文档涉及的内容相当全面,属于那种大而全的知识仓库,不太适合初学者,很容易让人失去重要,让人掉入到具体的细节之中. 对于大多数人来讲开发语言只是工具,程序员都有一个通病,就是死磕工具,把工具学深.我认为在工具上没有必要投入太多时间,以能高效地完成日常的

docker中运行ASP.NET Core Web API

在docker中运行ASP.NET Core Web API应用程序 本文是一篇指导快速演练的文章,将介绍在docker中运行一个ASP.NET Core Web API应用程序的基本步骤,在介绍的过程中,也会对docker的使用进行一些简单的描述.对于.NET Core以及docker的基本概念,网上已经有很多文章对其进行介绍了,因此本文不会再详细讲解这些内容.对.NET Core和docker不了解的朋友,建议首先查阅与这些技术相关的文档,然后再阅读本文. 先决条件 要完成本文所介绍的演练任

在docker中运行ASP.NET Core Web API应用程序

本文是一篇指导快速演练的文章,将介绍在docker中运行一个ASP.NET Core Web API应用程序的基本步骤,在介绍的过程中,也会对docker的使用进行一些简单的描述.对于.NET Core以及docker的基本概念,网上已经有很多文章对其进行介绍了,因此本文不会再详细讲解这些内容.对.NET Core和docker不了解的朋友,建议首先查阅与这些技术相关的文档,然后再阅读本文. 先决条件 要完成本文所介绍的演练任务,需要准备以下环境: Visual Studio 2015,或者Vi

支持多个版本的ASP.NET Core Web API

基本配置及说明 版本控制有助于及时推出功能,而不会破坏现有系统. 它还可以帮助为选定的客户提供额外的功能. API版本可以通过不同的方式完成,例如在URL中添加版本或通过自定义标头和通过Accept-Header作为查询字符串参数. 在这篇文章中,我们来看看如何支持多版本的ASP.NET Core Web API 创建一个ASP.NET Core Web API应用程序.通过 NuGet 安装此软件包:Microsoft.AspNetCore.Mvc.Versioning,打开Startup.c

Gitlab CI 自动部署 asp.net core web api 到Docker容器

为什么要写这个? 在一个系统长大的过程中会经历不断重构升级来满足商业的需求,而一个严谨的商业系统需要高效.稳定.可扩展,有时候还不得不考虑成本的问题.我希望能找到比较完整的开源解决方案来解决持续集成.监控报警.以及扩容和高可用性的问题.是学习和探索的过程分享给大家,也欢迎同行的人交流. 先来一个三步曲,我们将完成通过GitLab CI 自动部署 net core web api 到Docker 容器的一个示例.这是第一步,通过此文您将了解如何将net core web api 运行在Docker

使用 Swagger 自动生成 ASP.NET Core Web API 的文档、在线帮助测试文档(ASP.NET Core Web API 自动生成文档)

对于开发人员来说,构建一个消费应用程序时去了解各种各样的 API 是一个巨大的挑战.在你的 Web API 项目中使用 Swagger 的 .NET Core 封装 Swashbuckle 可以帮助你创建良好的文档和帮助页面. Swashbuckle 可以通过修改 Startup.cs 作为一组 NuGet 包方便的加入项目.Swashbuckle 是一个开源项目,为使用 ASP.NET Core MVC 构建的 Web APIs 生成 Swagger 文档.Swagger 是一个机器可读的 R