消息消费队列和索引文件的更新

ConsumeQueue,IndexFile需要及时更新,否则无法及时被消费,根据消息属性查找消息也会出现较大延迟。

mq通过开启一个线程ReputMessageService来准时转发commitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue,IndexFile文件

DefaultMessageStore#start

ReputMessageService线程每执行一次任务推送休息1毫秒旧继续尝试推送消息到消息消费队列和索引文件。

返回reputFromOffset偏移量开始的全部有效数据,然后循环读取每一条消息。

在DefaultMessageStore的构造方法中:

topic:消息主题名称

queueId:消息队列ID

commitLogOffset:消息物理偏移量

msgSize:消息长度

tagsCode:消息过滤tag hashcode

storeTimestamp:消息存储时间戳

consumeQueueOffset:消息队列偏移量

key:消息索引

success:是否成功解析道完整的消息

uniqKey:消息唯一键

sysFlag:消息系统标记

preparetransactionOffset:消息预处理事务偏移量

propertiesMap:消息属性

bitMap:位图

根据消息主题与队列ID,先获取对应的ConsumeQueue文件。

最后会调用这个方法,一次将消息偏移量,消息长度,tag hashcode写入到ByteBuffer中,并根据consumeQueueOffset计算ConumeQueue中的物理地址,将内容追加到ConsumeQueue的内存映射文件中,ConsumeQueue的刷盘方式固定为异步刷盘模式

如果messageIndexEnable设置为true。

获取或创建IndexFile文件并获取所有文件最大的物理偏移量。如果该消息的物理偏移量小于索引文件中的物理偏移量,则说明是重复数据,忽略本次索引构建。

如果消息的唯一键不为空,则添加到Hash索引中,以便加速根据唯一键检索消息

构建索引键,mq支持为同一个消息建立多个索引,多个索引键空格分开。

原文地址:https://www.cnblogs.com/lccsblog/p/12235294.html

时间: 2024-11-05 15:55:52

消息消费队列和索引文件的更新的相关文章

kafka-Message、日志和索引文件、消费组、rebalance

记录下和kafka相关的Message.日志文件.索引文件.consumer记录消费的offset相关内容,文中很多理解参考文末博文.书籍还有前辈. kafka中的消息 kafka中的消息Message,在V1版本中是如下部分组成,主要关系key和value. (1)key:当需要将消息写入到某个topic下的指定partition分区时,需要给定key的值. (2)value:实际消息内容保存在这里. (3)其他均是消息的元数据,一般不用关心,对用户来说是透明的. 为了保存这些消息数据,kaf

kali linux 更换更新源,apt-get update报:已下载 25.9 MB,耗时 38秒 (677 kB/s) 正在读取软件包列表... 完成 W: 校验数字签名时出错。此仓库未被更新,所以仍然使用此前的索引文件。GPG 错误:http://mirrors.ustc.edu.cn/kali kali

正在读取软件包列表... 完成 W: 校验数字签名时出错.此仓库未被更新,所以仍然使用此前的索引文件.GPG 错误:http://mirrors.ustc.edu.cn/kali kali-rolling InRelease: 下列签名无效: EXPKEYSIG ED444FF07D8D0BF6 Kali Linux Repository <[email protected]> W: 无法下载 http://http.kali.org/kali/dists/kali-rolling/InRel

关于RocketMQ消息消费与重平衡的一些问题探讨

其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结. 关于 push 模式下的消息循环拉取问题 之前发表了一篇关于重平衡的文章:「Kafka 重平衡机制」,里面有说到 RocketMQ 重平衡机制是每隔 20s 从任意一个 Broker 节点获取消费组的消费 ID 以及订阅信息,再根据这些订阅信息进行分配,然后将分配到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 队列中

Java操作RabbitMQ添加队列、消费队列和三个交换机

一.发送消息到队列(生产者) 新建一个maven项目,在pom.xml文件加入以下依赖 <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>

Solr4.8.0源码分析(10)之Lucene的索引文件(3)

Solr4.8.0源码分析(10)之Lucene的索引文件(3) 1. .si文件 .si文件存储了段的元数据,主要涉及SegmentInfoFormat.java和Segmentinfo.java这两个文件.由于本文介绍的Solr4.8.0,所以对应的是SegmentInfoFormat的子类Lucene46SegmentInfoFormat. 首先来看下.si文件的格式 头部(header) 版本(SegVersion) doc个数(SegSize) 是否符合文档格式(IsCompoundF

Solr4.8.0源码分析(9)之Lucene的索引文件(2)

Solr4.8.0源码分析(9)之Lucene的索引文件(2) 一. Segments_N文件 一个索引对应一个目录,索引文件都存放在目录里面.Solr的索引文件存放在Solr/Home下的core/data/index目录中,一个core对应一个索引. Segments_N例举了索引所有有效的segments信息以及删除的具体信息,一个索引可以有多个Segments_N,但是有效的往往总是N最大的那个,为什么会出现多个segments_N,主要是由于暂时无法删除它们或者有indexwriter

sphinx增量索引和主索引来实现索引的实时更新

项目中文章的信息内容因为持续有新增,而文章总量的基数又比较大,所以做搜索的时候,用了主索引+增量索引这种方式来实现索引的实时更新. 实现原理: 1. 新建一张表,记录一下上一次已经创建好索引的最后一条记录的ID 2. 当索引时,然后从数据库中取出所有ID大于上面那个sphinx中的那个ID的数据, 这些就是新的数据,然后创建一个小的索引文件 3. 把上边我们创建的增量索引文件合并到主索引文件上去 4. 把最后一条记录的ID更新到第一步创建的表中 值得注意的两点: 1)当合并索引的时候,只是把增量

MongoDB索引文件破坏后导致查询错误的问题

问题描述: MongoDB在非正常情况下关闭时,可能会导致索引文件破坏,造成数据在更新时没有反映到索引上. 解决方案: 使用脚本,重建MongoDB所有表的索引. var names  = db.getCollectionNames(); for( var i in names ){     var name = names[i];     print(name);          var coll = db.getCollection(name);     coll.reIndex(); }

lucene的索引文件2

Lucene保存了从Index到Segment到Document到Field一直到Term的正向信息,也包括了从Term到Document映射的反向信息,还有其他一些Lucene特有的信息.下面对这三种信息一一介绍. 正向信息: index-->segments(segment,.gen,segment_N)-->Field(fnm,fdx,fdt)-->Term(tvx,tvd,tvf)segments.gen和segments_N保存的是段(segment)的元数据信息(metada