RocketMQ实现事务消息方案

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。

RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。

执行流程如下:

为方便理解我们还以注册送积分的例子来描述 整个流程。

Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

1、Producer 发送事务消息

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

2、MQ Server回应消息发送成功

MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

本例中,Producer 执行添加用户操作。

4、消息投递

若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。

MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

5、事务回查

如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

RoacketMQ提供RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener {
/**
- 发送prepare消息成功此方法被回调,该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}

发送事务消息:
以下是RocketMQ提供用于发送事务消息的API:

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    //设置TransactionListener实现
    producer.setTransactionListener(transactionListener);
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);

下面给出代码示例

package com.example.rocketmq.common;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
public class SyncProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public TransactionSendResult sendSyncMessage(String msg, String topic, String tag){

        log.info("【发送消息】:{}", msg);

        //构建消息体
        JSONObject jsonObject = new JSONObject();

        jsonObject.put("message",msg);

        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();

        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_group", topic, message, tag);

        log.info("【发送状态】:{}", result.getLocalTransactionState());

        return result;

    }

}
package com.example.rocketmq.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_group")
public class SyncProducerListener implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        try {

            log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);

            localTrans.put(message.getHeaders().getId()+"", message.getPayload());

            return RocketMQLocalTransactionState.COMMIT;

        } catch (Exception e) {

            e.printStackTrace();

            log.error("【执行本地业务异常】 exception message:{}", e.getMessage());

            return RocketMQLocalTransactionState.ROLLBACK;

        }

    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        log.info("【执行检查任务】");

        return RocketMQLocalTransactionState.UNKNOWN;

    }
}
package com.example.rocketmq.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "TransactionTopic",consumerGroup = "spring_boot_consumer_group")
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {

        log.info("开始消费消息:{}",s);

    }
}

application.propertis

rocketmq.name-server=localhost:9876
rocketmq.producer.group = spring_boot_producer_group

pom.xml

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

测试:

package com.example.rocketmq.controller;

import com.example.rocketmq.common.SyncProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private SyncProducer syncProducer;

    @GetMapping("test")
    String test() {

        syncProducer.sendSyncMessage("我随便传的一条测试消息,内容保密","TransactionTopic","TransactionTAG");

        return "ok";
    }

}

结果:

原文地址:https://www.cnblogs.com/liudalei/p/12529258.html

时间: 2024-07-30 14:31:51

RocketMQ实现事务消息方案的相关文章

一个基于RabbitMQ的可复用的事务消息方案

原文:一个基于RabbitMQ的可复用的事务消息方案 前提# 分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案.参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块.本文的内容就是详细分析整个方案的设计思路和实施.环境依赖如下: JDK1.8+ spring-boot-start-web:2.x.x.spring-boot-start-jdbc:2.x.x.

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是说此时消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢? 原来RocketMQ使用TransactionalMessa

阿里云-ONS-Help-产品介绍-消息类型:事务消息

ylbtech-阿里云-ONS-Help-产品介绍-消息类型:事务消息 1.返回顶部 1. 本页目录 概念介绍 适用场景 交互流程 注意事项 更多信息 本文介绍消息队列 RocketMQ 版事务消息的概念.适用场景.交互流程以及使用过程中的注意事项. 概念介绍 事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致. 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 Rock

Apache RocketMQ 正式开源分布式事务消息

近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息,而且实现了对外部组件的零依赖.接下来,本文将详细探秘RocketMQ事务消息的设计原理以及实现机制. 一.需求缘起 在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个非常难以逾越的障碍,但是目前尚没有一个完整通用的解决方案. 其实不仅仅

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

RocketMQ事务消息学习及刨坑过程

一.背景 MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可. 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷.这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont