Rocket重试机制,消息模式,刷盘方式

一、Consumer 批量消费(推模式)

可以通过

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条  

这里需要分为2种情况

  • Consumer端先启动
  • Consumer端后启动.   正常情况下:应该是Consumer需要先启动

注意:如果broker采用推模式的话,consumer先启动,会一条一条消息的消费,consumer后启动会才用批量消费

Consumer端先启动

1、Consumer.java

package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
 * Consumer,订阅消息
 */
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费 ,(消费顺序消息的时候设置)
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  

                try {
                    System.out.println("msgs的长度" + msgs.size());
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }  

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });  

        consumer.start();  

        System.out.println("Consumer Started.");
    }
}  

由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1)

2、Consumer端后启动,也就是Producer先启动

由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条    

所以这段代码就生效了测试结果如下(每次size最多是10):

二、消息重试机制:消息重试分为2种

  • 1、Producer端重试

  • 2、Consumer端重试

1、Producer端重试 

也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数,发送并触发回调函数

          //设置重试的次数
            producer.setRetryTimesWhenSendFailed(3);
            //开启生产者
            producer.start();
            //创建一条消息
            Message msg = new Message("PushTopic", "push", "1",   "我是一条普通消息".getBytes());
            //发送消息
            SendResult result = producer.send(msg);
            //发送,并触发回调函数
            producer.send(msg, new SendCallback() {  

                @Override
                //成功的回调函数
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult.getSendStatus());
                    System.out.println("成功了");
                }  

                @Override
                //出现异常的回调函数
                public void onException(Throwable e) {
                System.out.println("失败了"+e.getMessage());  

                }
            }); 

2、Consumer端重试

2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等

上面的代码中消费异常的情况返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试

正常则返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

package quickstart;  

import java.util.List;  

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;  

/**
 * Consumer,订阅消息
 */
public class Consumer {  

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  

        consumer.subscribe("TopicTest", "*");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  

                try {
                    // System.out.println("msgs的长度" + msgs.size());
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    for (MessageExt msg : msgs) {
                        String msgbody = new String(msg.getBody(), "utf-8");
                        if (msgbody.equals("Hello RocketMQ 4")) {
                            System.out.println("======错误=======");
                            int a = 1 / 0;
                        }
                    }  

                } catch (Exception e) {
                    e.printStackTrace();
                    if(msgs.get(0).getReconsumeTimes()==3){
                        //记录日志  

                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                    }else{  

                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                    }
                }  

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });  

        consumer.start();  

        System.out.println("Consumer Started.");
    }
}  

打印结果:

假如超过了多少次之后我们可以让他不再重试记录 日志。

if(msgs.get(0).getReconsumeTimes()==3){
//记录日志  
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

2.2超时的情况,这种情况MQ会无限制的发送给消费端。

就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return 任何状态,这样的就认为没有到达Consumer端。

这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ

package model;  

import java.util.List;  

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;  

/**
 * Consumer,订阅消息
 */
public class Consumer {  

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  

        consumer.subscribe("TopicTest", "*");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  

                try {  

                    // 表示业务处理时间
                    System.out.println("=========开始暂停===============");
                    Thread.sleep(60000);  

                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }  

                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                }  

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });  

        consumer.start();  

        System.out.println("Consumer Started.");
    }
}
 

三、消费模式

1、集群消费

2、广播消费

rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费

consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
package model;  

import java.util.List;  

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;  

/**
 * Consumer,订阅消息
 */
public class Consumer2 {  

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费  

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  

        consumer.subscribe("TopicTest", "*");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  

                try {  

                    for (MessageExt msg : msgs) {
                        System.out.println(" Receive New Messages: " + msg);
                    }  

                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                }  

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
            }
        });  

        consumer.start();  

        System.out.println("Consumer Started.");
    }
}  

四、conf下的配置文件说明

异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署

异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点

同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发送成功。

如果rocketMq才用双master部署,Producer往MQ上写入20条数据 其中Master1中拉取了12条 。Master2中拉取了8 条,这种情况下,Master1宕机,那么我们消费数据的时候,只能消费到Master2中的8条,Master1中的12条默认持久化,不会丢失消息,需要Master1恢复之后这12条数据才能继续被消费,如果想保证消息实时消费,就才用双Master双Slave的模式

五、刷盘方式

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

commitlog:

commitlog就是来存储所有的元信息,包含消息体,类似于MySQLOracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。

consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据

时间: 2024-12-25 05:18:55

Rocket重试机制,消息模式,刷盘方式的相关文章

RocketMQ(消息重发、重复消费、事务、消息模式)

RocketMQ基础:https://github.com/apache/rocketmq/tree/rocketmq-all-4.5.1/docs/cn 分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.

RocketMQ(5)---RocketMQ重试机制

RocketMQ重试机制 消息重试分为两种:Producer发送消息的重试 和 Consumer消息消费的重试. 一.Producer端重试 Producer端重试是指: Producer往MQ上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败. 看一下代码: @Slf4j public class RocketMQTest { /** * 生产者组 */ private static String PRODUCE_RGROUP = "test_producer"; pub

RocketMQ重试机制和消息幂

一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持. 2.RocketMQ为了使用者封装了消息重试的处理流程,无需开发人员手动处理.RocketMQ支持了生产端和消费端两类重试机制. 模拟异常 Consum

MySQL InnoDB 日志管理机制中的MTR和日志刷盘

1.MTR(mini-transaction) 在MySQL的 InnoDB日志管理机制中,有一个很重要的概念就是MTR.MTR是InnoDB存储擎中一个很重要的用来保证物理写的完整性和持久性的机制. 先看下MTR在MysQL架构中的位置. MTR是上面的逻辑层与下面物理层的交互窗口,同时也是用来保证下层物理数据正确性.完整性及持久性的机制. 2.日志刷盘的触发条件 触发条件 描述 时间 线程默认每秒刷新一次. 空间 Log Buffer空间用完了 Check Point checkPoint的

【WCF全析(一)】--服务协定及消息模式

上周微软开发布会说.NET支持完全跨平台和并开放Core源码的新闻,让我们顿时感到.NET要迎来它的春天.虽然早在几年前.NET就能开发Android和IOS,但是这次的跨平台把Linux都放到了微软战略之中,以后的.NET Developer就可以使用Vs开发Linux应用了,Developer又有了新的选择,从微软的战略转型也可以看出互联网已经步入到了新的模式,以后不再是PC的时代,移动互联和云时代已经到来. 最近做项目时使用到了WCF,项目把数据层和程序层进行了分割,相互之间的数据传输使用

【架构之路之WCF全析(一)】--服务协定及消息模式

上周微软开发布会说.NET支持完全跨平台和并开放Core源码的新闻,让我们顿时感到.NET要迎来它的春天.虽然早在几年前.NET就能开发Android和IOS,但是这次的跨平台把Linux都放到了微软战略之中,以后的.NET Developer就可以使用Vs开发Linux应用了,Developer又有了新的选择,从微软的战略转型也可以看出互联网已经步入到了新的模式,以后不再是PC的时代,移动互联和云时代已经到来. 最近做项目时使用到了WCF,项目把数据层和程序层进行了分割,相互之间的数据传输使用

Redis消息模式与主从复制

第1章 消息模式: 1.1 redis发布消息有两种模式: 1.      队列模式 2.      发布订阅模式 a)        任务队列:就是传递消息的队列,与任务队列进行交互的实体有两类,一类是生产者,另一类是消费者,生产者将需要处理的任务放在任务队里中,而消费者不断的从任务独立中读入任务消息并执行 任务队列的好处:松耦合,生产者和消费者只需按照约定的任务描述格式,进行编写代码 易于扩展,多消费者模式下,消费者可以分布在多个不通过额服务器中,由此降低单台服务器的负载 1.2 Redis

SpringCloud | FeignClient和Ribbon重试机制区别与联系

在spring cloud体系项目中,引入的重试机制保证了高可用的同时,也会带来一些其它的问题,如幂等操作或一些没必要的重试. 今天就来分别分析一下 FeignClient 和 Ribbon 重试机制的实现原理和区别,主要分为三点: 1)FeignClient重试机制分析 2)Ribbon重试机制分析 3)FeignClient和Ribbon重试机制的区别于联系 1)FeignClient 重试机制分析: FeignClient 重试机制的实现原理相对简单.首先看一下feignClient处理请

MySQL延迟问题和数据刷盘策略

一.MySQL复制流程官方文档流程图如下: 1.绝对的延时,相对的同步 2.纯写操作,线上标准配置下,从库压力大于主库,最起码从库有relaylog的写入. 二.MySQL延迟问题分析 1.主库DML请求频繁 原因:主库并发写入数据,而从库为单线程应用日志,很容易造成relaylog堆积,产生延迟. 解决思路:做sharding,打散写请求.考虑升级到MySQL 5.7+,开启基于逻辑时钟的并行复制. 2.主库执行大事务 原因:类似主库花费很长时间更新了一张大表,在主从库配置相近的情况下,从库也