一、MQ存储分类
文件系统:RocketMQ/Kafka/RabbitMQ
关系型数据库DB:ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化
分布式KV存储:ZeroMQ
对比:
存储效率, 文件系统>分布式KV存储>关系型数据库DB
易于实现和快速集成,关系型数据库DB>分布式KV存储>文件系统,但是性能会下降很多
二、RocketMQ存储概要
(一)存储文件
rocketmq
|--store
|-commitlog
| |-00000000000000000000
| |-00000000001073741824
|-config
| |-consumerFilter.json
| |-consumerOffset.json
| |-delayOffset.json
| |-subscriptionGroup.json
| |-topics.json
|-consumequeue
| |-SCHEDULE_TOPIX_XXX
| |-topicA
| |-topicB
| |-0
| |-1
| |-2
| |-3
| |-00000000000000000000
| |-00000000001073741824
|-index
| |-00000000000000000000
| |-00000000001073741824
|-abort
|-checkpoint
(二)对象封装
(1)CommitLog:
(2)ConsumeQueue:
(3)IndexFile:
(4)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入内存映射缓存区(commit),或者原子性地将消息持久化的刷盘(flush);
(5)MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;
(6)MappedFileBuff:堆外内存
三、文件存储
(一)存储对象关系
(二) 文件存储对象间流程
RocketMQ使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
此处有一个关键参数:reputFromOffset
消息允许重复:reputFromOffset
= commitlog的提交指针
消息不允许重复:reputFromOffset
= commitlog中内存的最大偏移量
commitDispacherBuildConsumeQueue(handler):构建消息消费队列
1、 根据消息主题和消息ID获取消息消费队列ConsumeQueue
2、 依次将消息的偏移量、消息长度、taghash写入ByteBufff,然后根据ConsumeQueueOffset计算出ConsumeQueue的物理地址,将内容追加到内存映射文件中
commitDispacherBuildIndex(handler):构建索引文件
这里有个配置项:messageIndexEnable,如果位true,则会构建索引文件
1、 创建或获取indexFile的最大物理偏移量,如果该消息的物理偏移量小于索引文件的物理偏移量,说明是重复消息,则忽略本次构建
2、 如果索引唯一键不为空,则添加到hash索引中
3、 构建索引列
(三)文件存储流程(以commitlog为例)
transientStorePoolEable开启堆外内存
流程:
同步刷盘:
为了避免同步刷盘消费任务与其他消息生产者提交的任务直接产生竞争锁,因此GroupCommitService提供了写容器和读容器,每次刷盘完毕后,两者会做身份交换。
异步刷盘:
Commitlog存储消息流程:
1、 消息写入,写指针往后移动
2、 异步提交commit(commitRealTimeService)
3、 内存映射更新写指针位置
4、 移动提交指针到上次提交时的写指针
5、 异步flush(FlushRealTimeService)
Commit(commitRealTimeService):
执行间隔时间
最小提交页数
两次执行最大实际间隔
执行条件:
1、 到执行时间(每200ms执行一次),如果提交页数大于最小提交页数,则提交
2、 距上次提交时间间隔超过了两次执行的最大执行间隔
执行成功,唤醒刷盘线程,flushRealTimeService
执行条件通提交线程
刷盘完成,更新checkpoint中刷盘时间点
Commit流程
1、 校验broker状态、角色、消息大小
2、 延迟队列的特殊处理
3、 获取当前可以写入的commitlog文件
4、 获取putMessageLock,准备写入(由此可见,写入时串行写入)
5、 设置消息的存储时间(如果没有文件,则创建一个新文件)
6、 将消息加载到MappedFile中
7、 创建全局唯一的消息ID
8、 获取消息在队列的偏移量计算消息
9、 机选消息总长度
10、
如果消息总长度+8>commitlog的空闲长度,则新建一个commitlog文件(8个长度表示文件剩余长度+魔数)
11、
将消息存到buff中(内存映射文件)
12、
更新消息队列偏移量
13、
释放putMessageLock
flush(flushRealTimeService)
参数:
等待方式(await/sleep)
线程运行的时间间隔
一次刷写最小页数
两次执行的最大间隔(10s)
流程:
1、 确认是否可以执行(与commit一致)
2、 将内存中数据写入磁盘(FileChannel中的force),更新checkpoint中commitlog文件刷盘时间戳
说明:checkpoint中commitlog文件刷盘时间戳刷盘在更新消息消费队列时触发。
四、文件恢复
(一)consumeQueue和Index恢复
1、判断上次退出是否时异常,如果时异常退出
2、加载延迟队列
3、判断commitlog文件大小是否与配置文件大小一致,如果不一致,删除commitlog文件,创建MappedFile对象
4、加载消息消费队列,构建consumeQueue对象
5、加载checkpoint
6、加载索引文件
如果上次异常退出且索引文件的上次刷盘时间小于索引文件的最大的消息时间戳,则立即销毁该文件
7、执行恢复策略
8、consumeQueue恢复后,在commitlog存储消息的逻辑偏移量
(二)正常退出文件恢复
1、从倒数第三个文件开始恢复,如果不足三个文件,则从第一个文件开始恢复
2、校验消息。
mappedFileOffset:校验通过的偏移量
processOffset:文件已确认的偏移量
(1)消息查找校验为true,且消息大小大于0,说明是正常消息存储,继续校验下一个消息
(2)消息查找校验为true,消息大小为0,说明是到了文件尾部,继续下一个文件
(3)消息查找校验为false,说明该文件未填满,结束循环处理(此处即为消息的偏移量)
3、更新MappedFileQueue中的刷盘指针和提交指针到offset
4、删除offset之后的所有文件
(1)offset > 文件尾部offse,说明是正常文件,忽略
(2)文件头部offset < offset < 文件尾部offset,说明offset在该文件偏移量内,设置MappedFile的commitPosition和flushPosition
(3)offset < 文件头部offset,说明是在有效文件之后创建的,删除(清理MappedFile占用的资源,删除物理文件)
(三)异常退出文件恢复
异常退出恢复的流程和正常退出文件恢复的流程基本一致,有两点差异:
1、 文件读取顺序
正常恢复:从倒数第三个文件开始,向后遍历
异常恢复:从最后一个开始,向前遍历到第一个正确存储的文件
2、 空文件夹处理
正常:无需处理
异常恢复:如果commitlog文件夹是空的,则删除消息消费队列下的所有文件
判断是否是正确文件:
1、 魔数判断
2、 文件的第一条消息长度为0,说明未存储消息
3、 对比文件第一条消息的offset,与checkpoint中(commitlog/consumeQueue/index)的刷盘时间对比
第一条消息offset < checkpoint中刷盘时间,说明是正确文件
4、 验证合法性,转发到MappedFile
5、 如果未找到MappedFile,重置commitPosition和flushPosition,销毁消息消费队列文件
五、文件删除
触发条件:
1、 每天凌晨执行定时任务(4点)
2、 磁盘不足
3、 手动触发(未封装)
磁盘不足:
每10ms查询一次磁盘是否充足,不充足,则调用文件删除
磁盘不足条件:
文件所在磁盘的最大使用量
磁盘使用率
磁盘使用率阈值
磁盘使用率预警值
1、 磁盘使用率大于预警阈值,建议立即清除文件
2、 磁盘使用率大于磁盘使用率阈值
删除条件:
1、文件保存时间
2、删除物理文件时间间隔
3、距第一次删除被拒绝可保留时间
删除过程:
从倒数第二个文件开始
1、 删除MappedFile所占用的资源
2、 删除MappedFile对应的文件
原文地址:https://www.cnblogs.com/liconglong/p/12545588.html