RocketMQ 整合SpringBoot发送事务消息

环境

jdk: 8u22
rocketmq: rocketmq-all-4.5.2-bin-release
springboot: 2.1.6.RELEASE
rocketmq-springboot: 2.0.3

发送流程(事务消息)

Rocket发送事务消息:
1、由producer发送prepare(半消息)给MQ的broker
2、prepare消息发送成功以后执行本地业务(本地事务),根据本地事务执行结果手动返回相应状态(RocketMQLocalTransactionState.COMMIT、RocketMQLocalTransactionState.ROLLBACK等)
3、如果是COMMIT则MQ发送消息给consumer,然后consumer执行本地业务。如果是ROLLBACK则会删除prepare消息
4、如果MQ一直没收到返回状态则会启动定时任务检查本地事务状态
5、消费者、生产者的事务各由自己保证。MQ的事务是由MQ保证,这里会根据使用者配置的参数来决定如何执行。

生产者

发送事务消息

生产者

/**
 * @author Zhang Qiang
 * @date 2019/12/4 15:55
 */
@Slf4j
@Service
public class SyncProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public TransactionSendResult sendSyncMessage(String msg, String topic, String tag){
        log.info("【发送消息】:{}", msg);
        Message message = MessageBuilder.withPayload(msg).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(MQGroup.SPRING_BOOT_PRODUCER_GROUP.getGroup(), topic, message, tag);
        log.info("【发送状态】:{}", result.getLocalTransactionState());
        return result;
    }

}

监听生产者发送事务消息

每次推送消息会执行executeLocalTransaction方法,首先会发送半消息,到这里的时候是执行具体本地业务,执行成功后手动返回RocketMQLocalTransactionState.COMMIT状态,这里是保证本地事务执行成功,如果本地事务执行失败则可以返回ROLLBACK进行消息回滚。 此时消息只是被保存到broker,并没有发送到topic中,broker会根据本地返回的状态来决定消息的处理方式。

checkLocalTransaction方法是rocket定时回查时所执行的,本环境下并没有执行,原因版本原因,是因为当前版本没有执行回调(待验证)。

