RocketMQ-事务消费

理论部分在https://www.jianshu.com/p/453c6e7ff81c中的 “三、事务消息”。下面从代码层面看一下rockemq的事务消息

一、事务消费端。

  从代码中看到跟其他模式的消费端没有什么两样。

package org.hope.lee.consumer.transaction;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class ConsumerTransaction {
    public ConsumerTransaction() {
        String group_name = "transaction_consumer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
        consumer.setNamesrvAddr("192.168.31.xxx:9876;192.168.31.xxx:9876");
        try {
            consumer.subscribe("TopicTransaction", "*");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    class Listener implements MessageListenerConcurrently{

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                try {
                    for(MessageExt msg : list) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + "topic:" + topic + ", tags:" + tags + ",msg:" + msgBody);
                        msg.getTags();
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    public static void main(String[] args) {
        ConsumerTransaction c = new ConsumerTransaction();
        System.out.println("transaction consumer start......");
    }
}

二、本地事务的执行器,实现 LocalTransactionExecuter。

package org.hope.lee.producer.transaction;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.atomic.AtomicInteger;

public class TransactionExecuterImpl implements LocalTransactionExecuter{
    private AtomicInteger transactionIndex = new AtomicInteger(1);
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) {
        System.out.println("msg = " + new String(message.getBody()));
        System.out.println("o = " + o);
        String tag = message.getTags();
        if(tag.equals("Transaction3")) {
            //这里有一个分阶段提交任务的概念
            System.out.println("这里处理业务逻辑,比如操作数据库,失败情况下进行ROLLBACK");

            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
//        return LocalTransactionState.ROLLBACK_MESSAGE;
//        return LocalTransactionState.COMMIT_MESSAGE.UNKNOW;
    }
}

三、事务Producer端

  在这里可以看到我们用了new TransactionMQProducer()。并且在发送消息的时候添加了事务执行器producer.sendMessageInTransaction(msg, transactionExecuter, "tq")

package org.hope.lee.producer.transaction;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.*;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.concurrent.TimeUnit;

public class ProducerTransaction {
    public static void main(String[] args) throws MQClientException {
        String group_name = "transaction_producer";
        //这里使用TransactionMQProducer
        final TransactionMQProducer producer = new TransactionMQProducer(group_name);
        producer.setNamesrvAddr("192.168.31.165:9876;192.168.31.176:9876");
        //事务最小并法数
        producer.setCheckThreadPoolMinSize(5);
        //事务最大并发数
        producer.setCheckThreadPoolMaxSize(20);
        //队列数
        producer.setCheckRequestHoldMax(2000);
        /**
         * Producer对象在使用之前必须要调用start()初始化,初始化一次即可
         * 注意:切记不可以在每次发送消息时,都调用start()
         */
        producer.start();
        //服务器回调Producer,检查本地事务分支成功还是失败
        //rocketmq会定时的调用这个checklistener,
        //在这里,我们可以根据由MQ回传的key去数据库查询,
        //判断这条数据到底是成功了还是失败了。
        //就是在这个定时check,rocketmq把这个功能在开源的代码中去除掉了。
        producer.setTransactionCheckListener(new TransactionCheckListener() {
            public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
                System.out.println("key: " + messageExt.getKeys());
                System.out.println("state--" + new String(messageExt.getBody()));
                // return LocalTransactionState.ROLLBACK_MESSAGE;
                return LocalTransactionState.COMMIT_MESSAGE;
                // return LocalTransactionState.UNKNOW;
            }
        });
        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic, 多个tag的消息
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用
         * 如果对消息可靠性要求很高需要对这种情况做处理。另外,消息可能会存在发送失败的情况
         * 失败重试由应用来处理
         */
        TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();
        for(int i = 1; i <= 3; i++) {
            Message msg = new Message("TopicTransaction", "Transaction" + i, "key",
                    ("Hello Rocket" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, "tq");
            System.out.println(result);

            try {
                TimeUnit.MICROSECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS,Tomcat等容器的退出钩子里调用shutdown 方法
         */
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));

        System.exit(0);
    }
}

https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

实验步骤:

1、先运行Consumer端

2、运行Producer端,然后看Consumer端和Produer端的控制台输出。

Consumer端Console

收到消息:topic:TopicTransaction, tags:Transaction1,msg:Hello Rocket1
收到消息:topic:TopicTransaction, tags:Transaction2,msg:Hello Rocket2

Producer端Console

msg = Hello Rocket1
o = tq
SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003CDC, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=0], queueOffset=0]
msg = Hello Rocket2
o = tq
SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003DA3, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=1], queueOffset=0]
msg = Hello Rocket3
o = tq
这里处理业务逻辑,比如操作数据库,失败情况下进行ROLLBACK
SendResult [sendStatus=SEND_OK, msgId=C0A81FB000002A9F0000000000003FF8, messageQueue=MessageQueue [topic=TopicTransaction, brokerName=broker-a, queueId=2], queueOffset=0]


从输出结果来看,第三条消息并没有被Consumer端消费,被回滚了。

原文地址:https://www.cnblogs.com/happyflyingpig/p/8283525.html

时间: 2024-10-21 01:52:16

RocketMQ-事务消费的相关文章

RocketMQ事务消费和顺序消费详解

一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费. rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

RocketMQ的顺序消费和事务消费

一.三种消费 :1.普通消费 2. 顺序消费 3.事务消费 1.1  顺序消费:在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费,他的实现是生产者(一个生产者可以对多个主题去发送消息)将这个三个消息放在topic(一个topic默认有4个队列)的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息. rocketmq-client:提供发送.接受消息的客户端API. rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息.(有点NameNode的味道) rocketm

RocketMQ 事务消息

RocketMQ 事务消息在实现上充分利用了 RocketMQ 本身机制,在实现零依赖的基础上,同样实现了高性能.可扩展.全异步等一系列特性. 在具体实现上,RocketMQ 通过使用 Half Topic 以及 Operation Topic 两个内部队列来存储事务消息推进状态,如下图所示: 其中,Half Topic 对应队列中存放着 prepare 消息,Operation Topic 对应的队列则存放了 prepare message 对应的 commit/rollback 消息,消息体

RocketMQ事务消息学习及刨坑过程

一.背景 MQ组件是系统架构里必不可少的一门利器,设计层面可以降低系统耦合度,高并发场景又可以起到削峰填谷的作用,从单体应用到集群部署方案,再到现在的微服务架构,MQ凭借其优秀的性能和高可靠性,得到了广泛的认可. 随着数据量增多,系统压力变大,开始出现这种现象:数据库已经更新了,但消息没发出来,或者消息先发了,但后来数据库更新失败了,结果研发童鞋各种数据修复,这种生产问题出现的概率不大,但让人很郁闷.这个其实就是数据库事务与MQ消息的一致性问题,简单来讲,数据库的事务跟普通MQ消息发送无法直接绑

51.RocketMQ 顺序消费

3种不同模式的Producer NormalProducer(普通) OrderProducer(顺序) TransactionProducer(事务) 生产者 1 /** 2 * Copyright (C) 2010-2013 Alibaba Group Holding Limited 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this fil

RocketMQ 顺序消费只消费一次 坑

rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息 注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue 单个节点(Producer端1个.Consumer端1个) 1.Producer.java  package order; import java.util.List;

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是说此时消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢? 原来RocketMQ使用TransactionalMessa