rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的

首先解释下consumeQueue,由于commit-log是根据消息先后存储的,而我们消费的时候是根据topic来筛选的,所以需要一个队列根据topic来划分,所以consumeQueue就是干这个事情的。而indexfile顾名思义就是索引文件,用来做单纯查询的。

private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

consumeQueue就是一个topic下面一个queueID的一个具体信息,用ConsumeQueue描述,其中ConsumeQueue跟commit-log一样,也就是用mappedFileQueue描述的,说明存储数据比较多。

indexFile是直接用mappedFile描述的。

这两类对象都是需要持久化的,他们都是通过public void doDispatch(DispatchRequest req)进行异步处理的,之所以异步是因为都是可以通过commit-log得到,所以不急着像cmmit-log一样急着落盘。dispatch方法只有两个地方用到:有新消息产生和故障恢复。故障恢复这里不提,后面提,这里只说新消息产生。

ReputMessageService线程:

每当有信息提交到mappedfile以后,那么这个线程通过判断:this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();

就知道有新消息产生,那么一直遍历这个mappedfile的buff,不停的取消息,然后一个消息就对应一个DispatchRequest,然后交给dispatch处理:

private void doReput() {
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;

  

有三种dispatch:

    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

分别是consumeQueue、indexFile、bitmap,第三个默认用不到。

在这里可以构造一个新的consumeQueue:

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

  

然后用这个新的或者老的consumeQueue来处理这条消息:

在private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) 方法中用于处理这个request

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

  这里的offset、size、tagscode、expectLogicOffset分别对应原始消息的全局物理offset、消息大小、tags、消费offset,这个消费offset就是commit-log的putMessage的:

  

            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append(‘-‘);
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

  CQ_STORE_UNIT_SIZE就是20,也就是consumeQueue里面只存对应的topic-queueid对应的消息,并且只存物理offset、tag、总体大小,这三个指标正好占了20比特。存贮位置可以直接通过消费offset乘以20得到。由于消息很多,所以用mappedFIledQueue来存储,具体用哪个Queue也可以通过消费offset拿到指定的mappedFile。

这里的topicQueueTable其实跟consumeQueue是可以转换得到的,后面说故障恢复的时候再说。

接下来看看处理indexfile的dispatch。

为了写入indexfile,我们首先需要拿到一个indexfile,在getAndCreateLastIndexFile方法中,在这里看到只有前一个indexfile文件完全写完以后,我们才能生成新的,并且用新线程进行刷盘操作。

看下写入的操作:

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

  每个index文件只能存入指定数量的内容,如果满了返回false,创建新的indexfile。

  indexfile文件分布是:首先是文件头、然后是slot、最后是index内容。每个index内容大小是20字节,总共4个int、一个long。

  这里的key就是topic+uniqkey,然后做哈希,再对槽数取模得到slot的位置absslotpos,每个slot占用4字节,存储的就是当前this.indexHeader.getIndexCount()的序号(从1开始涨),这个数字this.indexHeader.getIndexCount()不能超过hashSlotNum,这个保证了indexFile是满的,不会超过也不会过小。

  

  index的绝对位置通过:int absIndexPos =    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;然后在这个位置插入:

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

  这里的slotValue其实就是上一个冲突slot位置对应的index-count,通过这个链表办法解决哈希冲突。在看下index存储的其他内容:哈希值、物理绝对位置、时间。

在看看如何使用index文件的:

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

  通过int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);就可以拿到槽冲突对应的index-count,继续找下去。

  为了防止冲突找到错误的信息,还加上了过滤条件:

if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}

这里面还有一个checkpoint机制,对于indexfile,在load方法里面:

            if (!lastExitOK) {
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            f.destroy(0);
                            continue;
                        }
                    }

  

endtimestamp是最后一次写入数据时间,右边是刷盘时间。如果右边小,说明不是一个完整文件。一个没有及时刷盘的文件肯定是一个空文件,因为index只有两个可能:空文件或者满文件,所以直接舍弃这个文件。

原文地址:https://www.cnblogs.com/notlate/p/12008333.html

时间: 2024-10-14 20:03:58

rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的的相关文章

rocketmq的broker恢复commit-log的时候如何恢复consumeQueue、indexfile

如果一个broker正常退出,是会删除abort文件的.那么启动broker的时候发现abort文件还存在,那么说明上次是异常终止,会进入到commit-log的recoverAbnormally逻辑里面,因为所有其他的信息都是从commit-log获取到的,所以追根溯源只能从commit-log开始着手. public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // recover by the minimum time

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

类比 RocketMq 和 淘宝消息服务:

rocketMq建立监听: 一个groupId下通常会挂载多个consumer实例. 集群订阅方式 (默认):一个监听到之后,另一个consumer实例就不会再监听到(不管在不在一个服务器上). 由于默认集群订阅方式,只能有一个监听到,所以,本地测试和服务器上topic不能一致,否则会影响服务器上监听不到消息.而topic 的不一致导致本地测试和服务端测试,groupID也不一致,但多台服务器上需要多个groupId, 淘宝消息服务: 通过SDK接受消息,Java接口使用说明: public i

RocketMQ重试机制和消息幂

一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持. 2.RocketMQ为了使用者封装了消息重试的处理流程,无需开发人员手动处理.RocketMQ支持了生产端和消费端两类重试机制. 模拟异常 Consum

【iOS开发每日小笔记(四)】iOS 7中如何除去UIAlertView 规避delegate对象销毁后接收消息的crash

这篇文章是我的[iOS开发每日小笔记]系列中的一片,记录的是今天在开发工作中遇到的,可以用很短的文章或很小的demo演示解释出来的小心得小技巧.该分类的文章,内容涉及的知识点可能是很简单的.或是用很短代码片段就能实现的,但在我看来它们可能会给用户体验.代码效率得到一些提升,或是之前自己没有接触过的技术,很开心的学到了,放在这里得瑟一下.其实,90%的作用是帮助自己回顾.记忆.复习.如果看官觉得太easy,太碎片,则可以有两个选择:1,移步[iOS探究]分类,对那里的文章进行斧正:2,在本文的评论

微信公众平台开发学习系列(二):微信公众平台接收消息与发送消息

本篇主要介绍如何使用senparc来处理微信公众平台的接收消息与发送消息. 首先微信端会将用户的发送的信息以post请求发送到填写的url上,服务端代码如下: 1 [HttpPost] 2 public ActionResult Get(PostModel postModel) 3 { 4 var messageHandler = new CustomMessageHandler(Request.InputStream, postModel); 5 6 messageHandler.Execut

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

ActiveMQ实例1--简单的发送和接收消息

一.环境准备 1,官网http://activemq.apache.org/下载最新版本的ActiveMQ,并解压 2,打开对应的目录,在Mac环境下,一般可以运行命令: cd /Users/***/Downloads/apache-activemq-***/bin/macosx ./activemq start 3,启动成功后,登录http://localhose:8161/admin/,登陆账号和密码都为admin,创建一个queue名为jyQueue. 二.创建Eclipse项目 1,新建

ActiveMQ 部署及发送接收消息

一.           下载 下载地址:http://activemq.apache.org/ 我这里使用的版本为当前最新5.8.0. 下载版本有Windows和Linux两个版本,且都分为32位和64位.根据自己需要选择下载. 二.           安装 我这里下载的为windows的32位版本(apache-activemq-5.8.0-bin.zip),下载后直接解压到需要安装的目录或在直接解压到当前目录也可,解压完安装也完成. 解压后目录如上图,里面包含了示例和文档,及所有的jar