关于RocketMQ消息消费与重平衡的一些问题探讨

其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结。

关于 push 模式下的消息循环拉取问题

之前发表了一篇关于重平衡的文章:「Kafka 重平衡机制」,里面有说到 RocketMQ 重平衡机制是每隔 20s 从任意一个 Broker 节点获取消费组的消费 ID 以及订阅信息,再根据这些订阅信息进行分配,然后将分配到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 队列中,拉取线程唤醒后执行拉取任务,流程图如下:

但是其中有一些是没有详细说的,比如每次拉消息都要等 20s 吗?真的有个网友问了我如下问题:

很显然他的项目是用了 push 模式进行消息拉取,要回答这个问题,就要从 RockeMQ 的消息拉取说起:

RocketMQ 的 push 模式的实现是基于 pull 模式,只不过在 pull 模式上套了一层,所以RocketMQ push 模式并不是真正意义上的 ”推模式“,因此,在 push 模式下,消费者拉取完消息后,立马就有开始下一个拉取任务,并不会真的等 20s 重平衡后才拉取,至于 push 模式是怎么实现的,那就从源码去找答案。

之前有写过一篇文章:「RocketMQ为什么要保证订阅关系的一致性?」,里面有说过 消息拉取是从 PullRequestQueue 阻塞队列中取出 PullRequest 拉取任务进行消息拉取的,但 PullRequest 是怎么放进 PullRequestQueue 阻塞队列中的呢?

RocketMQ 一共提供了以下方法:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:

public void executePullRequestImmediately(final PullRequest pullRequest) {
  try {
    this.pullRequestQueue.put(pullRequest);
  } catch (InterruptedException e) {
    log.error("executePullRequestImmediately pullRequestQueue.put", e);
  }
}

从调用链发现,除了重平衡会调用该方法之外,在 push 模式下,PullCallback 回调对象中的 onSuccess 方法在消息消费时,也调用了该方法:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

case FOUND:

// 如果本次拉取消息为空,则继续将pullRequest放入阻塞队列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
  // 将消息放入消费者消费线程去执行
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
    pullResult.getMsgFoundList(), //
    processQueue, //
    pullRequest.getMessageQueue(), //
    dispathToConsume);
  // 将pullRequest放入阻塞队列中
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

当从 broker 拉取到消息后,如果消息被过滤掉,则继续将pullRequest放入阻塞队列中继续循环执行消息拉取任务,否则将消息放入消费者消费线程去执行,在pullRequest放入阻塞队列中。

case NO_NEW_MESSAGE:

case NO_MATCHED_MSG:

pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

如果从 broker 端没有可拉取的新消息或者没有匹配到消息,则将pullRequest放入阻塞队列中继续循环执行消息拉取任务。

从以上消息消费逻辑可以看出,当消息处理完后,立即将 pullRequest 重新放入阻塞队列中,因此这就很好解释为什么 push 模式可以持续拉取消息了:

在 push 模式下消息消费完后,还会调用该方法重新将 PullRequest 对象放进 PullRequestQueue 阻塞队列中,不断地从 broker 中拉取消息,实现 push 效果。

重平衡后队列被其它消费者分配后如何处理?

继续再想一个问题,如果重平衡后,发现某个队列被新的消费者分配了,怎么办,总不能继续从该队列中拉取消息吧?

RocketMQ 重平衡后会检查 pullRequest 是否还在新分配的列表中,如果不在,则丢弃,调用 isDrop() 可查出该pullRequest是否已丢弃:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
  log.info("the pull request[{}] is dropped.", pullRequest.toString());
  return;
}

在消息拉取之前,首先判断该队列是否被丢弃,如果已丢弃,则直接放弃本次拉取任务。

那什么时候队列被丢弃呢?

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
  Entry<MessageQueue, ProcessQueue> next = it.next();
  MessageQueue mq = next.getKey();
  ProcessQueue pq = next.getValue();

  if (mq.getTopic().equals(topic)) {
    // 判断当前缓存 MessageQueue 是否包含在最新的 mqSet 中,如果不存在则将队列丢弃
    if (!mqSet.contains(mq)) {
      pq.setDropped(true);
      if (this.removeUnnecessaryMessageQueue(mq, pq)) {
        it.remove();
        changed = true;
        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
      }
    } else if (pq.isPullExpired()) {
      // 如果队列拉取过期则丢弃
      switch (this.consumeType()) {
        case CONSUME_ACTIVELY:
          break;
        case CONSUME_PASSIVELY:
          pq.setDropped(true);
          if (this.removeUnnecessaryMessageQueue(mq, pq)) {
            it.remove();
            changed = true;
            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                      consumerGroup, mq);
          }
          break;
        default:
          break;
      }
    }
  }
}

updateProcessQueueTableInRebalance 方法在重平衡时执行,用于更新 processQueueTable,它是当前消费者的队列缓存列表,以上方法逻辑判断当前缓存 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中,则说明经过这次重平衡后,该队列被分配给其它消费者了,或者拉取时间间隔太大过期了,则调用 setDropped(true) 方法将队列置为丢弃状态。

可能你会问,processQueueTable 跟 pullRequest 里面 processQueue 有什么关联,往下看:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

// 新建 ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
  // 将ProcessQueue放入processQueueTable中
  ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  if (pre != null) {
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  } else {
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    // 将ProcessQueue放入pullRequest拉取任务对象中
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
  }
}

可以看出,重平衡时会创建 ProcessQueue 对象,将其放入 processQueueTable 缓存队列表中,再将其放入 pullRequest 拉取任务对象中,也就是 processQueueTable 中的 ProcessQueue 与 pullRequest 的中 ProcessQueue 是同一个对象。

重平衡后会导致消息重复消费吗?

之前在群里有个网友提了这个问题:

我当时回答他 RocketMQ 正常也是没有重复消费,但后来发现其实 RocketMQ 在某些情况下,也是会出现消息重复消费的现象。

前面讲到,RocketMQ 消息消费时,会将消息放进消费线程中去执行,代码如下:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
  pullResult.getMsgFoundList(), //
  processQueue, //
  pullRequest.getMessageQueue(), //
  dispathToConsume);

ConsumeMessageService 类实现消息消费的逻辑,它有两个实现类:

// 并发消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 顺序消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

先看并发消息消费相关处理逻辑:

ConsumeMessageConcurrentlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:

if (this.processQueue.isDropped()) {
  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  return;
}

// 消息消费逻辑
// ...

// 如果队列被设置为丢弃状态,则不提交消息消费进度
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

ConsumeRequest 是一个继承了 Runnable 的类,它是消息消费核心逻辑的实现类,submitConsumeRequest 方法将 ConsumeRequest 放入 消费线程池中执行消息消费,从它的 run 方法中可看出,如果在执行消息消费逻辑中有节点加入,重平衡后该队列被分配给其它节点进行消费了,此时的队列被丢弃,则不提交消息消费进度,因为之前已经消费了,此时就会造成消息重复消费的情况。

再来看看顺序消费相关处理逻辑:

ConsumeMessageOrderlyService:

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:

public void run() {
  // 判断队列是否被丢弃
  if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
  }

  final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  synchronized (objLock) {
    // 如果不是广播模式,且队列已加锁且锁没有过期
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
      final long beginTime = System.currentTimeMillis();
      for (boolean continueConsume = true; continueConsume; ) {
        // 再次判断队列是否被丢弃
        if (this.processQueue.isDropped()) {
          log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
          break;
        }

        // 消息消费处理逻辑
        // ...

          continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
        } else {
          continueConsume = false;
        }
      }
    } else {
      if (this.processQueue.isDropped()) {
        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
      }
      ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
    }
  }
}

RocketMQ 顺序消息消费会将队列锁定,当队列获取锁之后才能进行消费,所以,即使消息在消费过程中有节点加入,重平衡后该队列被分配给其它节点进行消费了,此时的队列被丢弃,依然不会造成重复消费。

更多精彩文章请关注作者维护的公众号「后端进阶」,这是一个专注后端相关技术的公众号。
关注公众号并回复「后端」免费领取后端相关电子书籍。
欢迎分享,转载请保留出处。

原文地址:https://www.cnblogs.com/objcoding/p/11809010.html

时间: 2024-10-10 12:54:51

关于RocketMQ消息消费与重平衡的一些问题探讨的相关文章

RocketMQ(消息重发、重复消费、事务、消息模式)

RocketMQ基础:https://github.com/apache/rocketmq/tree/rocketmq-all-4.5.1/docs/cn 分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.

程序重启RocketMQ消息重复消费

最近在调试RocketMQ消息发送与消费的Demo时,发现一个问题:只要重启程序,RocketMQ消息就会重复消费. 那么这是什么原因导致的,又该如何解决呢? 经过一番排查,发现程序使用的RocketMQ客户端版本是3.6.2,而测试环境安装的RocketMQ环境的版本是4.1.0.原来是客户端和服务器端版本不一样导致的,消息并没有最终被消费,即没有ACK消息确认,只要程序重启就会重复消费. 解决方案:RocketMQ客户端版本使用与服务器端的同一版本,即4.1.0版本. 划重点:使用Rocke

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

RocketMQ事务消费和顺序消费详解

一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费. rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

51.RocketMQ 顺序消费

3种不同模式的Producer NormalProducer(普通) OrderProducer(顺序) TransactionProducer(事务) 生产者 1 /** 2 * Copyright (C) 2010-2013 Alibaba Group Holding Limited 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this fil

RocketMQ 消息队列单机部署及使用

转载请注明来源:http://blog.csdn.net/loongshawn/article/details/51086876 相关文章: <RocketMQ 消息队列单机部署及使用> < java编写简单消息队列.实现高德坐标变形服务> 0 RocketMQ简单介绍 0.1 介绍 RocketMQ是一个消息中间件. 消息中间件中有两个角色:消息生产者和消息消费者.RocketMQ里相同有这两个概念.消息生产者负责创建消息并发送到RocketMQ服务器.RocketMQ服务器会将

RocketMq消息队列使用

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性, 目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 比kafka还是有过之无不及,其实kafka文档很丰富 但RocketMQ网上的文章太少,找不到相关的操作教程 于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究 下载源码的地址 https://github.com/alibaba/RocketMQ/relea

RocketMQ 顺序消费只消费一次 坑

rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息 注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue 单个节点(Producer端1个.Consumer端1个) 1.Producer.java  package order; import java.util.List;

WM_PAINT消息在窗口重绘的时候产生,那什么时候窗口会重绘(异步工作方式,效率更高,灵活性更强)

Q:wm_paint消息在窗口重绘的时候产生,那什么时候窗口会重绘?? A: 严格地说,只有当收到WM_PAINT消息后窗口会重绘 但是引起这个消息的事件有很多, 比如: 首次创建 移动 改变大小 showwindow/ activate window/ invalidate window .... 系统为什么不在调用Invalidate时发送WM_PAINT消息呢?又为什么非要等应用消息队列为空时才发送WM_PAINT消息呢?这是因为系统把在窗口中的绘制操作当作一种低优先级的操作,于是尽可能地