事务消息组件的套路

事务消息组件的套路

本文总结一些互联网产品的服务端关于处理事务消息的套路

套路1:最终一致性消息模型

该方案关键是要有个消息表。另外,一般会有个队列,而且我们一般都会假设这个MQ不丢消息。

基本思路如下

  • 消息生产方

需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交(有时实现时为了简单,可以只是增加一个字段。新增字段会跟业务强耦合)

  • 消息消费方

    需要处理这个消息,并完成自己的业务逻辑。

如果本地事务处理成功,那发送给生产方一个confirm消息,表明已经处理成功了。

如果处理失败,可以发送给生产方一个失败消息,或者记录本地表。以后重试。

生产方定时扫描本地消息表,把还没处理完成的消息重新发送一遍。如果有其他的对账补账逻辑,这一步也可以省略。

整体流程如下图所示:

对一致性要求不高的或者有其他兜底方案的场景(比如较为频繁的对账补账机制),我们就不需要关心消息的confirm等情况,只要扔给消息,就认为OK,一般也是可取的。

但是这个方案存在一个小问题

如果发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢,还是消息已经收到了只是返回response的时候失败了

  1. 如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致发送方事务回滚,接受方却把后续逻辑做掉了
  2. 把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务,存在风险

套路2:事务消息模型

国内常用方案

事务消息与上面提到的最终一致性模型一致,只不过是把记录消息发送状态这一步在中间件内部做掉了,从而无需业务方针对每个业务自己手动实现。

具体的实现如下图所示。

现在的步骤如下:

  1. 业务应用把要发送的消息给协调器

    1.1 协调器在他自己的DB中记录下这条消息

    1.2 协调器返回对应的msgid

  2. 业务应用自行本地事务更新DB
  3. 业务系统如果本地事务执行成功,告诉协调器这条消息Commit,如果本地事务执行失败,则rollback

    3.1 协调器更新自己的数据库,标记消息已经Commit/Rollback

  4. 协调器对于成功submit的消息,开始往MQ进行投递,等待ack,如果一段时间没有收到ack,会继续投递该消息
  5. MQ将消息投递给接收方,接收方执行事务更新DB
  6. 接收方应用给协调器做ack,告诉这条消息消费成功.如果长时间没有收到ACK,协调器重投到MQ,这里需要接受方做幂等实现
  7. 如果协调器没有收到Commit/rollback,则会询问业务应用消息的状态,是要Commit还是Rollback

对于网易内部的TMC来说,1、3、7三步都是由dubbo实现的(7走的Dubbo的泛化调用)。

阿里的Rocketmq则是做在了产品内部,不依赖外部实现。

RocketMQ的实现细节见此

对比而言,方案2的最大不一样就是把“扫描消息表”这个事情,不需要业务方做,消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”或者说业务自己的业务表有对应的信息,记录事物执行状态。

KAFKA的事务消息实现方案

Kafka的transaction最开始是给kafka steaming实现exactly once语义设计的,后面又有一些扩展。

Kafka的实现方案(不考虑kafka streaming的情况下,kafka streaming对消费者有处理)对比上面是没有回查步骤的,同时消费者也需要业务代码来保证幂等。

流程图如下:

上图来自matt的博客

KAFKA的事务消息具体实现比较复杂,简单来说分为如下几个步骤:

  1. 生产者像事务协调器注册自己的TransactionalID,这个ID需要保证每台生产者是唯一的。这时如果对应的TransactionalID的机器有没有结束的transaction,会直接被终止。
  2. beginTransaction是本地做的,对于服务端没有交互
  3. 向协调器注册要发消息的partition(要解决跨Topic,跨Partition的事务,对应上图4.1a,协调器开始写ongoing的txLog到Transaction_Log的内部topic中)
  4. 正常发送消息到对应的partition去
  5. 调用commit/abord到协调器,协调器开始[两阶段提交]
  6. 协调器写prepare消息到Transaction_Log,这时就算提交了。然后再会给发送到的partition后面补一个不可见的commit消息(主要是用来处理read committed),然后会在写一个commited到到Transaction_Log。到这里就算一个事务结束。

由于这个方案没有引入一个唯一的消息ID(非kafka streaming 模式),且没有回查机制,对于方案1中的最后两个问题也是没有解决。

Kafka自己的kafka streaming则是需要把消费源头的offset作为transaction的一部分(会写到到Transaction_Log里)提交给协调器来实现幂等。而对接一般的外部业务系统,则没有对应机制可以做这件事。

下面是相关的资料:

confluent的简单介绍

原始KIP

matt33的博客(中文)写的很详细,推荐一看

原文地址:https://www.cnblogs.com/ctgulong/p/10312100.html

时间: 2024-11-15 01:00:11

事务消息组件的套路的相关文章

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

Apache RocketMQ 正式开源分布式事务消息

近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息,而且实现了对外部组件的零依赖.接下来,本文将详细探秘RocketMQ事务消息的设计原理以及实现机制. 一.需求缘起 在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个非常难以逾越的障碍,但是目前尚没有一个完整通用的解决方案. 其实不仅仅

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

RocketMQ事务消息学习及刨坑过程

一.背景 MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可. 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷.这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑

我心中的核心组件(可插拔的AOP)~消息组件~完善篇

回到目录 为什么要有本篇文章 本篇文章主要实现了RTX消息生产者,并且完成了整体的设计方式,之前在设计时消息生产者全局使用单一的生产方式,即一个项目里使用了Email就不能使用SMS,这种设计方法和实际不相符,虽然在性能上是最佳的(采用了单例模型,维护一个生产者,使用策略模式进行生产者的实现,使用工厂模式生产具体生产者),实际项目中,应该是可以自动选择消息生产者的,当然为了程序的性能,我们还必须使用单例模式来生产具体生产者,这种单例,在本程序中,采用了具体类型,具体单例的方法,即Email自己维

说说设计模式~装饰器模式(Decorator)~多功能消息组件的实现

返回目录 为何要设计多功能消息组件 之前写过一篇装饰器模式的文章,感觉不够深入,这次的例子是实现项目中遇到的,所以把它拿出来,再写写,之前也写过消息组件的文章,主要采用了策略模式实现的,即每个项目可以通过配置进行一种消息的订制,如,你可以订制email,sms,rtx,qq等,但不能同时采用多种机制完成消息的发送,这在一些情况下是没有问题的,但有时,我们也需要同时为客户提供多种消息的推送,这在目前还是挺现时的,如在用户下单后,同时为它发email 和短信进行通过,并对每个订单的过程进行跟踪并通知

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是说此时消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢? 原来RocketMQ使用TransactionalMessa

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont