还不知道事务消息吗?这篇文章带你全面扫盲!

在分布式系统中,为了保证数据一致性是必须使用分布式事务。分布式事务实现方式就很多种,今天主要介绍一下使用 RocketMQ 事务消息,实现分布事务。

文末有彩蛋,看完再走

为什么需要事务消息?

很多同学可能不知道事务消息是什么,没关系,举一个真实业务场景,先来带你了解一下普通的消息存在问题。

上面业务场景中,当用户支付成功,将会更新支付订单,然后发送 MQ 消息。手续费系统将会通过拉取消息,计算手续费然后保存到另外一个手续费数据库中。

由于计算手续费这个步骤可以离线计算,所以这里采用 MQ 解耦支付与计算手续费的流程。

流程主要涉及三个步骤:

  • 更新订单数据
  • 发送消息给 MQ
  • 手续费系统拉取消息

上面提到的步骤,任何一个都会失败,如果我们没有处理,就会使两边数据不一致,将会造成下面两种情况:

  • 订单数据更新了,手续费数据没有生成
  • 手续费数据生成,订单数据却没有更新

这可是涉及到真正的钱,一旦少计算,就会造成资损,真的赔不起!

对于最后一步来讲,比较简单。如果消费消息失败,只要没有提交消息确认,MQ 服务端将会自动重试。

最大的问题在于我们无法保证更新操作与发送消息一致性。无论我们采用先更新订单数据,再发送消息,还是先发送消息,再更新订单数据,都在存在一个成功,一个失败的可能。

如下所示,采用先发送消息,然后再更新数据库的方式。

上面流程消息发送成功之后,再进行本地事务的提交。这个流程看起来很完美,但是想象一下,如果在提交事务时数据库执行失败,导致事务回滚了。

然而此时消息已经发送出去,无法撤回。这就导致手续费系统紧接会消费消息,计算手续费并更新到数据库中。这就造成支付数据未更新,手续费系统却生成的不一致的情况。

那如果我们流程反一下,是不是就好了呢?

我们使用下面的伪码表示:

// 开始事务
try {
    // 1.执行数据库操作
    // 2.提交事务
}catch (Exception e){
    // 3.回滚事务
}
// 4.发送 mq 消息

这里如果事务提交成功,但是 mq 消息发送失败,就会导致支付数据更新但是手续费数据未生成的的不一致情况。

这里有的同学可能会想到,将发送 mq 消息步骤移动到事务中,消息发送失败,回滚事务,不就完美了吗?

伪码如下:

// 开始事务
try {
  // 1.执行数据库操作
  // 2.发送 mq 消息
  // 3.提交事务
}catch (Exception e){
  // 4.回滚事务
}

上面代码看起来确实没什么问题,消息发送失败,回滚事务。

但是实际上第二步有可能存在消息已经发送到 MQ 服务端,但是由于网络问题未及时收到 MQ 的响应消息,从而导致消息发送端认为消息消息发送失败。

这就会导致订单事务回滚了,但是手续费系统却能消费消息,两边数据库又不一致了。

熟悉 MQ 的同学,可能会想到,消息发送失败,可以重试啊。

是的,我们可以增加重试次数,重新发送消息。但是这里我们需要注意,由于消息发送耦合在事务中,过多的重试会拉长数据库事务执行时间,事务处理时间过长,导致事务中锁的持有时间变长,影响整体的数据库吞吐量。

实际业务中,不太建议将消息发送耦合在数据库事务中。

事务消息

事务消息是 RocketMQ 提供的事务功能,可以实现分布式事务,从而保证上面事务操作与消息发送要么都成功,要么都失败。

使用事务消息,整体流程如下:

首先我们将会发送一个半(half) 消息到 MQ 中,通知其开启一个事务。这里半消息并不是说消息内容不完整,实际上它包含所有完整的消息内容。

这个半消息与普通的消息唯一的区别在于,在事物提交之前,这个消息对消费者来说是不可见的,消费者不会消费这个消息。

一旦半消息发送成功,我们就可以执行数据库事务。然后根据事务的执行结果再决定提交或回滚事务消息。

如果事务提交成功,将会发送确认消息至 MQ,手续费系统就可以成功消费到这条消息。

如果事务被回滚,将会发送回滚通知至 MQ,然后 MQ 将会删除这条消息。对于手续费系统来说,都不会知道这条消息的存在。

这就解决了要么都成功,要么都失败的一致性要求。

实际上面的流程还是存在问题,如果我们提交/回滚事务消息失败怎么办?

对于这个问题,RocketMQ 给出一种事务反查的机制。我们需要需要注册一个回调接口,用于反查本地事务状态。

RocketMQ 若未收到提交或回滚的请求,将会定期去反查回调接口,然后可以根据反查结果决定回滚还是提交事务。

RocketMQ 事务消息流程整体如下:

事务消息示例代码如下:

public class TransactionMQProducerExample {
    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
        TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");
        // 不定义将会使用默认的
        ExecutorService executorService =
                new ThreadPoolExecutor(2, 5, 100,
                        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
                                       new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });
        producer.setExecutorService(executorService);
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        // 改成自己的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Order order = new Order("66666", "books");

        Message msg =
                new Message("transaction_tp",
                        JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送半消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(sendResult.getSendStatus());
        producer.shutdown();
    }

    public static class TransactionListenerImpl implements TransactionListener {

        /**
         * 半消息发送成功将会自动执行该逻辑
         *
         * @param msg
         * @param arg
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 执行本地事务
            Order order = null;
            try {
                order = JSON.parseObject(new String(msg.getBody(),
                        RemotingHelper.DEFAULT_CHARSET), Order.class);
                boolean isSuccess = updateOrder(order);
                if (isSuccess) {
                    // 本地事务执行成功,提交半消息
                    System.out.println("本地事务执行成功,提交事务事务消息");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    // 本地事务执行成功,回滚半消息
                    System.out.println("本地事务执行失败,回滚事务消息");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            } catch (Exception e) {
                System.out.println("本地事务执行异常");
            }
            // 异常情况返回未知状态
            return LocalTransactionState.UNKNOW;
        }

        /**
         * 更新订单
         * 这里模拟数据库更新,返回事务执行成功
         *
         * @param order
         * @return
         */
        private boolean updateOrder(Order order) throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
            return true;
        }

        /***
         * 若提交/回滚事务消息失败,rocketmq 自动反查事务状态
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            try {
                Order order = JSON.parseObject(new String(msg.getBody(),
                        RemotingHelper.DEFAULT_CHARSET), Order.class);
                boolean isSuccess = queryOrder(order.getOrderId());
                if (isSuccess) {
                    // 本地事务执行成功,提交半消息
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    // 本地事务执行成功,回滚半消息
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            } catch (Exception e) {
                System.out.println("查询失败");
            }
            // 异常情况返回未知状态
            return LocalTransactionState.UNKNOW;
        }

        /**
         * 查询订单状态
         * 模拟返回查询成功
         *
         * @param orderId
         * @return
         */
        private boolean queryOrder(String orderId) throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
            return true;
        }
    }

    @Data
    public static class Order {
        private String orderId;

        private String goods;

        public Order(String orderId, String goods) {
            this.orderId = orderId;
            this.goods = goods;
        }
    }
}

上面代码中:

  1. 我们需要为生产者指定一个唯一ProducerGroup
  2. 需要继承 TransactionListener 注解回调接口,其中 executeLocalTransaction 方法执行本地事务,checkLocalTranscation 用来执行检查本地事务。
  3. 返回事务状态有三种:
    • LocalTransactionState.UNKNOW 中间状态,RocketMQ 将会反查
    • LocalTransactionState.COMMIT_MESSAGE 提交事务,消息这后续将会消费这条消息
    • LocalTransactionState.ROLLBACK_MESSAGE,回滚事务,RocketMQ 将会删除这条消息

事务消息使用注意点

事务消息最大反查次数

由于单个消息反查次数过多,将会导致半消息队列堆积,影响性能。 RocketMQ 默认将单个消息的检查次数限制为 15 次。

我们可以通过修改 broker 配置文件,增加如下配置:

# N 为最大检查次数
transactionCheckMax=N

当检查次数超过最大次数后,RocketMQ 将会丢弃消息并且打印错误日志。

若想自定义丢弃消息行为,需要修改 RocketMQ broker 端代码,继承 AbstractTransactionalMessageCheckListener 重写 resolveDiscardMsg 方法,加入自定义逻辑。

同步的双重写入机制

为了确保事务消息不丢失,并且保证事务完整性,需要将事务消息复制到集群其他节点,建议使用同步双重写入机制。

事务反查时间设置

我们可以设置以下参数,设置 MQ 服务端多久之后开始反查事务消息(自事务消息保存成功之后开始计算)。

msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");

或者我们可以在 broker.conf 设置以下参数:

# 单位为 ms,默认为 6 s
transactionTimeout=60000

发送端主动设置配置参数优先级大于 broker 端配置。

另外 RocketMQ 还有一个配置用于控制事务性消息检查间隔:

## 默认为 60s
transactionCheckInterval=5000

如果自定义配置如上,事务消息检查间隔为 5 秒,事务消息设置检查时间为 60 s。

这就代表 broker 每隔 5s 检查一次事务消息,如果此时事务消息到 MQ 服务端时间还未超过 60s,此时将不会反查,直到时间大于 60s。

彩蛋

查找事务消息资料的时候,发现 RocketMQ 文档存在相关错误。

文档地址:https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

如上两处实际是错误的,应该修改为:AbstractTransactionalMessageCheckListenertransactionTimeout

issue 地址:https://github.com/apache/rocketmq/issues/481

顺手修改了一下,提交 PR 。哈哈,也为开源项目贡献了一份力量。

Reference

  1. https://github.com/apache/rocketmq/issues/481
  2. https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
  3. 极客时间-消息队列高手课

最后说一句(求关注)

以前总以为参加开源项目很难,直到最近接连参与两次开源项目修改,才发现其实并没有想象中那么难。由于版本变更,开源项目文档有些是存在错误的,如果我们看到了,顺手修复一下,这也是为开源项目贡献一份力。

才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。

再次感谢您的阅读,我是楼下小黑哥,一位还未秃头的工具猿,下篇文章我们再见~

欢迎关注我的公众号:程序通事,获得日常干货推送。如果您对我的专题内容感兴趣,也可以关注我的博客:studyidea.cn

原文地址:https://www.cnblogs.com/goodAndyxublog/p/12596402.html

时间: 2024-08-28 20:23:46

还不知道事务消息吗?这篇文章带你全面扫盲!的相关文章

两篇文章带你走入.NET Core 世界:Kestrel+Nginx+Supervisor 部署上云服务器(二)

背景: 上一篇:两篇文章带你走入.NET Core 世界:CentOS+Kestrel+Ngnix 虚拟机先走一遍(一) 已经交待了背景,这篇就省下背景了,这是第二篇文章了,看完就木有下篇了. 直接进入主题: 1.购买云服务器 之前在虚拟机跑了一下,感觉还不够真实,于是,准备买台服务器,认真的跑一下. 有阿里云,腾讯云,华为云,还有好多云,去哪买一个? 之前做为华为云的云享专家去参加了一下活动,本来也准备写篇文章,不过相同游记文太多, 这里就转一篇了:让华为云MVP告诉你——在华为的一天可以做什

三篇文章带你极速入门php(三)之php原生实现登陆注册

看下成果 ps:纯天然h5,绝不添加任何添加剂(css)以及化学成分(js)(<( ̄ ﹌  ̄)我就是喜欢纯天然,不接受任何反驳) 关于本文 用原生的php和html做了一个登陆注册,大概是可以窥见一般php开发的样子了.不过,low的地方区别提前说一下: 这个是多入口,一般程序都是单入口,单入口就是统一通过index.php进入,然后再引入其他文件,调用其代码,多入口就是每次通过不同文件进入(比如一会展示的login.php和register.php) 保留登陆信息用的是session,现在普遍

三篇文章带你极速入门php(一)之语法

