RocketMQ 源码分析

RocketMQ 源码分析



RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异)。接下来记录下自己阅读源码的一些探索。

  1. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务;生产者和消费者是消息的源头和归宿。

    在知道各个角色的基本位置后,就该让程序跑起来,这样才能看的更真切,RocketMQ依赖很少,可以很容易的在本地部署,首先要设置环境变量ROCKETMQ_HOME,这是RocketMQ需要的工作目录。nameserver 首先启动,这样才会监听(监听端口为9876)来自Broker的连接,Broker启动的时候指定nameserver,否则会到某个服务器寻找可用的nameserver列表,然后可以运行rocketmq-example中的例子进行简单的测试。

  2. Name server短小精悍,协调全局

    从代码可以看到主要是初始化服务端通信层和线程执行组件, 然后启动ServerBootstrap启动。通过jstack命令你可以看到启动了哪些线程。

  3. Producer发送消息是如何得知发到哪个broker的 ?

    每个应用在收发消息之前,一般会调用一次producer.start()/consumer.start(),幕后的工作就是:创建需要的实例对象,如MQClientInstance;设置定时任务,如从Nameserver中定时更新本地的Topic route info,发送心跳信息到所有的broker,动态调整线程池的大小,等等;把当前producer加入到指定的组中。客户端会缓存路由信息TopicPublishInfo, 同时定期从NameServer取Topic路由信息,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。Producer在发送消息的时候会去查询本地的topicPublishInfoTable(一个HashMap),如果没有命中的话就会与NameServer沟通得到路由信息,RequestCode=GET_ROUTEINTO_BY_TOPIC 如果nameserver中也木有查询到,那么将会发送一个default的topic进行路由查询。具体过程如下图所示:

  4. Producer发送消息的具体过程,以同步模式为例。

    得到了通信地址,发送过程就显而易见了。可以看到在选择消息队列进行发送时采用随机方式,同时和上一次发送的broker保持不同,防止热点。

  5. Broker是如何接收来自Producer的消息呢?

    每个producer咋发送消息的时候都和对应的Broker建立了长连接,此时broker已经准备好接收Message,具体过程如下。接收到消息后,会先写入Commit Log文件(顺序写,写满了会新建一个新的文件),然后更新Consume queue文件(存储如何由topic定位到具体的消息)。

  6. RocketMQ 存储特点

    RocketMQ的消息采用顺序写到commitlog文件,然后利用consume queue文件作为索引,如图。RocketMQ采用零拷贝mmap+write的方式来回应Consumer的请求,RocketMQ宣称大部分请求都会在Page Cache层得到满足,所以消息过多不会因为磁盘读使得性能下降,这里自己的理解是,在64bit机器下,虚存地址空间(vm_area_struct)不是问题,所以相关的文件都会被映射到内存中(有定期删除文件的操作),即使此刻不在内存,操作系统也会因为缺页异常进行换入,虽然地址空间不是问题,但是一个进程映射文件的个数是有限的,所以可能在这里发生OOM。

    传统读写和两种零拷贝的简单对比如下,其实就是CPU和IO的权衡。

    通过Broker中的存储目录也能看到上述表述的体现:

  7. 顺序消息是如何保证的?

    需要业务层自己觉得哪些消息应该顺序保证,然后发送的时候通过规则映射到同一个队列,因为没有谁比业务自己更加知道关于消息顺序的特点。这样的顺序是相对顺序,局部顺序,因为发送方只保证把这些消息顺序的发送到broker上的同一队列,但是不保证其他Producer也会发送到那个队列,所以需要Consumer在拉到消息后做一些过滤。

  8. RocketMQ 刷盘实现

    刷盘的最终实现都是使用NIO中的MappedByteBuffer.force()将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。异步而言,只是唤醒对应的线程,不保证执行的时机,流程如下。

  9. 消息过滤

    类似于重复数据删除技术,可以在源端做,也可以在目的端实现,就是网络和存储的权衡,如果在Broker端做消息过滤就需要逐一比对consume queue的tagsCode字段(hashcode),如果符合则传输给消费者,因为是hashcode,所以存在误判,需要在Consumer接收到消息后进行字符串级别的过滤,确保准确性。

总结:阅读这个代码也花了好几天时间,虽然在分布式可靠性方面还未仔细研究,主要看了关键的设计思想和流程,有很多东西要不断的沉淀和消化,特别是多线程同步,异步等。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-12 03:31:18

RocketMQ 源码分析的相关文章

RocketMQ 源码分析(二) —— Message 存储

CommitLog 结构 CommitLog.MappedFileQueue.MappedFile 的关系如下: CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N. 反应到系统文件如下: ··· Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd /Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l t

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下: 在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作.. 1. 如何找到入口(MQ-broker端) 分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口.但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力 参见还是比较清晰的.简单的说 就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是 此逻辑过程完全成功完成之后才能使订阅者收到消息.业务逻辑过程 假设是这样的:逻辑部分a-->发消息给MQ-->逻辑部分b假设我们在发送消息给MQ之后执行逻辑部分b时产生了异常,那如果MQ不具备事务消息能力时,订阅者也收到了消息.这是我们不希望见到的. 分布式事务基础概念 关于分布式事务.两阶段提交协议.三阶提交协议 理解分布式事务的两阶段提交2pc 分布式事务(一)两阶段

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态为UN_KNOW时,第二次提交到服务器时将不会做任何操作,也就是说此时消息还存在与RMQ_SYS_TRANS_HALF_TOPIC主题中,并不能被消息消费者消费,那这些消息最终如何被提交或回滚呢? 原来RocketMQ使用TransactionalMessa

rocketmq源码分析1-benchmark学习

benchmark 分析 组成部分 三个java类,都含有main方法,可选的传递一些参数,诸如测试线程数量,消息体积大小.三个类分别用于测试普通生产者,事务生产者,消费者.生产者 默认64个测试线程 1280byte消息大小. 测试指标 普通生产者 这段时间内 每秒发送成功了多少条 采样时间断内 发送消息最大耗时 每条耗时多少毫秒 发送请求失败条数 接收响应失败条数 消费者 每秒消费多少条 产生到消费的平均时间差 存储到现在的平均时间差 产生到消费的最大时间差 存储到现在的最大时间差 代码分析

RocketMQ 源码分析(三) —— 高可用

概述 本文主要解析 Namesrv.Broker 如何实现高可用,Producer.Consumer 怎么与它们通信保证高可用. Namesrv 高可用 启动多个 Namesrv 实现高可用. 相较于 Zookeeper.Consul.Etcd 等,Namesrv 是一个超轻量级的注册中心,提供命名服务. 2.1 Broker 注册到 Namesrv ?? 多个 Namesrv 之间,没有任何关系(不存在类似 Zookeeper 的 Leader/Follower 等角色),不进行通信与数据同步

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada