RocketMQ源码 — 九、 RocketMQ延时消息

上一节消息重试里面提到了重试的消息可以被延时消费,其实除此之外,用户发送的消息也可以指定延时时间(更准确的说是延时等级),然后在指定延时时间之后投递消息,然后被consumer消费。阿里云的ons还支持定时消息,而且延时消息是直接指定延时时间,其实阿里云的延时消息也是定时消息的另一种表述方式,都是通过设置消息被投递的时间来实现的,但是Apache RocketMQ在版本4.2.0中尚不支持指定时间的延时,只能通过配置延时等级和延时等级对应的时间来实现延时。

一个延时消息被发出到消费成功经历以下几个过程:

  1. 设置消息的延时级别delayLevel
  2. producer发送消息
  3. broker收到消息在准备将消息写入存储的时候,判断是延时消息则更改Message的topic为延时消息队列的topic,也就是将消息投递到延时消息队列
  4. 有定时任务从延时队列中读取消息,拿到消息后判断是否达到延时时间,如果到了则修改topic为原始topic。并将消息投递到原始topic的队列
  5. consumer像消费其他消息一样从broker拉取消息进行消费

注意:批量消息是不支持延时消息的

tips:下文中说到的延时队列可以理解为一个ConsumeQueue

producer发送延时消息

在producer中发送消息的时候,设置Message的delayLevel

// org.apache.rocketmq.common.message.Message
public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

调用上面的方法设置延时等级的时候,会向message添加"DELAY"属性,后面broker处理延时消息就是依赖该属性进行特别的处理。

接下来发送消息的流程和正常发送消息的流程基本一致,只是会将该消息标记为延时消息类型

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
    context.setMsgType(MessageType.Delay_Msg);
}

broker处理延时消息

broker收到延时消息和正常消息在前置的处理流程是一致的,对于延时消息的特殊处理体现在将消息写入存储(内存或文件)的时候

// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 省略中间代码...
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    // 拿到原始topic和对应的queueId
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 非事务消息和事务的commit消息才会进一步判断delayLevel
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            // 纠正设置过大的level,就是delayLevel设置都大于延时时间等级的最大级
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 设置为延时队列的topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 每一个延时等级一个queue,queueId = delayLevel - 1
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            // 备份原始的topic和queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            // 更新properties
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // 省略中间代码...
}

上面的SCHEDULE_TOPIC是:

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

这个topic是一个特殊的topic,和正常的topic不同的地方是:

  1. 不会创建TopicConfig,因为也不需要consumer直接消费这个topic下的消息
  2. 不会将topic注册到namesrv
  3. 这个topic的队列个数和延时等级的个数是相同的

后面消息写入的过程和普通的又是一致的。

上面将消息写入延时队列中了,接下来就是处理延时队列中的消息,然后重新发送回原始topic的队列中。

在此之前先说明下至今还有疑问的一个个概念——delayLevel。这个概念和我们接下要需要用到的的类org.apache.rocketmq.store.schedule.ScheduleMessageService有关,这个类的字段delayLevelTable里面保存了具体的延时等级

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

看下这个字段的初始化过程

// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    // 每个延时等级延时时间的单位对应的ms数
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // 延时等级在MessageStoreConfig中配置
    // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        // 根据空格将配置分隔出每个等级
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            // 时间单位对应的ms数
            Long tu = timeUnitTable.get(ch);

            // 延时等级从1开始
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                // 找出最大的延时等级
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
    // 省略部分代码...
}

上面这个load方法在broker启动的时候DefaultMessageStore会调用来初始化延时等级。

接下来就应该解决怎么处理延时消息队列中的消息的问题了。处理延时消息的服务是:ScheduleMessageService。

还是broker启动的时候DefaultMessageStore会调用org.apache.rocketmq.store.schedule.ScheduleMessageService#start来启动处理延时消息队列的服务:

public void start() {

    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        // 记录队列的处理进度
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            // 每个延时队列启动一个定时任务来处理该队列的延时消息
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                // 持久化offsetTable(保存了每个延时队列对应的处理进度offset)
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

DeliverDelayedMessageTimerTask是一个TimerTask,启动以后不断处理延时队列中的消息,直到出现异常则终止该线程重新启动一个新的TimerTask

public void executeOnTimeup() {
    // 找到该延时等级对应的ConsumeQueue
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));
    // 记录异常情况下一次启动TimerTask开始处理的offset
    long failScheduleOffset = offset;

    if (cq != null) {
        // 找到offset所处的MappedFile中offset后面的buffer
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 下面三个字段信息是ConsumeQueue物理存储的信息
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // 注意这个tagCode,不再是普通的tag的hashCode,而是该延时消息到期的时间
                    long tagsCode = bufferCQ.getByteBuffer().getLong();
                    // 省略中间代码....
                    long now = System.currentTimeMillis();
                    // 计算应该投递该消息的时间,如果已经超时则立即投递
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 计算下一个消息的开始位置,用来寻找下一个消息位置(如果有的话)
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // 判断延时消息是否到期
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // 将消息恢复到原始消息的格式,恢复topic、queueId、tagCode等,清除属性"DELAY"
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.defaultMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    // 投递成功,处理下一个
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    // 投递失败,结束当前task,重新启动TimerTask,从下一个消息开始处理,也就是说当前消息丢弃
                                    // 更新offsetTable中当前队列的offset为下一个消息的offset
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                // 重新投递期间出现任何异常,结束当前task,重新启动TimerTask,从当前消息开始重试
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for
                // 处理完当前MappedFile中的消息后,重新启动TimerTask,从下一个消息开始处理
                // 更新offsetTable中当前队列的offset为下一个消息的offset
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            // 如果根据offsetTable中的offset没有找到对应的消息(可能被删除了),则按照当前ConsumeQueue的最小offset开始处理
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

对于上面的tagCode做一下特别说明,延时消息的tagCode和普通消息不一样:

  • 延时消息的tagCode:存储的是消息到期的时间
  • 非延时消息的tagCode:tags字符串的hashCode

对延时消息的tagCode的特别处理是在下面这个方法中完成的,也就是在build ConsumeQueue信息的时候

org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

总结

以上就是RocketMQ延时消息的实现方式,上面没有详说的是重试消息的延时是怎么实现的,其实就是在consumer将延时消息发送回broker的时候设置了(用户可以自己设置,如果没有自己设置默认是0)delayLevel,到了broker处理重试消息的时候如果delayLevel是0(也就是说是默认的延时等级)的时候会在原来的基础上加3,后面的处理就和上面说的延时消息一样了,存储的时候将消息投递到延时队列,等待延时到期后再重新投递到原始topic队列中等到consumer消费。

原文地址:https://www.cnblogs.com/sunshine-2015/p/9017426.html

时间: 2024-11-08 17:22:00

RocketMQ源码 — 九、 RocketMQ延时消息的相关文章

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力 参见还是比较清晰的.简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息.业务逻辑过程 假设是这样的:逻辑部分a-->发消息给MQ-->逻辑部分b假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息.这是我们不希望见到的. 分布式事务基础概念 关于分布式事务.两阶段提交协议.三阶提交协议 理解分布式事务的两阶段提交2pc 分布式事务(一)两阶段

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑

RocketMQ源码 — 六、 RocketMQ高可用(1)

高可用究竟指的是什么?请参考:关于高可用的系统 RocketMQ做了以下的事情来保证系统的高可用 多master部署,防止单点故障 消息冗余(主从结构),防止消息丢失 故障恢复(本篇暂不讨论) 那么问题来了: 怎么支持多broker的写? 怎么实现消息冗余? 下面分开说明这两个问题 多master集群 这里强调出master集群,是因为需要多个broker set,而一个broker set只有一个master(见下文的"注意"),所以是master集群 broker有三种角色:ASY

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

RocketMQ 源码分析(二) —— Message 存储

CommitLog 结构 CommitLog.MappedFileQueue.MappedFile 的关系如下: CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N. 反应到系统文件如下: ··· Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd /Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l t

rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡.容错的一些处理,4.3以上支持消息事务,有管理控制台.命令行工具,底层namesrv与broker.client与server交互netty实现. 源码解析 创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.r