RocketMQ(5)---RocketMQ重试机制

RocketMQ重试机制

消息重试分为两种:Producer发送消息的重试 Consumer消息消费的重试

一、Producer端重试

Producer端重试是指: Producer往MQ上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败。

看一下代码:

@Slf4j
public class RocketMQTest {
    /**
     * 生产者组
     */
    private static String PRODUCE_RGROUP = "test_producer";

    public static void main(String[] args) throws Exception {
        //1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //设置重试次数(默认2次)
        producer.setRetryTimesWhenSendFailed(3000);
        //绑定name server
        producer.setNamesrvAddr("74.49.203.55:9876");
        producer.start();
        //创建消息
        Message message = new Message("topic_family", ("小小今年3岁" ).getBytes());
        //发送 这里填写超时时间是5毫秒 所以每次都会发送失败
        SendResult sendResult = producer.send(message,5);
        log.info("输出生产者信息={}",sendResult);
    }
}

超时重试 针对网上说的超时异常会重试的说法都是错误的,想想都觉得可怕,我查的所以文章都说超时异常都会重试,难道这么多人都没有去测试一下 或者去看个源码。

我发现这个问题,是因为我上面超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但我设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报

显然不应该是一个级别。但测试发现无论我设置的多少次的重试次数,报异常的时间都差不多。

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

针对这个疑惑,我去看了源码之后,才恍然大悟。

   /**
     * 说明 抽取部分代码
     */
    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

        //1、获取当前时间
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev ;
        //2、去服务器看下有没有主题消息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            //4、根据设置的重试次数,循环再去获取服务器主题消息
            for (times = 0; times < timesTotal; times++) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                beginTimestampPrev = System.currentTimeMillis();
                long costTime = beginTimestampPrev - beginTimestampFirst;
                //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
                if (timeout < costTime) {
                    callTimeout = true;
                    break;

                }
                //6、如果超时直接报错
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }
        }
    }

通过这段源码很明显可以看出以下几点

  1. 如果是异步发送 那么重试次数只有1次
  2. 对于同步而言,超时异常也是不会再去重试
  3. 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。

真是实践出真知!!!

二、 Consumer端重试

消费端比较有意思,而且在实际开发过程中,我们也更应该考虑的是消费端的重试。

消费者端的失败主要分为2种情况,ExceptionTimeout

1、Exception

@Slf4j
@Component
public class Consumer {
    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消费者组
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * 通过构造函数 实例化对象
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
        //订阅topic和 tags( * 代表所有标签)下信息
        consumer.subscribe("topic_family", "*");
        //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //1、获取消息
            Message msg = msgs.get(0);
            try {
                //2、消费者获取消息
                String body = new String(msg.getBody(), "utf-8");
                //3、获取重试次数
                int count = ((MessageExt) msg).getReconsumeTimes();
                log.info("当前消费重试次数为 = {}", count);
                //4、这里设置重试大于3次 那么通过保存数据库 人工来兜底
                if (count >= 2) {
                    log.info("该消息已经重试3次,保存数据库。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //直接抛出异常
                throw new Exception("=======这里出错了============");
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        //启动监听
        consumer.start();
    }
}

这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。

看下运行结果

注意 消费者和生产者的重试还是有区别的,主要有两点

1、默认重试次数:Product默认是2次,而Consumer默认是16次

2、重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。

2、Timeout

说明 这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.RECONSUME_LATER

那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。

做这个测试很简单。

        //1、消费者获得消息
        String body = new String(msg.getBody(), "utf-8");
        //2、获取重试次数
        int count = ((MessageExt) msg).getReconsumeTimes();
        log.info("当前消费重试次数为 = {}", count);
        //3、这里睡眠60秒
        Thread.sleep(60000);
       log.info("休眠60秒 看还能不能走到这里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
        //返回成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

当获得 当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息

consumer消费者  当前消费重试次数为 = 0
休眠60秒 看还能不能走到这里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3岁
只要自己变优秀了,其他的事情才会跟着好起来(上将2)

原文地址:https://www.cnblogs.com/qdhxhz/p/11117379.html

时间: 2024-08-03 10:50:20

RocketMQ(5)---RocketMQ重试机制的相关文章

RocketMQ重试机制和消息幂

一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持. 2.RocketMQ为了使用者封装了消息重试的处理流程,无需开发人员手动处理.RocketMQ支持了生产端和消费端两类重试机制. 模拟异常 Consum

Rocket重试机制,消息模式,刷盘方式

一.Consumer 批量消费(推模式) 可以通过 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 这里需要分为2种情况 Consumer端先启动 Consumer端后启动.   正常情况下:应该是Consumer需要先启动 注意:如果broker采用推模式的话,consumer先启动,会一条一条消息的消费,consumer后启动会才用批量消费 Consumer端先启动 1.Consumer.java package quickstart

Volley超时重试机制详解

Volley超时重试机制 基础用法 Volley为开发者提供了可配置的超时重试机制,我们在使用时只需要为我们的Request设置自定义的RetryPolicy即可. 参考设置代码如下: int DEFAULT_TIMEOUT_MS = 10000; int DEFAULT_MAX_RETRIES = 3; StringRequest stringRequest = new StringRequest(Request.Method.GET, url, new Response.Listener<S

jedis超时重试机制注意事项

最近使用redis集群进行incr操作,总是发现计数不准确,后来经过检查发现redis在执行incr超时会执行重试机制,造成计数不准确,测试代码: /** * incrf: * 将 key 中储存的数字值增一. 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作. 如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误. 本操作的值限制在 64 位(bit)有符号数字表示之内. 这是一个针对字符串的操作,因为 Redis 没有专用的整数类型,

为你的代码加上一层重试机制

为代码加上重试机制 1.前言:对于经常跟网络编程打交道的你来说,并不是你的每次Request,Server都会给你想要的Response.重试机制虽然并不能解决这种情况,但是却可以大大减少这种情况的发生. 2.介绍下重试机制类:RetryUtil.cs 使用了委托,代码很短,也不难理解. 1 public class RetryUtil 2 { 3 public delegate void NoArgumentHandler(); 4 /// <summary> 5 /// retry mec

guava的重试机制guava-retrying使用

1,添加maven依赖 <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>2.0.0</version> </dependency> 2,定义重试机制 Retryer<CMSResultDTO> smsRetryer = RetryerBuilder.

PHP-RESQUE重试机制

因为PHP-Resque 的重试需要自己写,网上又没啥轮子,而且resque也很久不更新了,所以自己研究下resque的源码,然后也借鉴了Laravel的队列重试机制,实现了PHP-Resque的重试机制. 设计思路 阅读resque源码,我们知道,resque把失败的队列的数据都放在了resque:failed列表,数据如下. 我的项目是在yii2下统一处理resque的,我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,一开始

SpringCloud | FeignClient和Ribbon重试机制区别与联系

在spring cloud体系项目中,引入的重试机制保证了高可用的同时,也会带来一些其它的问题,如幂等操作或一些没必要的重试. 今天就来分别分析一下 FeignClient 和 Ribbon 重试机制的实现原理和区别,主要分为三点: 1)FeignClient重试机制分析 2)Ribbon重试机制分析 3)FeignClient和Ribbon重试机制的区别于联系 1)FeignClient 重试机制分析: FeignClient 重试机制的实现原理相对简单.首先看一下feignClient处理请

【Dubbo 源码解析】07_Dubbo 重试机制

Dubbo 重试机制 通过前面 Dubbo 服务发现&引用 的分析,我们知道,Dubbo 的重试机制是通过 com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker 来实现的: public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcExce