RocketMQ(六)——Order Message(顺序消息)

生产者端消费者端运行效果补充
RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调用传统的send方法,消息是无序的。接下来,看看顺序消费。
模拟这样一个场景,如果一个用户完成一个订单需要3条消息,比如订单的创建、订单的支付、订单的发货,很显然,同一个用户的订单消息必须要顺序消费,但是不同用户之间的订单可以并行消费。

生产者端

看一下生产者端的代码:

DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("192.168.99.9876");
producer.start();

String[] tags = new String[]{"createTag", "payTag", "sendTag"};

for (int orderId = 1; orderId <= 10; orderId++) {    //订单消息
    for (int type = 0; type < 3; type++) {            //每种订单分为:创建订单/支付订单/发货订单
        Message msg = new Message("OrderTopic",
                tag[type % tag.length],
                orderId + ":" + type,
                (orderId + ":" + type).getBytes()
        );
        SendResult sendResult = producer.send(msg, new MessageQueueSelector(){
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg){
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);
        System.out.println(sendResult);
    }
}

顺序消费-producer

注意:一个Message除了Topic/Tag外,还有Key的概念。
上图的send方法不同于以往,有一个MessageQueueSelector,将用于指定特定的消息发往特定的队列当中!

消费者端

看一下消费者端的代码:

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            //模拟业务处理消息的时间
            Thread.sleep(new Random().nextInt(1000));
            System.out.println(new String(msgs.getBody(),"utf-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

顺序消费-consumer

注意:在以前普通消费消息时设置的回调是MessageListenerConcurrently,而顺序消费的回调设置是MessageListenerOrderly。

运行效果

当我们启动2个Consumer进行消费时,可以观察到:

多个消费者消费的结果
可以观察得到,虽然从全局上来看,消息的消费不是有序的,但是每一个订单下的3条消息是顺序消费的!
其实,如果需要保证消息的顺序消费,那么很简单,首先需要做到一组需要有序消费的消息发往同一个broker的同一个队列上!其次消费者端采用有序Listener即可。
补充
这里,RocketMQ底层是如何做到消息顺序消费的,看一看源码你就能大概了解到,至少来说,在多线程消费场景下,一个线程只去消费一个队列上的消息,那么自然就保证了消息消费的顺序性,同时也保证了多个线程之间的并发性。也就是说其实broker并不能完全保证消息的顺序消费,它仅仅能保证的消息的顺序发送而已!
关于多线程消费这块,RocketMQ早就替我们想好了,这样设置即可:

消费多线程设置

想一想,在ActiveMQ中,我们如果想实现并发消费的话,恐怕还得搞个线程池提交任务吧,RocketMQ让我们的工作变得简单!

我这儿整理了比较全面的JAVA相关的面试资料,

需要领取面试资料的同学,请加群:473984645

原文地址:https://www.cnblogs.com/1013wang/p/12200520.html

时间: 2024-12-17 15:37:32

RocketMQ(六)——Order Message(顺序消息)的相关文章

聊一聊顺序消息(RocketMQ顺序消息的实现机制)

当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系. 比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B. 上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间.如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗? 如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序? 显而易见的,

RocketMQ(七)——Transaction Message(事务消息)

分布式事务 通过MQ解决分布式事务的思路 1) 业务和消息生成耦合在一起 2) 业务和消息解耦 RocketMQ 中的事务消息 1) 目前RMQ3.2.6中事务消息的实现原理及存在的问题 2) 问题解决思路 本文介绍RocketMQ提供的第三种类型的消息——Transaction Message(事务消息).在说事务消息之前,我们先来说说分布式事务的那些事! 分布式事务 什么是分布式事务,我的理解是一半事务.怎么说,比如有2个异构系统,A异构系统要做T1,B异构系统要做T2,要么都成功,要么都失

RocketMQ事务消费和顺序消费详解

一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费. rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和区别

Kafka.RabbitMQ.RocketMQ等消息中间件的对比 -- 消息发送性能和区别 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitMQ.RocketMQ)做了性能比较. Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目.Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输.0.8版本开始支持复制,不支持事务,对消息的重复.丢失.错误没

Apache RocketMQ 正式开源分布式事务消息

近日,Apache RocketMQ 社区正式发布4.3版本.此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消息,而且实现了对外部组件的零依赖.接下来,本文将详细探秘RocketMQ事务消息的设计原理以及实现机制. 一.需求缘起 在微服务架构中,随着服务的逐步拆分,数据库私有已经成为共识,这也导致所面临的分布式事务问题成为微服务落地过程中一个非常难以逾越的障碍,但是目前尚没有一个完整通用的解决方案. 其实不仅仅

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下: 在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作.. 1. 如何找到入口(MQ-broker端) 分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口.但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较

RocketMQ 整合SpringBoot发送事务消息

环境 jdk: 8u22rocketmq: rocketmq-all-4.5.2-bin-releasespringboot: 2.1.6.RELEASErocketmq-springboot: 2.0.3 发送流程(事务消息) Rocket发送事务消息:1.由producer发送prepare(半消息)给MQ的broker2.prepare消息发送成功以后执行本地业务(本地事务),根据本地事务执行结果手动返回相应状态(RocketMQLocalTransactionState.COMMIT.R

ActiveMQ(17):Message之消息属性与自身的系统消息地址

一.消息属性 ActiveMQ支持很多消息属性,具体可以参见 http://activemq.apache.org/activemq-message-properties.html 常见的一些属性说明 1:Queue的消息默认是持久化的 2:消息的优先级默认是4 3:消息发送时设置了时间戳 4:消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略 5:如果消息时重发的,将会标记出来 6:JMSReplyTo标识响应消息发送到哪个Queue 7:JMSCorelation

JS Message 网页消息提醒

JS message是一个非常小的(用gzip压缩之后才3kb)JavaScript library 用于轻松在网页上展示通知提醒.除了通知,它还支持创建带风格的对话框和确认对话框.不需要任何JS框架.每一种消息类型(除了通知提醒)都拥有回调事件用于实现可定制的交互功能.消息外观也通过CSS自定义. 支持大部分浏览器除了IE6. 项目地址: http://dhtmlx.github.com/message/ 下载地址: https://github.com/DHTMLX/message Cust