RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费

1.消费者相关类

2.消费者的启动

3.消息的拉取

4.消费者的负载均衡

5.消息的消费

6.消费进度管理

看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教
RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送
那什么时候可以用pull模式呢?比如在高并发的场景下,消费端的性能可能会达到瓶颈的情况下,消费端可以采用pull模式,消费端根据自身消费情况去拉取,虽然push模式在消息拉取的过程中也会有流控(当前ProcessQueue队列有1000条消息还没有消费或者当前ProcessQueue中最大偏移量和最小偏移量超过2000将会触发流控,流控的策略就是延迟50ms再拉取消息),但是这个值在实际情况下,可能每台机器的性能都不太一样,会不好控制。

消费者相关类和API

  • MQConsumer
  • sendMessageBack(),If consuming failure,message will be send back to the broker,and delay consuming some time,如果消费端消费失败会把消息发送回broker端,稍后会重新消费
  • fetchSubscribeMessageQueues(topic) ,Fetch message queues from consumer cache according to the topic,根据topic从消费端缓存中获取MessageQueue的Set集合
  • MQPushConsumer
  • registerMessageListener() 注册消息事件监听器,有并发模式和顺序模式俩种
  • subscribe() 基于主题订阅消息,可以带上消息过滤表达式subExpression,TAG或者SQL92,类模式过滤会在5.0.0版本中去掉,不建议使用
  • DefaultMQPushConsumer
  • 先看下DefaultMQPushConsumer的重要参数,如果第一次看会不记得,后面的代码看多了慢慢会体会到这些参数的作用,类中持有DefaultMQPushConsumerImpl对象,DefaultMQPushConsumerImpl类实现了大部分的消息消费功能
//消费组
private String consumerGroup;
//消费端模式,默认为集群模式,还有一种广播模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//根据消费进度从broker拉取不到消息时采取的策略
//1.CONSUME_FROM_LAST_OFFSET 最大偏移量开始
//2.CONSUME_FROM_FIRST_OFFSET 最小偏移量开始
//3.CONSUME_FROM_TIMESTAMP 从消费者启动时间戳开始
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

//消息过滤关系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

   //消息消费监听器
    private MessageListener messageListener;

    //消息消费进度存储器
    private OffsetStore offsetStore;

    //消费线程最小线程数
    private int consumeThreadMin = 20;

    //消费线程最大线程数,因为消费线程池用的是无界队列,所以这个参数用不上,原因请参考线程池原理
    private int consumeThreadMax = 64;

    //动态调整线程数量的阀值
    private long adjustThreadPoolNumsThreshold = 100000;

    //并发消费时拉取消息前会有流控,会判断处理队列中最大偏移量和最小偏移量的跨度,不能大于2000
    private int consumeConcurrentlyMaxSpan = 2000;

    //push模式下任务拉取的时间间隔
    private long pullInterval = 0;

    //每次消费者实际消费的数量,不是从broker端拉取的数量
    private int consumeMessageBatchMaxSize = 1;

    //从broker端拉取的数量
    private int pullBatchSize = 32;

    //是否每次拉取之后都跟新订阅关系
    private boolean postSubscriptionWhenPull = false;

   //消息最大消费重试次数
    private int maxReconsumeTimes = -1;

    //延迟将该消息提交到消费者的线程池等待时间,默认1s
    private long suspendCurrentQueueTimeMillis = 1000;

    //消费超时时间,15分钟
    private long consumeTimeout = 15;

消费者的启动

代码入口,DefaultMQPushConsumerImpl#start()方法

 public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                ...
                this.serviceState = ServiceState.START_FAILED;
                //1.检查配置信息
                this.checkConfig();
                //2.加工订阅信息,将Map<String /* topic*/, String/* subExtentions*/>转换为Map<String,SubscriptionData>,同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。
                this.copySubscription();

进入copySubscription()方法:

Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    //集群模式下给topic创建一个retry的topic,retry+comsumerGroup
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
 ···

原文地址:https://www.cnblogs.com/langtutengyixiao/p/10807497.html

时间: 2024-11-05 18:51:45

RocketMQ源码解析-消息消费的相关文章

rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡.容错的一些处理,4.3以上支持消息事务,有管理控制台.命令行工具,底层namesrv与broker.client与server交互netty实现. 源码解析 创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.r

源码解析——消息机制

映象笔记的链接:源码解析--消息机制

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

RocketMQ 源码分析(二) —— Message 存储

CommitLog 结构 CommitLog.MappedFileQueue.MappedFile 的关系如下: CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N. 反应到系统文件如下: ··· Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd /Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l t

Android 开源项目源码解析(第二期)

Android 开源项目源码解析(第二期) 阅读目录 android-Ultra-Pull-To-Refresh 源码解析 DynamicLoadApk 源码解析 NineOldAnimations 源码解析 SlidingMenu 源码解析 Cling 源码解析 BaseAdapterHelper 源码分析 Side Menu.Android 源码解析 DiscreteSeekBar 源码解析 CalendarListView 源码解析 PagerSlidingTabStrip 源码解析 公共

RocketMQ源码 — 六、 RocketMQ高可用(1)

高可用究竟指的是什么?请参考:关于高可用的系统 RocketMQ做了以下的事情来保证系统的高可用 多master部署,防止单点故障 消息冗余(主从结构),防止消息丢失 故障恢复(本篇暂不讨论) 那么问题来了: 怎么支持多broker的写? 怎么实现消息冗余? 下面分开说明这两个问题 多master集群 这里强调出master集群,是因为需要多个broker set,而一个broker set只有一个master(见下文的"注意"),所以是master集群 broker有三种角色:ASY

[slf4j+log] 源码解析

slf4j: The Simple Logging Facade for java即 java简单的日志门面.统一定义了一系列的日志接口,使用户可以使用统一的接口来记录日志.logback,log4j等框架都实现了这些接口,启动时动态地决定真正的日志框架.本文以slf4j+logback的源码来讲解整个绑定和打印日志的流程. 手动阅读目录如下: 绑定日志框架 解析配置文件获取LoggerFactory:对于logback而言就是LoggerContext 获取Logger:在LoggerActi