ENode框架一台机器在处理Command时的设计思路

设计目标

  1. 尽量快的处理命令和事件,保证吞吐量;
  2. 希望处理完一个命令后不需要等待命令产生的事件持久化完成就能处理下一个命令;
  3. 保证命令、事件处理的顺序性,先来的先处理,先产生的先处理;
  4. 保证一个聚合根的事件只有一个线程在持久化,并按事件产生的顺序持久化;
  5. 持久化事件时如果遇到并发冲突时(聚合根ID+事件版本号出现重复)的处理代价要轻;
  6. 要能利用多核的优势;

总体设计思路

  1. 先将命令根据聚合根ID路由到CommandMailBox里;
  2. 单线程处理CommandMailBox中的命令,由于聚合根在in-memory本地内存,所以处理非常快;
  3. 处理成功后更新聚合根的in-memory内存;
  4. 将聚合根产生的事件同样原理路由到EventMailBox里;
  5. 单线程批量处理EventMailBox中的事件;由于是批量,所以持久化的吞吐量也可以保证;
  6. 处理完成一批事件后,把这一批事件对应的命令从CommandMailBox中移除;

详细设计思路

  1. 设计N个存放命令的CommandMailBox,命令首先按聚合根ID的hashcode取摸路由到对应的CommandMailBox;
  2. 每个CommandMailBox都有一个maxOffset, consumeOffset,以及一个CommandProcessor(单线程)在不停的处理;maxOffset表示最后一个命令的位置;consumeOffset表示当前正在处理的命令的位置;
  3. CommandProcessor的处理逻辑;
    • 创建、修改聚合根;
    • 更新聚合根的in-memory缓存;
    • 将聚合根产生的事件按聚合根ID的hashcode取摸路由到对应的EventMailBox;EventMailBox的个数也是程序启动时配置;
  4. 每个EventMailBox都有一个maxOffset, consumeOffset,以及一个EventProcessor(单线程)在不停的处理;maxOffset表示最后一个事件的位置;consumeOffset表示当前正在处理的事件的位置;
  5. EventProcessor的处理逻辑:
    • 按次序批量获取一批要处理的事件;
    • 批量持久化事件到EventStore,采用SqlBulkCopy;
    • 如果持久化一切顺利,则publish这一批事件(publish如果遇到网络IO异常,则重试,直到成功为止),然后继续持久化下一批,并同时将当前这一批事件对应的命令从CommandMailBox中删除;
    • 如果持久化遇到并发冲突(事件的aggregateRootId+Version重复),则对当前这一批事件一个个持久化,如果当前事件持久化成功,则同样publish该事件,当然遇到IO异常时也要不断重试,直到成功为止;如果当前事件持久化出现并发冲突,就做如下处理:
      1. 先通知当前事件对应聚合根暂停处理后续的命令;
      2. 把这一批里该聚合根的所有事件移除;把EventMailBox中的所有该聚合根的所有的事件移除;
      3. 将CommandMailBox的处理位置重置为当前事件对应的命令的offset;从而可以确保产生并发冲突的事件对应的命令以及后续的命令能再重新被处理一遍;
      4. 通知当前事件对应聚合根继续处理后续的命令(从哪个位置开始处理,在上面第三步已经重置过了);
    • 这一批并发有冲突的事件都一个个处理完之后,继续做1);
时间: 2024-11-05 08:53:45

ENode框架一台机器在处理Command时的设计思路的相关文章

ENode 2.0 - 介绍一下关于ENode中对Command的调度设计

CQRS架构,C端的职责是处理从上层发送过来的command.对于单台机器来说,我们如何尽快的处理command呢?本文想通过不断提问和回答的方式,把我的思考写出来. 首先,我们最容易想到的是使用多线程.那当我们要处理一个command时,能直接丢到线程池中,直接交给线程池去调度吗?不行.因为假如多个command修改同一个聚合根时,会导致db的并发冲突,从而会导致command的不断重试,大大降低了command的处理速度. 那该怎么解决呢?既然直接多线程处理会有并发冲突的问题,那就对comm

ENode框架Conference案例分析系列之 - 架构设计

Conference架构概述 先贴一下Conference案例的在线地址,UI因为完全拿了微软的实现,所以都是英文的,以后我有空再改为中文的. Conference后台会议管理:http://www.enode.me/conference Conference前台预定座位:http://www.enode.me/registration ENode论坛开源案例:http://www.enode.me/post ENode开源项目地址:https://github.com/tangxuehua/e

