Apache RocketMQ之所以能在众多的消息中间件中脱颖而出,能吸引数千企业用户与RocketMQ的关键特性是分不开的,本文详细介绍RocketMQ中的关键特性。
一、过万的单机队列数
诸如Kafka之类的消息中间件,在队列数上升时性能会产生巨大的损失,RocketMQ之所以能单机支持上万的持久化队列与其独特的存储结构分不开。
如上图所示,所有的消息数据单独存储到一个Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在Commit Log的位置信息,并且串行方式刷盘。
这样做的好处如下:
队列轻量化,单个队列数据量非常少。
对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。
每个方案都有缺点,它的缺点如下:
写虽然完全是顺序写,但是读却变成了完全的随机读。
读一条消息,会先读Consume Queue,再读Commit Log,增加了开销。
要保证Commit Log与Consume Queue完全的一致,增加了编程的复杂度。
以上缺点如何克服:
随机读,尽可能让读命中PAGECACHE,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。
访问PAGECACHE时,即使只访问1k的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。
随机访问Commit Log磁盘数据,系统IO调度算法设置为NOOP方式,会在一定程度上将完全的随机读变成顺序跳跃方式,而顺序跳跃方式读较完全的随机读性能会高5倍以上。
另外4k的消息在完全随机访问情况下,仍然可以达到8K次每秒以上的读性能。
由于Consume Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为Consume Queue完全不会阻碍读性能。
Commit Log中存储了所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。
二、两种刷盘策略
RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。
异步刷盘
在有RAID卡,SAS 15000转磁盘测试顺序写文件,速度可以达到300M每秒左右,而线上的网卡一般都为千兆网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。
万一由于此时系统压力过大,可能堆积消息,除了写入IO,还有读取IO,万一出现磁盘读取落后情况,会不会导致系统内存溢出,答案是否定的,原因如下:
写入消息到PAGECACHE时,如果内存不足,则尝试丢弃干净的PAGE,腾出内存供新消息使用,策略是LRU方式。
如果干净页不足,此时写入PAGECACHE会被阻塞,系统尝试刷盘部分数据,大约每次尝试32个PAGE,来找出更多干净PAGE。
综上,内存溢出的情况不会出现。
同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完PAGECACHE直接返回,而同步刷盘需要等待刷盘完成才返回,同步刷盘流程如下:
写入PAGECACHE后,线程等待,通知刷盘线程刷盘。
刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
前端等待线程向用户返回成功。
三、多种消息查询手段
丰富的消息查询手段,帮助用户快速定位消息,排查问题,RocketMQ支持按Message Id查询、按Message Key查询等。
按照Message Id查询消息
如上图所示,MsgId总共16字节,包含消息存储主机地址,消息Commit Log offset。从MsgId中解析出Broker的地址和Commit Log的偏移地址,然后按照存储格式所在位置消息buffer解析成一个完整的消息。
按照Message Key查询消息
RocketMQ可以为每条消息指定Key,并根据建立高效的消息索引,索引逻辑结果如上图所示,查询过程如下:
根据查询的key的hashcode%slotNum得到具体的槽的位置(slotNum是一个索引文件里面包含的最大槽的数目,例如图中所示slotNum=5000000)。
根据slotValue(slot位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue总是指向最新的一个索引项)。
遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的32条记录)
Hash冲突;寻找key的slot位置时相当于执行了两次散列函数,一次key的hash,一次key的hash值取模,因此这里存在两次冲突的情况;第一种,key的hash值不同但模数相同,此时查询的时候会在比较一次key的hash值(每个索引项保存了key的hash值),过滤掉hash值不相等的项。第二种,hash值相等但key不等,出于性能的考虑冲突的检测放到客户端处理(key的原始值是存储在消息文件中的,避免对数据文件的解析),客户端比较一次消息体的key是否相同。
存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中),整个索引文件是定长的,结构也是固定的 。
四、消息过滤机制
RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构。
在Broker端进行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。
Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode。
为什么过滤要这样做?
Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间。
过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤。
即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失。
五、顺序消息
很多业务有顺序消息的需求,RocketMQ支持全局和局部的顺序,一般推荐使用局部顺序,将具有顺序要求的一类消息hash到同一个队列中便可保持有序,如下图所示。
但顺序消息,有自己的缺陷:
发送顺序消息无法利用集群FailOver特性
消费顺序消息的并行度依赖于队列数量
队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
遇到消息失败的消息,无法跳过,当前队列消费暂停
目前,中间件团队正在攻克这些缺陷,很快将出现在新特性当中。
六、事务消息
事务消息特性介绍参考Aliware MQ的文档介绍。
七、定时消息
日常业务中有很多定时消息的场景,比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 MQ 延时消息,这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单,如已完成支付则忽略。
RocketMQ为了实现定时消息,引入延时级别,牺牲部分灵活性,事实上很少有业务需要随意指定定时时间的灵活性。定时消息内容被存储在数据文件中,索引按延时级别堆积在定时消息队列中,具有跟普通消息一致的堆积能力,如下图所示。
八、总结
以上为用户比较关注的RocketMQ关键特性,RocketMQ中更多的技术将有专门的章节介绍,比如低延迟技术、高可用以及高可靠技术等。
原文地址:https://www.cnblogs.com/john8169/p/9180701.html