RocketMQ 拉取消息-文件获取

看完了上一篇的《RocketMQ 拉取消息-通信模块》,请求进入PullMessageProcessor中,接着

PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中调用了:

 final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);

来从硬盘中获取消息体。

接着来看看DefaultMessageStore是如何从消息队列中获取消息的:

    /**
     * 获取消息结果
     * 所有的参数都是从requestheader中获取的。也就是说从consumer client端传递过来的。
     * @param group
     * @param topic
     * @param queueId
     * @param offset
     * @param maxMsgNums
     * @param subscriptionData
     * @return
     */
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
                                       final SubscriptionData subscriptionData) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }

        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = this.getSystemClock().now();

        // 枚举变量,取消息结果
        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        // 当被过滤后,返回下一次开始的Offset
        long nextBeginOffset = offset;
        // 逻辑队列中的最小Offset
        long minOffset = 0;
        // 逻辑队列中的最大Offset
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();

        final long maxOffsetPy = this.commitLog.getMaxOffset();

        //通过topic和queueid查找逻辑队列对象,相当于字典的目录,用来指定消息在物理文件commitlog上的位置
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQuque();
            maxOffset = consumeQueue.getMaxOffsetInQuque();

            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;

                        int i = 0;
                        final int MaxFilterMessageCount = 16000;
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            // 说明物理文件正在被删除
                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }

                            // 判断是否拉磁盘数据
                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                            // 此批消息达到上限了
                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                    isInDisk)) {
                                break;
                            }

                            // 消息过滤
                            if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                if (selectResult != null) {
                                    this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                    getResult.addMessage(selectResult);
                                    status = GetMessageStatus.FOUND;
                                    nextPhyFileStartOffset = Long.MIN_VALUE;
                                } else {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }

                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                }
                            } else {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                if (log.isDebugEnabled()) {
                                    log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
                                }
                            }
                        }

                        if (diskFallRecorded) {
                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                            brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                        }

                        nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);

                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (StoreUtil.TotalPhysicalMemorySize
                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {
                        // 必须释放资源
                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed.");
                }
            }
        }
        // 请求的队列Id没有
        else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

先来看一下获取消息队列的方法:

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(//
                    topic, //
                    queueId, //
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                    this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }
时间: 2024-07-30 08:41:33

RocketMQ 拉取消息-文件获取的相关文章

kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程.对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient. 入口在 KafkaConsumer#pollOnce 中,抽出主要步骤: // 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pol

.net MVC 微信公众号 点击菜单拉取消息时的事件推送

官方文档:https://mp.weixin.qq.com/wiki?t=resource/res_main&id=mp1421141016&token=&lang=zh_CN 业务描述:点击菜单,推送消息,消息内容为自定义的读取数据库信息之后,为用户推送的动态消息. 首先需要用代码实现自定义菜单,为对应的菜单指定click事件. 关于自定义菜单的创建及事件指定,请看上一篇文章,本篇主要介绍事件响应的实现. MVC controller 中的代码如下: public void Me

Python抓取远程文件获取真实文件名

用urllib下载远程文件并转存到hdfs服务器,在下载时,下载地址中不一定包含文件名,需要从连接信息中获取. 1 file_url = request.form.get('file_url') 2 fo = urllib.urlopen(file_url) 3 blob = fo.read() 4 file_size = len(blob) 5 if fo.info().has_key('Content-Disposition'): 6 file_name = fo.info()['Conte

rsync拉取远程文件

mkdir -p   /docsshpass -p ''pwd" rsync -avz -e 'ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no' --bwlimit=500 [email protected]_ip:/data/file   /doc

Python 拉取日志文件paramiko

paramiko的另一篇博文:http://467754239.blog.51cto.com/4878013/1619166 场景: 在游戏行业的集群中,日志分析或许是必不可少的,那么为了更方便的管理日志,就是统一存放日志,然后入库数据库 #!/usr/bin/env python #coding:utf8 from multiprocessing import Process from datetime import * import paramiko import string import

Kafka consumer消息的拉取及偏移的管理

消费者拉取消息并处理主要有4个步骤: 获取消费者所拉取分区的偏移位置OffsetFetchRequest(新的消息是从偏移位置开始的) 创建FetchReqeust,生成Map<Node, FetchRequest>,以消费者所拉取消息的节点为key来分组,所消费的TopicPartition的数据为value,并放入到unsent队列 调用poll方法实际发送请求给相应的node,如果返回成功,在onSecuss方法中,消息被保存在completedFetches中 从completedFe

K8s 从懵圈到熟练 – 镜像拉取这件小事

作者 | 声东 阿里云售后技术专家 导读:相比 K8s 集群的其他功能,私有镜像的自动拉取,看起来可能是比较简单的.而镜像拉取失败,大多数情况下都和权限有关.所以,在处理相关问题的时候,我们往往会轻松的说:这问题很简单,肯定是权限问题.但实际的情况是,我们经常为一个问题,花了多个人的时间却找不到原因.这主要还是我们对镜像拉取,特别是私有镜像自动拉取的原理理解不深.这篇文章,作者将带领大家讨论下相关原理. 顺序上来说,私有镜像自动拉取会首先通过阿里云 Acr credential helper 组

ansible-playbook基于role的配置一键安装zabbix客户端以及拉取自定义监控脚本

在IT工作中,您可能会一遍又一遍地执行相同的任务:没有人喜欢重复的任务.通过Ansible,IT管理员可以开始自动化日常任务中的苦差事.自动化解放了管理人员,专注于通过加快应用交付时间和建立在成功文化基础之上,为业务提供更多价值的努力.最终,Ansible为团队提供了他们永远无法获得足够的一件事:时间.让聪明的人专注于聪明的事情. Ansible是一种简单的自动化语言,可以完美地描述IT应用程序基础结构.它易于学习,自我记录,并且不需要毕业级的计算机科学学位来阅读.自动化不应该比它正在取代的任务

git只拉取github部分代码的方法

需求:github某个项目所有代码太大,有600+M,甚至更大:只需要拉取部分代码,一是可以降低网络消耗,二是可以降低磁盘占用 分析了下空间占用情况:发现每个项目下的.git/objects/pack都很大,这应该是git的机制,使用git就避免不了下载这个东西,所以降低网络消耗的需求暂时满足不了,后续再研究吧 ~/gitlab/druid/.git/objects/pack 方法一:参考 https://blog.csdn.net/u022812849/article/details/5302