RocketMQ源码阅读

RocketMQ 是一款开源的消息中间件,采用Java实现,设计思想来自于Kafka(Scala实现),在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比。接下来是自己阅读源码的一些探索。

RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上; Broker具体处理消息的存储和服务;生产者和消费者是消息的源头和归宿。

一、Producer 发送消息

Producer发送消息是如何得知发到哪个broker的 ? 每个应用在收发消息之前,一般会调用一次producer.start()/consumer.start()做一些初始化工作,其中包括:创建需要的实例对象,如MQClientInstance;设置定时任务,如从Nameserver中定时更新本地的Topic route info,发送心跳信息到所有的 broker,动态调整线程池的大小,把当前producer加入到指定的组中等等。客户端会缓存路由信息TopicPublishInfo, 同时定期从NameServer取Topic路由信息,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。Producer在发送消息的时候会去查询本地的topicPublishInfoTable(一个ConcurrentHashMap),如果没有命中的话就会询问NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也没有查询到(表示该主题的消息第一次发送),那么将会发送一个default的topic进行路由查询。

具体过程如下图所示:
       Producer 在得到了具体的通信地址后,发送过程就显而易见了。通过代码可以看到在选择消息队列进行发送时采用随机方式,同时和上一次发送的broker保持不同,防止热点。

二、Broker处理来自Producer的消息

每个producer在发送消息的时候都和对应的Broker建立了长连接,此时broker已经准备好接收Message,Broker的SendMessageProcessor.sendMessage处理消息的存储,具体过程如下。接收到消息后,会先写入Commit Log文件(顺序写,写满了会新建一个新的文件),然后更新Consume queue文件(存储如何由topic定位到具体的消息)。

三、RocketMQ 存储特点

RocketMQ的消息采用顺序写到commitlog文件,然后利用consume queue文件作为索引,如图。RocketMQ采用零拷贝mmap+write的方式来回应Consumer的请求,RocketMQ宣称大部分请求都会在Page Cache层得到满足,所以消息过多不会因为磁盘读使得性能下降,这里自己的理解是,在64bit机器下,虚存地址空间(vm_area_struct)不是问题,所以相关的文件都会被映射到内存中(有定期删除文件的操作),即使此刻不在内存,操作系统也会因为缺页异常进行换入,虽然地址空间不是问题,但是一个进程映射文件的个数(/proc/sys/vm/max_map_count)是有限的,所以可能在这里发生OOM。

通过Broker中的存储目录(默认路径是 $HOME/store)也能看到存储的逻辑视图:

四、顺序消息是如何保证的?

需要业务层自己决定哪些消息应该顺序到达,然后发送的时候通过规则(hash)映射到同一个队列,因为没有谁比业务自己更加知道关于消息顺序的特点。这样的顺序是相对顺序,局部顺序,因为发送方只保证把这些消息顺序的发送到broker上的同一队列,但是不保证其他Producer也会发送消息到那个队列,所以需要Consumer在拉到消息后做一些过滤。

五、RocketMQ 刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示,更多细节可以参考

六、消息过滤

类似于重复数据删除技术(Data Deduplication),可以在源端做,也可以在目的端实现,就是网络和存储的权衡,如果在Broker端做消息过滤就需要逐一比对consume queue 的 tagsCode 字段(hashcode),如果符合则传输给消费者,因为是 hashcode,所以存在误判,需要在 Consumer 接收到消息后进行字符串级别的过滤,确保准确性。

小结

这次代码阅读主要着眼于消息的发送过程和Broker上的存储,其他方面的细节有待深入。

时间: 2024-10-14 22:40:45

RocketMQ源码阅读的相关文章

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑

源码阅读技巧篇

转载请注明原创出处,谢谢! 说在前面 本人水平有限,下面的一些都是本人的思考与理解,如果有那里不对,希望各位大佬积极指出,欢迎在留言区进行评论交流.探讨. 主题 为什么要读源码 读什么样的源码 有什么技巧 思考.交流 坚持 为什么要源码 说到读源码,让我想起来了读书,古语有云:"读破万卷书,下笔如有神". 多读读大师的想法技巧 通过大量阅读进行积累 把一些零碎的知识点整合起来 就拿RocketMQ来说,它是如何实现高性能.高可用.之前写过高可用的一些思考和理解里面的特性他应该都满足,R

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划

body, td { font-family: tahoma; font-size: 10pt; } 淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划 SQL编译解析三部曲分为:构建语法树,生成逻辑计划,指定物理执行计划.第一步骤,在我的上一篇博客淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树里做了介绍,这篇博客主要研究第二步,生成逻辑计划. 一. 什么是逻辑计划?我们已经知道,语法树就是一个树状的结构组织,每个节点代表一种类型的语法含义.如

JDK部分源码阅读与理解

本文为博主原创,允许转载,但请声明原文地址:http://www.coselding.cn/article/2016/05/31/JDK部分源码阅读与理解/ 不喜欢重复造轮子,不喜欢贴各种东西.JDK代码什么的,让整篇文章很乱...JDK源码谁都有,没什么好贴的...如果你没看过JDK源码,建议打开Eclipse边看源码边看这篇文章,看过的可以把这篇文章当成是知识点备忘录... JDK容器类中有大量的空指针.数组越界.状态异常等异常处理,这些不是重点,我们关注的应该是它的一些底层的具体实现,这篇

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

Memcache-Java-Client-Release源码阅读(之七)

一.主要内容 本章节的主要内容是介绍Memcache Client的Native,Old_Compat,New_Compat三个Hash算法的应用及实现. 二.准备工作 1.服务器启动192.168.0.106:11211,192.168.0.106:11212两个服务端实例. 2.示例代码: String[] servers = { "192.168.0.106:11211", "192.168.0.106:11212" }; SockIOPool pool =

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in