ENode框架Conference案例分析系列之 - ENode框架初始化

前言 Conference案例是使用ENode框架来开发的.之前我没有介绍过ENode框架是如何启动的,以及启动时要注意的一些点,估计很多人对ENode框架的初始化这一块感觉很复杂,一头雾水.所以,本文想简单介绍一下在做一个实际项目时,我们该如何初始化ENode. 使用ENode开发的项目的顶层宿主工程一般有两类:1)前台Web项目,它的职责就是发送命令:2)后台ProcessorHost项目,负责处理命令或事件: 这两类项目的初始化方式完全一样,只是Web项目可能需要多初始化Controlle

ENode框架Conference案例分析系列之 - 复杂情况的读库更新设计

问题背景 Conference案例,是一个关于在线创建会议(类似QCon这种全球开发者大会).在线管理会议位置信息.在线预订某个会议的位置的,这样一个系统.具体可以看微软的这个项目的主页:http://cqrsjourney.github.io. 然后我们设计了一个Conference聚合根,对应领域中的会议这个领域概念.Conference聚合根下面,有一些位置信息SeatType.一个会议聚合根下面可以添加不同类型的位置,每种类型的位置可以指定数量以及价格.所以,Conference是聚合根

ENode框架初始化

ENode框架初始化 前言 Conference案例是使用ENode框架来开发的.之前我没有介绍过ENode框架是如何启动的,以及启动时要注意的一些点,估计很多人对ENode框架的初始化这一块感觉很复杂,一头雾水.所以,本文想简单介绍一下在做一个实际项目时,我们该如何初始化ENode. 使用ENode开发的项目的顶层宿主工程一般有两类:1)前台Web项目,它的职责就是发送命令:2)后台ProcessorHost项目,负责处理命令或事件: 这两类项目的初始化方式完全一样,只是Web项目可能需要多初

ENode框架Conference案例分析系列之 - 订单处理减库存的设计

前言 前面的文章,我介绍了Conference案例的业务.上下文划分.领域模型.架构,以及代码整体流程.接下来想针对案例中一些重要的场景,分别做进一步的分析.本文想先介绍一下Conference案例的核心业务场景 - 订单处理减库存的设计. 下单以及订单处理流程描述 下单过程 预订者浏览某个已发布的会议: 进入会议的详情页面,该页面显示了所有可预订的座位分类信息: 预订者选择好要预订的座位分类,录入每个分类的预定数量: 预订者点击提交按钮,提交下单请求到Server端: Server端订单处理过

ENode框架Conference案例转载

ENode框架Conference案例分析系列之 - Quick Start 前言 前一篇文章介绍了Conference案例的架构设计,本篇文章开始介绍Conference案例的代码实现.由于代码比较多,一开始就全部介绍所有细节,估计很多人接受不了,也理解不了.所以,我先进行一次QuickStart的介绍,即选取某个简单典型的场景从前到后过一下每个环节.这样大家就能够快速对代码的重要关键环节有大概的理解.另外,我现在正在做ENode的官网,到时会像axon framework一样,介绍ENode

ENode框架Conference案例分析系列之 - Quick Start

前言 前一篇文章介绍了Conference案例的架构设计,本篇文章开始介绍Conference案例的代码实现.由于代码比较多,一开始就全部介绍所有细节,估计很多人接受不了,也理解不了.所以,我先进行一次QuickStart的介绍,即选取某个简单典型的场景从前到后过一下每个环节.这样大家就能够快速对代码的重要关键环节有大概的理解.另外,我现在正在做ENode的官网,到时会像axon framework一样,介绍ENode框架本身.使用场景.性能数据.案例,以及论坛社区等功能: 本文打算选择Conf

ENode框架使用场景简述

ENode是一个基于DDD,CQRS,ES,EDA,In-Memory架构风格,可以帮助开发者开发高并发.高吞吐.可伸缩.可扩展的应用程序.ENode可能的应用场景如下: 当你正在找一个.NET平台的DDD的开发框架时,可以考虑ENode: 当你想找一个CQRS架构的实现框架时,可以考虑ENode:当你的系统具有大量的写入,同时又有更大量的读取时,只要系统能接受写入数据和读取数据的最终一致性(秒级),那就可以考虑使用ENode:ENode可以让我们对读写两端做不同的技术架构,分开优化,互不影响: