RocketMQ解决分布式事务

1.原理图:

2.设计实现思路:

1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;

2.开始执行我们的本地事务,本地事务执行的结果(回滚或者提交)发送Broker;

3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;

4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果

核心代码发送方

 1 @RestController
 2 public class ProducerController {
 3     @Autowired
 4     private OrderService orderService;
 5
 6     @RequestMapping("/sendMsg")
 7     public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
 8         String orderId = orderService.sendOrder();
 9         return orderId;
10
11     }
12 }
 1 @Slf4j
 2 @Component
 3 @RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
 4 public class SyncProducerListener implements RocketMQLocalTransactionListener {
 5
 6     @Autowired
 7     private TransationalUtils transationalUtils;
 8
 9     @Autowired
10     private OrderMapper orderMapper;
11
12     @Override
13     public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
14         TransactionStatus beginStatus = null;
15         try {
16             beginStatus = transationalUtils.begin();
17             MessageHeaders headers = message.getHeaders();
18             String objMsg = (String) headers.get("msg");
19             if (StringUtils.isEmpty(objMsg)) {
20                 return RocketMQLocalTransactionState.ROLLBACK;
21             }
22             OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
23             int result = orderMapper.addOrder(orderEntity);
24             if (result > 0) {
25                 transationalUtils.commit(beginStatus);
26             }
27             log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
28             return null;
29         } catch (Exception e) {
30             e.printStackTrace();
31             log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
32             if (beginStatus != null) {
33                 transationalUtils.rollback(beginStatus);
34             }
35             return RocketMQLocalTransactionState.ROLLBACK;
36         }
37     }
38
39     @Override
40     public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
41         log.info("【执行检查任务】");
42         MessageHeaders headers = message.getHeaders();
43         String objMsg = (String) headers.get("msg");
44         if (StringUtils.isEmpty(objMsg)) {
45             return RocketMQLocalTransactionState.UNKNOWN;
46         }
47         OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
48         String orderId = orderEntity.getOrderId();
49         OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
50         if (orderDbEntity == null) {
51             return RocketMQLocalTransactionState.UNKNOWN;
52         }
53         return RocketMQLocalTransactionState.COMMIT;
54     }
55 }

消费者

 1 @Service
 2 @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic")
 3 public class OrdeConsumer implements RocketMQListener<String> {
 4     @Autowired
 5     private DispatchMapper dispatchMapper;
 6
 7     @Override
 8     public void onMessage(String msg) {
 9         OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
10         System.out.println(orderEntity.toString());
11     }
12
13 }

手动事务

 1 @Service
 2 public class TransationalUtils {
 3     @Autowired
 4     public DataSourceTransactionManager transactionManager;
 5
 6     public TransactionStatus begin() {
 7         TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
 8         return transaction;
 9     }
10
11     public void commit(TransactionStatus transaction) {
12         transactionManager.commit(transaction);
13
14     }
15
16     public void rollback(TransactionStatus transaction) {
17         transactionManager.rollback(transaction);
18     }
19
20 }

注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。

原文地址:https://www.cnblogs.com/lmyupupblogs/p/12143386.html

时间: 2024-11-08 01:33:41

RocketMQ解决分布式事务的相关文章

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

使用kafka消息队列解决分布式事务

微服务框架Spring Cloud介绍 Part1: 使用事件和消息队列实现分布式事务 本文转自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行

构建基于RocketMQ的分布式事务服务

说在前面 Apache RocketMQ-4.3.0正式Release了事务消息的特性,顺着最近的这个热点.第一篇文章,就来聊一下在软件工程学上的长久的难题--分布式事务(Distributed Transaction). 这个技术也在各个诸如阿里,腾讯等大厂的内部,被广泛地实现,利用及优化.但是由于理论上就有难点,所以分布式事务就隐晦得成了大厂对于小厂的技术壁垒.相信来看这篇文章的同学,一定都听过很多关于分布式事务的术语,比较二阶段提交,TCC,最终一致性等,所以这里也不多普及概念. 基于Ro

【分布式事务】使用atomikos+jta解决分布式事务问题

一.前言 分布式事务,这个问题困惑了小编很久,在3个月之前,就间断性的研究分布式事务.从MQ方面,数据库事务方面,jta方面.近期终于成功了,使用JTA解决了分布式事务问题.先写一下心得,后面的二级提交也会在研究. 二.介绍 分布式事务 说到分布式事务,可以理解为,由于分布式而引起的事务不一致的问题.随着项目做大,模块拆分,数据库拆分.一次包含增删改操作数据库涉及到了更新两个不同物理节点的数据库,这样的数据库事务只能保证自己处理的部分的事务,但是整个的事务就不能保证一致性. 网上针对分布式事务常

RabbitMQ解决分布式事务

案例:经典案例,以目前流行点外卖的案例,用户下单后,调用订单服务,让后订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯. RabbitMQ解决分布式事务原理: 采用最终一致性原理.需要保证以下三要素1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)2.MQ消费者消息能够正确消费消息,采用手动ACK模式,使用不补偿机制(注意重试幂等性问题)3.如何保证第一个事务先执行,采用补偿机制(补单机制),在创建一个补单消费者进行监听,如果订单没有创建成功,进

一文教你迅速解决分布式事务 XA 一致性问题

欢迎大家前往腾讯云技术社区,获取更多腾讯海量技术实践干货哦~ 作者:腾讯云数据库团队 近日,腾讯云发布了分布式数据库解决方案(DCDB),其最明显的特性之一就是提供了高于开源分布式事务XA的性能.大型业务系统有着用户多.并发高的特点,在这方面,集中式数据库(单机数据库)的性能很难支持,因此主流的互联网公司往往采用分布式(架构)数据库,物理上利用更多的低端设备,逻辑上对大表水平拆分支撑业务的需要. 虽然分布式数据库能解决性能难题,但事务一致性(Consistency)的问题,却很难在分布式数据库上

Spring Boot微服务如何集成fescar解决分布式事务?

什么是fescar? 关于fescar的详细介绍,请参阅fescar wiki. 传统的2PC提交协议,会持有一个全局性的锁,所有局部事务预提交成功后一起提交,或有一个局部事务预提交失败后一起回滚,最后释放全局锁.锁持有的时间较长,会对并发造成较大的影响,死锁的风险也较高. fescar的创新之处在于,每个局部事务执行完立即提交,释放本地锁:它会去解析你代码中的sql,从数据库中获得事务提交前的事务资源即数据,存放到undo_log中,全局事务协调器在回滚的时候直接使用undo_log中的数据覆

Spring Boot 集成 Seata 解决分布式事务问题

文章首发于公众号<程序员果果>地址 : https://mp.weixin.qq.com/s/aDhGG3Y2t4lPYetK01Wmxg seata 简介 Seata 是 阿里巴巴2019年开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.在 Seata 开源之前,Seata 对应的内部版本在阿里内部一直扮演着分布式一致性中间件的角色,帮助阿里度过历年的双11,对各业务进行了有力的支撑.经过多年沉淀与积累,2019.1 Seata 正式宣布对外开源 .目前

【故障处理】分布式事务ORA-01591错误解决

[故障处理]分布式事务ORA-01591错误解决 1  BLOG文档结构图       2  前言部分 2.1  导读和注意事项 各位技术爱好者,看完本文后,你可以掌握如下的技能,也可以学到一些其它你所不知道的知识,~O(∩_∩)O~: ① 分布式事务的简单概念         ② ORA-01591错误解决   Tips: ① 本文在ITpub(http://blog.itpub.net/26736162).博客园(http://www.cnblogs.com/lhrbest)和微信公众号(x