设计目标
- 尽量快的处理命令和事件,保证吞吐量;
- 希望处理完一个命令后不需要等待命令产生的事件持久化完成就能处理下一个命令;
- 保证命令、事件处理的顺序性,先来的先处理,先产生的先处理;
- 保证一个聚合根的事件只有一个线程在持久化,并按事件产生的顺序持久化;
- 持久化事件时如果遇到并发冲突时(聚合根ID+事件版本号出现重复)的处理代价要轻;
- 要能利用多核的优势;
总体设计思路
- 先将命令根据聚合根ID路由到CommandMailBox里;
- 单线程处理CommandMailBox中的命令,由于聚合根在in-memory本地内存,所以处理非常快;
- 处理成功后更新聚合根的in-memory内存;
- 将聚合根产生的事件同样原理路由到EventMailBox里;
- 单线程批量处理EventMailBox中的事件;由于是批量,所以持久化的吞吐量也可以保证;
- 处理完成一批事件后,把这一批事件对应的命令从CommandMailBox中移除;
详细设计思路
- 设计N个存放命令的CommandMailBox,命令首先按聚合根ID的hashcode取摸路由到对应的CommandMailBox;
- 每个CommandMailBox都有一个maxOffset, consumeOffset,以及一个CommandProcessor(单线程)在不停的处理;maxOffset表示最后一个命令的位置;consumeOffset表示当前正在处理的命令的位置;
- CommandProcessor的处理逻辑;
- 创建、修改聚合根;
- 更新聚合根的in-memory缓存;
- 将聚合根产生的事件按聚合根ID的hashcode取摸路由到对应的EventMailBox;EventMailBox的个数也是程序启动时配置;
- 每个EventMailBox都有一个maxOffset, consumeOffset,以及一个EventProcessor(单线程)在不停的处理;maxOffset表示最后一个事件的位置;consumeOffset表示当前正在处理的事件的位置;
- EventProcessor的处理逻辑:
- 按次序批量获取一批要处理的事件;
- 批量持久化事件到EventStore,采用SqlBulkCopy;
- 如果持久化一切顺利,则publish这一批事件(publish如果遇到网络IO异常,则重试,直到成功为止),然后继续持久化下一批,并同时将当前这一批事件对应的命令从CommandMailBox中删除;
- 如果持久化遇到并发冲突(事件的aggregateRootId+Version重复),则对当前这一批事件一个个持久化,如果当前事件持久化成功,则同样publish该事件,当然遇到IO异常时也要不断重试,直到成功为止;如果当前事件持久化出现并发冲突,就做如下处理:
- 先通知当前事件对应聚合根暂停处理后续的命令;
- 把这一批里该聚合根的所有事件移除;把EventMailBox中的所有该聚合根的所有的事件移除;
- 将CommandMailBox的处理位置重置为当前事件对应的命令的offset;从而可以确保产生并发冲突的事件对应的命令以及后续的命令能再重新被处理一遍;
- 通知当前事件对应聚合根继续处理后续的命令(从哪个位置开始处理,在上面第三步已经重置过了);
- 这一批并发有冲突的事件都一个个处理完之后,继续做1);
时间: 2024-11-05 08:53:45