/**
 * @author Zhang Qiang
 * @date 2019/12/4 16:07
 */
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "spring_boot_producer_group")
public class SyncProducerListener implements RocketMQLocalTransactionListener {
    private AtomicInteger trnner = new AtomicInteger(0);
    private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>();
    @Autowired
    private LocalService localService;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            localService.executeLocalService(message.getPayload().toString());
            log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
            localTrans.put(message.getHeaders().getId()+"", message.getPayload());
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("【执行检查任务】");
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

消费者

这里注解可以使用selectorExpression = "tag",来标明tag,注意这里如果队列发送的时候当前消费者无法消费的时候消息就会发送给其它tag,也就是说tag并不能保证一定能消费到消息。

/**
 * @author Zhang Qiang
 * @date 2019/12/4 16:07
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot_topic", consumerGroup = "spring_boot_consumer_group")
public class SyncConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

    @Autowired
    ConsumerService service;

    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)(exts, context) ->{
            try {
                log.info("【获取消息】");
                if (!CollectionUtils.isEmpty(exts)) {
                    exts.forEach(ext->{
                        Integer con = ext.getReconsumeTimes();
                        service.readMessage(new String(ext.getBody()));
                        log.info("【消费消息】 次数:{}, ext :{}", con, ext);
                    });
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e){
                e.printStackTrace();
                log.error("【消费消息失败】,message:{}", e.getMessage());
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
    }

    @Override
    public void onMessage(MessageExt messageExt) {
        String msg = null;
        try {
            msg = new String(messageExt.getBody(),"utf-8");
            log.info("MsgConsumer >>> {}, msgId:{}", msg, messageExt.getMsgId());
            service.readMessage(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            log.error("【消费异常】:{}", e.getMessage());
        }
    }

}

  

执行结果:

2019-12-04 17:36:43.711  INFO 9792 --- [io-10100-exec-3] com.zang.rocket.producer.SyncProducer    : 【发送状态】:COMMIT_MESSAGE
2019-12-04 17:36:59.956  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.producer.SyncProducer    : 【发送消息】:farst
2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.service.LocalService     : 【执行业务,读取发送消息】:[[email protected]
2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] c.z.r.producer.SyncProducerListener      : 【本地业务执行完毕】 msg:GenericMessage [payload=byte[5], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=252178bf-1d37-2f33-0892-721a12c0fc23, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133ACA340005, timestamp=1575452219958}], Object:springboot_tag
2019-12-04 17:36:59.958  INFO 9792 --- [io-10100-exec-5] com.zang.rocket.producer.SyncProducer    : 【发送状态】:COMMIT_MESSAGE
2019-12-04 17:37:19.508  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.producer.SyncProducer    : 【发送消息】:oneworld
2019-12-04 17:37:19.509  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.service.LocalService     : 【执行业务,读取发送消息】:[[email protected]
2019-12-04 17:37:19.510  INFO 9792 --- [io-10100-exec-7] c.z.r.producer.SyncProducerListener      : 【本地业务执行完毕】 msg:GenericMessage [payload=byte[8], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=e9f68709-929a-acc9-bc90-426dc299fc5e, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133B16940006, timestamp=1575452239509}], Object:springboot_tag
2019-12-04 17:37:19.510  INFO 9792 --- [io-10100-exec-7] com.zang.rocket.producer.SyncProducer    : 【发送状态】:COMMIT_MESSAGE
2019-12-04 17:37:42.620  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.producer.SyncProducer    : 【发送消息】:好快!
2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.service.LocalService     : 【执行业务,读取发送消息】:[[email protected]
2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] c.z.r.producer.SyncProducerListener      : 【本地业务执行完毕】 msg:GenericMessage [payload=byte[7], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=088cfba8-7b49-6ecd-8c7d-1c4b3e543f7d, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133B70DC0007, timestamp=1575452262622}], Object:springboot_tag
2019-12-04 17:37:42.622  INFO 9792 --- [io-10100-exec-9] com.zang.rocket.producer.SyncProducer    : 【发送状态】:COMMIT_MESSAGE
2019-12-04 17:38:00.541  INFO 9792 --- [io-10100-exec-2] com.zang.rocket.producer.SyncProducer    : 【发送消息】:事务消息!
2019-12-04 17:38:00.543  INFO 9792 --- [io-10100-exec-2] com.zang.rocket.service.LocalService     : 【执行业务,读取发送消息】:[[email protected]
2019-12-04 17:38:00.543  INFO 9792 --- [io-10100-exec-2] c.z.r.producer.SyncProducerListener      : 【本地业务执行完毕】 msg:GenericMessage [payload=byte[13], headers={rocketmq_TOPIC=springboot_topic, rocketmq_FLAG=0, id=b0fb1bcd-723a-bcac-d650-08eeff193762, rocketmq_TRANSACTION_ID=C0A80A8B264018B4AAC2133BB6DD0008, timestamp=1575452280543}], Object:springboot_tag

消费结果

2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.cunsumer.SyncConsumer    : 【获取消息】
2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.service.ConsumerService  : 【读取消息服务】:farst
2019-12-04 17:36:59.966  INFO 30024 --- [MessageThread_3] com.zang.rocket.cunsumer.SyncConsumer    : 【消费消息】 次数:0 ext :MessageExt [queueId=1, storeSize=316, queueOffset=0, sysFlag=8, bornTimestamp=1575452219956, bornHost=/192.168.10.139:60014, storeTimestamp=1575452219960, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C8971E, commitLogOffset=63477534, bodyCRC=1472579256, reconsumeTimes=0, preparedTransactionOffset=63477210, toString()=Message{topic=‘springboot_topic‘, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452219966, id=b63e043d-d3d6-59ad-b455-bbc02add83e0, UNIQ_KEY=C0A80A8B264018B4AAC2133ACA340005, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452219956, REAL_QID=1}, body=[102, 97, 114, 115, 116], transactionId=‘C0A80A8B264018B4AAC2133ACA340005‘}]
2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.cunsumer.SyncConsumer    : 【获取消息】
2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.service.ConsumerService  : 【读取消息服务】:oneworld
2019-12-04 17:37:19.518  INFO 30024 --- [MessageThread_4] com.zang.rocket.cunsumer.SyncConsumer    : 【消费消息】 次数:0 ext :MessageExt [queueId=8, storeSize=319, queueOffset=0, sysFlag=8, bornTimestamp=1575452239508, bornHost=/192.168.10.139:60014, storeTimestamp=1575452239512, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C89A2D, commitLogOffset=63478317, bodyCRC=319562353, reconsumeTimes=0, preparedTransactionOffset=63477990, toString()=Message{topic=‘springboot_topic‘, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452239518, id=9c9acec6-1fd4-d45c-1bc2-bc489e08e8bc, UNIQ_KEY=C0A80A8B264018B4AAC2133B16940006, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452239508, REAL_QID=8}, body=[111, 110, 101, 119, 111, 114, 108, 100], transactionId=‘C0A80A8B264018B4AAC2133B16940006‘}]
2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.cunsumer.SyncConsumer    : 【获取消息】
2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.service.ConsumerService  : 【读取消息服务】:好快!
2019-12-04 17:37:42.634  INFO 30024 --- [MessageThread_5] com.zang.rocket.cunsumer.SyncConsumer    : 【消费消息】 次数:0 ext :MessageExt [queueId=6, storeSize=318, queueOffset=0, sysFlag=8, bornTimestamp=1575452262620, bornHost=/192.168.10.139:60014, storeTimestamp=1575452262624, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C89D3E, commitLogOffset=63479102, bodyCRC=1129015149, reconsumeTimes=0, preparedTransactionOffset=63478776, toString()=Message{topic=‘springboot_topic‘, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452262634, id=92e37d9b-b7b5-6064-65e5-deee043a5c48, UNIQ_KEY=C0A80A8B264018B4AAC2133B70DC0007, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452262620, REAL_QID=6}, body=[-27, -91, -67, -27, -65, -85, 33], transactionId=‘C0A80A8B264018B4AAC2133B70DC0007‘}]
2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.cunsumer.SyncConsumer    : 【获取消息】
2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.service.ConsumerService  : 【读取消息服务】:事务消息!
2019-12-04 17:38:00.549  INFO 30024 --- [MessageThread_6] com.zang.rocket.cunsumer.SyncConsumer    : 【消费消息】 次数:0 ext :MessageExt [queueId=13, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1575452280541, bornHost=/192.168.10.139:60014, storeTimestamp=1575452280544, storeHost=/192.168.10.139:10911, msgId=C0A80A8B00002A9F0000000003C8A055, commitLogOffset=63479893, bodyCRC=880426184, reconsumeTimes=0, preparedTransactionOffset=63479560, toString()=Message{topic=‘springboot_topic‘, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=springboot_topic, MAX_OFFSET=1, TRAN_MSG=true, CONSUME_START_TIME=1575452280549, id=1bc5beda-d52a-9fa3-3847-e301a28095b8, UNIQ_KEY=C0A80A8B264018B4AAC2133BB6DD0008, WAIT=false, PGROUP=spring_boot_producer_group, timestamp=1575452280541, REAL_QID=13}, body=[-28, -70, -117, -27, -118, -95, -26, -74, -120, -26, -127, -81, 33], transactionId=‘C0A80A8B264018B4AAC2133BB6DD0008‘}]

  

 

原文地址:https://www.cnblogs.com/meijsuger/p/11984597.html

时间: 2024-10-03 09:18:00

RocketMQ 整合SpringBoot发送事务消息的相关文章

Apache RocketMQ 正式开源分布式事务消息

近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息,而且实现了对外部组件的零依赖.接下来,本文将详细探秘RocketMQ事务消息的设计原理以及实现机制. 一.需求缘起 在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个非常难以逾越的障碍,但是目前尚没有一个完整通用的解决方案. 其实不仅仅

RocketMQ实现事务消息方案

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目.据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现.Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持. RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是说此时消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢? 原来RocketMQ使用TransactionalMessa

RocketMQ事务消息学习及刨坑过程

一.背景 MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可. 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷.这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑

利用事务消息实现分布式事务

举一个电商的例子,用户在购物车中付款,会调用一个服务生成一条订单,并调用另一个服务将该商品从购物车中删除.这两个操作应该是原子性的,要么都成功,要么都失败,这就是事务要解决的问题.我们下面来谈谈事务的概念.分布式事务的复杂点和实现方式. 一. 事务 事务的四大特性ACID:原子性.一致性.隔离性.持久性 1. 原子性 原子性指一个事务的操作是不可分割的,要么成功要么失败,不会存在成功一半的情况. 2. 一致性 一致性指一个事务在执行完成的时间点前读取到的一定是更新前的状态,执行完成后读取到的一定

SpringBoot整合RabbitMQ之发送接收消息实战

实战前言 前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的.特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent.ApplicationListener.ApplicationEventPublisher跟RabbitMQ的Message.Listener.R

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P