深入理解RocketMQ(四)--消息存储

一、MQ存储分类

文件系统:RocketMQ/Kafka/RabbitMQ

关系型数据库DBActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化

分布式KV存储:ZeroMQ

对比:

存储效率, 文件系统>分布式KV存储>关系型数据库DB

易于实现和快速集成,关系型数据库DB>分布式KV存储>文件系统,但是性能会下降很多

二、RocketMQ存储概要

(一)存储文件

rocketmq

|--store

|-commitlog

|      |-00000000000000000000

|      |-00000000001073741824

|-config

|      |-consumerFilter.json

|      |-consumerOffset.json

|      |-delayOffset.json

|      |-subscriptionGroup.json

|      |-topics.json

|-consumequeue

|      |-SCHEDULE_TOPIX_XXX

|      |-topicA

|      |-topicB

|             |-0

|             |-1

|             |-2

|             |-3

|                    |-00000000000000000000

|                    |-00000000001073741824

|-index

|      |-00000000000000000000

|      |-00000000001073741824

|-abort

|-checkpoint

(二)对象封装

(1)CommitLog
(2)ConsumeQueue

(3)IndexFile

(4)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入内存映射缓存区(commit),或者原子性地将消息持久化的刷盘(flush);
(5)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;

(6)MappedFileBuff:堆外内存

三、文件存储

(一)存储对象关系

(二) 文件存储对象间流程

RocketMQ使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

此处有一个关键参数:reputFromOffset

消息允许重复:reputFromOffset
= commitlog的提交指针

消息不允许重复:reputFromOffset
= commitlog中内存的最大偏移量

commitDispacherBuildConsumeQueue(handler):构建消息消费队列

1、 根据消息主题和消息ID获取消息消费队列ConsumeQueue

2、 依次将消息的偏移量、消息长度、taghash写入ByteBufff,然后根据ConsumeQueueOffset计算出ConsumeQueue的物理地址,将内容追加到内存映射文件中

commitDispacherBuildIndex(handler):构建索引文件

这里有个配置项:messageIndexEnable,如果位true,则会构建索引文件

1、 创建或获取indexFile的最大物理偏移量,如果该消息的物理偏移量小于索引文件的物理偏移量,说明是重复消息,则忽略本次构建

2、 如果索引唯一键不为空,则添加到hash索引中

3、 构建索引列

(三)文件存储流程(以commitlog为例)

transientStorePoolEable开启堆外内存

流程:

同步刷盘:

为了避免同步刷盘消费任务与其他消息生产者提交的任务直接产生竞争锁,因此GroupCommitService提供了写容器和读容器,每次刷盘完毕后,两者会做身份交换。

异步刷盘:

Commitlog存储消息流程:

1、 消息写入,写指针往后移动

2、 异步提交commit(commitRealTimeService)

3、 内存映射更新写指针位置

4、 移动提交指针到上次提交时的写指针

5、 异步flush(FlushRealTimeService)

Commit(commitRealTimeService):

执行间隔时间

最小提交页数

两次执行最大实际间隔

执行条件:

1、 到执行时间(每200ms执行一次),如果提交页数大于最小提交页数,则提交

2、 距上次提交时间间隔超过了两次执行的最大执行间隔

执行成功,唤醒刷盘线程,flushRealTimeService

执行条件通提交线程

刷盘完成,更新checkpoint中刷盘时间点

Commit流程

1、 校验broker状态、角色、消息大小

2、 延迟队列的特殊处理

3、 获取当前可以写入的commitlog文件

4、 获取putMessageLock,准备写入(由此可见,写入时串行写入)

5、 设置消息的存储时间(如果没有文件,则创建一个新文件)

6、 将消息加载到MappedFile中

7、 创建全局唯一的消息ID

8、 获取消息在队列的偏移量计算消息

9、 机选消息总长度

10、        
如果消息总长度+8>commitlog的空闲长度,则新建一个commitlog文件(8个长度表示文件剩余长度+魔数)

11、        
将消息存到buff中(内存映射文件)

12、        
更新消息队列偏移量

13、        
释放putMessageLock

flush(flushRealTimeService)

参数:

等待方式(await/sleep)

线程运行的时间间隔

一次刷写最小页数

两次执行的最大间隔(10s)