本文适合阅读用户 有其他语言基础的童鞋 看完w3cschool语法教程来回顾一下的童鞋(传送门,想全面看一下php语法推荐这里) 毫无基础然而天资聪慧颇有慧根(不要左顾右看说的就是你,老夫这里有一本<php从入门到放弃>,观你根骨清奇10两银子卖给你如何) 看完本文后你会收获到什么 php的变量的定义,使用 函数的定义,使用,传递参数 数组的定义,调用,常用方法,使用场景 php中循环,判断,选择结构的语法 类的定义,成员变量和成员函数的定义和使用 相信我,认真看完本文,你就已经掌握了php常

一篇文章,带你明白什么是过拟合,欠拟合以及交叉验证

误差模型:过拟合,交叉验证,偏差-方差权衡 作者Natasha Latysheva;Charles Ravarani 发表于cambridgecoding 介绍 ??在本文中也许你会掌握机器学习中最核心的概念:偏差-方差权衡.其主要想法是,你想创建尽可能预测准确并且仍能适用于新数据的模型(这是泛化).危险的是,你可以轻松的在你制定的数据中创建过度拟合本地噪音的模型,这样的模型是无用的,并且导致弱泛化能力,因为噪声是随机的,故而在每个数据集中是不同的.从本质上讲,你希望创建仅捕获数据集中有用成份的

这篇文章带你彻底理解synchronized

本人免费整理了Java高级资料,涵盖了Java.Redis.MongoDB.MySQL.Zookeeper.Spring Cloud.Dubbo高并发分布式等教程,一共30G,需要自己领取.传送门:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q 1. synchronized简介在学习知识前,我们先来看一个现象: public class SynchronizedDemo implements Runnable { private static

这篇文章来自我的微信朋友圈,并不特别好玩,但可以给创业者补点财务知识

来自我发布在微信朋友圈的文章和图片.就是这两天的事情. 我直接从朋友圈屏幕截图过来转到这里.来自我的真实事情,我觉得值得分享,增长一点财务知识. 我上研究生时已经上过财务课程,但现在遇到这个问题还得重新理解, 这篇文章来自我的微信朋友圈,并不特别好玩,但可以给创业者补点财务知识

还不知道JavaScript-DOM怎么弄?这篇文章不容错过

文档对象模型DOM,是一种基于树的API文档,它要求在处理过程中整个文档都表示在存储器中.另外一种简单的API是基于事件的SAX,它可以用于处理很大的XML文档,由于大,所以不适合全部放在存储器中处理.今天大家就跟随我的脚步,来详细了解一下关于DOM的那些事. DOM的增加 DOM操作中增指的是增加节点,分为两部分:创建节点和插入节点. 创建节点 创建节点中常用的API方法主要有: document.createElement():创建指定的HTML元素 document.createTextN

看了这篇文章你还不懂傅里叶变换,那我就真没办法呢!

首先,请允许小编带着崇高的敬意向牛逼的作者"韩昊"表示感谢.按照原文要求,转载注明出处.内容整理自知乎!发布仅为学习交流!版权归原作者所有!原文地址:https://zhuanlan.zhihu.com/p/19759362 ---好文开始了-- 我保证这篇文章和你以前看过的所有文章都不同,这是 2012 年还在果壳的时候写的,但是当时没有来得及写完就出国了--于是拖了两年,嗯,我是拖延症患者-- 这篇文章的核心思想就是:要让读者在不看任何数学公式的情况下理解傅里叶分析. 傅里叶分析不

实操 | 内存占用减少高达90%,还不用升级硬件?没错,这篇文章教你妙用Pandas轻松处理大规模数据

相比较于 Numpy,Pandas 使用一个二维的数据结构 DataFrame 来表示表格式的数据, 可以存储混合的数据结构,同时使用 NaN 来表示缺失的数据,而不用像 Numpy 一样要手工处理缺失的数据,并且 Pandas 使用轴标签来表示行和列. 通常用于处理小数据(小于 100Mb),而且对计算机的性能要求不高,但是当我们需要处理更大的数据时(100Mb到几千Gb),计算机性能就成了问题,如果配置过低就会导致更长的运行时间,甚至因为内存不足导致运行失败. 在处理大型数据集时(100Gb