流程:

1、 确认是否可以执行(与commit一致)

2、 将内存中数据写入磁盘(FileChannel中的force),更新checkpoint中commitlog文件刷盘时间戳

说明:checkpoint中commitlog文件刷盘时间戳刷盘在更新消息消费队列时触发。

四、文件恢复

(一)consumeQueue和Index恢复

1、判断上次退出是否时异常,如果时异常退出

2、加载延迟队列

3、判断commitlog文件大小是否与配置文件大小一致,如果不一致,删除commitlog文件,创建MappedFile对象

4、加载消息消费队列,构建consumeQueue对象

5、加载checkpoint

6、加载索引文件

如果上次异常退出且索引文件的上次刷盘时间小于索引文件的最大的消息时间戳,则立即销毁该文件

7、执行恢复策略

8、consumeQueue恢复后,在commitlog存储消息的逻辑偏移量

(二)正常退出文件恢复

1、从倒数第三个文件开始恢复,如果不足三个文件,则从第一个文件开始恢复

2、校验消息。

mappedFileOffset:校验通过的偏移量

processOffset:文件已确认的偏移量

(1)消息查找校验为true,且消息大小大于0,说明是正常消息存储,继续校验下一个消息

(2)消息查找校验为true,消息大小为0,说明是到了文件尾部,继续下一个文件

(3)消息查找校验为false,说明该文件未填满,结束循环处理(此处即为消息的偏移量)

3、更新MappedFileQueue中的刷盘指针和提交指针到offset

4、删除offset之后的所有文件

(1)offset > 文件尾部offse,说明是正常文件,忽略

(2)文件头部offset < offset < 文件尾部offset,说明offset在该文件偏移量内,设置MappedFile的commitPosition和flushPosition

(3)offset < 文件头部offset,说明是在有效文件之后创建的,删除(清理MappedFile占用的资源,删除物理文件)

(三)异常退出文件恢复

异常退出恢复的流程和正常退出文件恢复的流程基本一致,有两点差异:

1、 文件读取顺序

正常恢复:从倒数第三个文件开始,向后遍历

异常恢复:从最后一个开始,向前遍历到第一个正确存储的文件

2、 空文件夹处理

正常:无需处理

异常恢复:如果commitlog文件夹是空的,则删除消息消费队列下的所有文件

判断是否是正确文件:

1、 魔数判断

2、 文件的第一条消息长度为0,说明未存储消息

3、 对比文件第一条消息的offset,与checkpoint中(commitlog/consumeQueue/index)的刷盘时间对比

第一条消息offset < checkpoint中刷盘时间,说明是正确文件

4、 验证合法性,转发到MappedFile

5、 如果未找到MappedFile,重置commitPosition和flushPosition,销毁消息消费队列文件

五、文件删除

触发条件:

1、 每天凌晨执行定时任务(4点)

2、 磁盘不足

3、 手动触发(未封装)

磁盘不足:

每10ms查询一次磁盘是否充足,不充足,则调用文件删除

磁盘不足条件:

文件所在磁盘的最大使用量

磁盘使用率

磁盘使用率阈值

磁盘使用率预警值

1、 磁盘使用率大于预警阈值,建议立即清除文件

2、 磁盘使用率大于磁盘使用率阈值

删除条件:

1、文件保存时间

2、删除物理文件时间间隔

3、距第一次删除被拒绝可保留时间

删除过程:

从倒数第二个文件开始

1、 删除MappedFile所占用的资源

2、 删除MappedFile对应的文件

原文地址:https://www.cnblogs.com/liconglong/p/12545588.html

时间: 2024-11-06 19:26:01

深入理解RocketMQ(四)--消息存储的相关文章

Java反射理解(四)-- 获取成员变量构造函数信息

Java反射理解(四)-- 获取成员变量构造函数信息 步骤 获取成员变量信息: obj.getClass() 获取类类型对象 成员变量也是对象,java.lang.reflect.Field 类中封装了关于成员变量的操作: getFields() 方法获取的是所有的public的成员变量的信息 getDeclaredFields() 获取的是该类自己声明的成员变量的信息 getType() 得到成员变量的类型的类类型 getName() 得到成员变量的名称 获取成员变量构造函数信息: obj.g

【Android个人理解(四)】自定义Application类的使用

1.为什么要重写Application类 如果想在整个应用中使用全局变量,在java中一般是使用静态变量,public类型:而在android中如果使用这样的全局变量就不符合Android的框架架构,但是可以使用一种更优雅的方式就是使用Application context. 那么为什么这样的全局变量就不符合Android的框架架构? 众说纷纭,我理解的是static访问是无法跨进程的.Android中的Activity,Service是可以在各自进程中运行的,用static传递参数到不同进程的

深入理解ClassLoader(四)—类的父委托加载机制

上几次我们介绍到了JVM内部的几个类加载器,我们来重新画一下这个图,再来看一下他们之间的关系. JVM的ClassLoader采用的是树形结构,除了BootstrapClassLoader以外?每个ClassLoader都会有一个parentClassLoader,用户自定义的ClassLoader默认的parentClassLoader是SystemClassLoader,当然你可以自己指定需要用哪一个ClassLoader的实例,我们来看他的API 默认的无参构造方法使用的是SystemCl

深度学习浅层理解(四)-- 稀疏编码

借鉴前人的文章链接 http://blog.csdn.net/zouxy09/article/details/8777094 http://www.gene-seq.com/bbs/thread-2853-1-1.html http://ibillxia.github.io/blog/2012/09/26/convex-optimization-overview/ UFLDL教程 http://ufldl.stanford.edu/wiki/index.php/%E7%A8%80%E7%96%8

测试相关理解(四)边界值分析实例

1.现有一个学生标准化考试批阅试卷,产生成绩报告的程序.其规格说明如下:程序的输入文件由一些有80个字符的记录组成,如右图所示,所有记录分为3组: ②试卷各题标准答案记录:每个记录均在第80个字符处标以数字"2".该组的第一个记录的第1至第3个字符为题目编号(取值为1一999).第10至第59个字符给出第1至第50题的答案(每个合法字符表示一个答案).该组的第2,第3--个记录相应为第51至第100,第101至第150,-题的答案. ③每个学生的答卷描述:该组中每个记录的第80个字符均

openwrt上网配置的一些理解(四)

这次要解决的问题是3g上网和wan口上往可以随意切换,当然能够叠加也是好事,不过这不是我关心的.下面还是修改3个文件network,firewall,multiwan.首先在network中加入界面配置,这个在第一节中已有说明. config interface mobile option proto 3g option apn CMNET option device /dev/ttyUSB2 option service umts 接下来修改firewall,在wan域中加入mobile Op

RocketMQ源码学习--消息存储篇

1.序言 今天来和大家探讨一下RocketMQ在消息存储方面所作出的努力,在介绍RocketMQ的存储模型之前,可以先探讨一下MQ的存储模型选择. 2.MQ的存储模型选择 个人看来,从MQ的类型来看,存储模型分两种: 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ) 不需要持久化(ZeroMQ) 本篇文章主要讨论持久化MQ的存储模型,因为现在大多数的MQ都是支持持久化存储,而且业务上也大多需要MQ有持久存储的能力,能大大增加系统的高可用性,下面几种存储方式: 分布式

(转)RocketMQ源码学习--消息存储篇

http://www.tuicool.com/articles/umQfMzA 1.序言 今天来和大家探讨一下RocketMQ在消息存储方面所作出的努力,在介绍RocketMQ的存储模型之前,可以先探讨一下MQ的存储模型选择. 2.MQ的存储模型选择 个人看来,从MQ的类型来看,存储模型分两种: 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ) 不需要持久化(ZeroMQ) 本篇文章主要讨论持久化MQ的存储模型,因为现在大多数的MQ都是支持持久化存储,而且业务上也大

三次握手四次挥手简易理解

摘: 工作过程TCP标志位:TCP共有6个标志位,分别是: SYN(synchronous),建立联机.ACK(acknowledgement),确认.PSH(push),传输.FIN(finish),结束.RST(reset),重置.URG(urgent),紧急.图解三次握手和四次挥手的过程: 三次握手理解图 四次挥手理解图 三次握手建立连接阐述:第一次握手:客户端要和服务端进行通信,首先要告知服务端一声,遂发出一个SYN=1的连接请求信号,”服务端哥哥,我想给你说说话”. 第二次握手:当服务