RocketMQ源码 — 六、 RocketMQ高可用(1)

高可用究竟指的是什么?请参考:关于高可用的系统

RocketMQ做了以下的事情来保证系统的高可用

  • 多master部署,防止单点故障
  • 消息冗余(主从结构),防止消息丢失
  • 故障恢复(本篇暂不讨论)

那么问题来了:

  • 怎么支持多broker的写?
  • 怎么实现消息冗余?

下面分开说明这两个问题

多master集群

这里强调出master集群,是因为需要多个broker set,而一个broker set只有一个master(见下文的“注意”),所以是master集群

broker有三种角色:ASYNC_MASTER、SYNC_MASTER和SLAVE,这些角色常用的搭配为:

  1. ASYNC_MASTER、SLAVE:容许丢消息,但是要broker一直可用,master异步传输CommitLog到slave
  2. SYNC_MASTER、SLAVE:不允许丢消息,master同步传输CommitLog到slave
  3. ASYNC_MASTER:如果只是想简单部署则使用这种方式

master:负责消息的读写

slave:只负责读消息

SYNC_MASTER与ASYNC_MASTER的区别是sync会等待消息传输到slave才算消息写完成,而async不会同步等待,而是异步复制到slave

RocketMQ的架构图(原图地址

注意:在RocketMQ里面有一个概念broker set,一个broker set由一个master和多个slave组成,一个broker set内的每个broker的brokerName相同。

在broker集群中每个master相互之间是独立,master之间不会有交互,每个master维护自己的CommitLog、自己的ConsumeQueue,但是每一个master都有可能收到同一个topic下的producer发来的消息

为了支持多master集群,需要解决几个问题:

  • namesrv怎么管理broker
  • producer发送消息的时候知道发送到哪一个broker(为什么是master)

1. namesrv怎么管理broker

broker启动的时候会向namesrv注册自己的信息

// org.apache.rocketmq.broker.BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // 省略中间代码...
    RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills());
    // 省略中间代码...
}

信息中包括:

clusterName:broker 集群的名字,如:DefaultCluster

brokerAddr:broker的ip:port,如:192.168.0.102:10911

brokerName:注意这个字段,上面介绍过了,一个broker set中的brokerName是相同的,需要在部署的时候配置

brokerId:用来唯一标示一个broker set中的broker,master是0(org.apache.rocketmq.common.MixAll#MASTER_ID),slave是正整数

haServerAddr:haServer的ip:port,如:192.168.0.102:10912

topicConfigWrapper:是比较复杂的数据结构,主要包含了broker上所有的topic信息,如:

{
    "dataVersion": {
        "counter": 2,
        "timestamp": 1514252649572
    },
    "topicConfigTable": {
        "TopicTest": {
            "order": false,
            "perm": 6,
            "readQueueNums": 4,
            "topicFilterType": "SINGLE_TAG",
            "topicName": "TopicTest",
            "topicSysFlag": 0,
            "writeQueueNums": 4
        },
        "%RETRY%please_rename_unique_group_name_4": {
            "order": false,
            "perm": 6,
            "readQueueNums": 1,
            "topicFilterType": "SINGLE_TAG",
            "topicName": "%RETRY%please_rename_unique_group_name_4",
            "topicSysFlag": 0,
            "writeQueueNums": 1
        }
    }
}

上面包含了两个topic:TopicTest和%RETRY%please_rename_unique_group_name_4,相关字段的含义:

order:是否是顺序消息

perm:表明该topic的权限,可读(4)、可写(2)、可继承(1),通过位运算组合

readQueueNums:决定了consume消费的MessageQueue共有几个

writeQueueNums:决定了producer发送消息的MessageQueue共有几个

这些信息发送给namesrv之后,namesrv转化为自己的数据结构,namesrv处理broker注册的方法是:

// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 省略中间代码...
            // 这里会判断只有master才会创建QueueData,因为只有master才包含了读写队列的信息
            // slave没有自己独立的读写队列信息(salve不会创建自己的queue信息),只是和master的的读写队列信息一致
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // 这个方法创建了QueueData,QueueData包含broker set下的读写队列的信息
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 省略中间代码...
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

上面涉及到的namesrv的几个重要数据结构

// 每个cluster下的broker set信息,一个brokerName对应的broker set
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 每个broker set中的broker信息(set中有哪些broker,每个broker的brokerId和brokerAddr)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 每个broker的存活情况
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 每个topic下的queue信息,包括每个broker set中读写队列的个数,consumer消费消息和producer发送消息的路由信息都从这个数据结构中获取
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

所以,namesrv通过将broker注册来的信息构造成自己的数据结构:

  • 每个cluster有哪些broker set
  • 每个broker set包括哪些broker,brokerId和broker的ip:port
  • 每个broker的存活情况,根据每次broker上报来的信息,清除可能下线的broker
  • 每个topic的消息队列信息,几个读队列,几个写队列

namesrv汇总所有的broker的这些信息,然后供consumer和producer拉取

2. producer发送消息的时候知道发送到哪一个master

之前我们知道producer发送消息的时候发往哪一个broker是由MessageQueue决定的,所以我们先要搞清楚producer发送消息时候的MessageQueue怎么来的。producer维护了一个topicPublishInfoTable,里面包含了每个topic对应的MessageQueue,所以问题就变成了topicPublishInfoTable怎么构造的。

producer发送消息之前都会获取topic对应的队列信息,当topicPublishInfoTable中没有的时候会从namesrv获取,获取的方法如下:

// org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    // 省略中间代码...
                } else {
                    // 从manesrv获取topic的路由信息,namesrv从topicQueueTable获取到该topic对应的所有的QueueData
                    // 然后将每个brokerName下的BrokerData返回
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                      // 省略中间代码...
                      for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            // 每个broker set下所有的broker地址(ip:port)
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        // Update Pub info
                        {
                            // 将从namesrv获取到的路由信息转换为TopicPublishInfo
                            // 期间会将没有master的broker set的queue信息去除
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    // 省略中间代码...
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

到此,producer也知道自己可以向哪些MessageQueue发送消息了,接下来就是producer的负载均衡算法选出其中一个MessageQueue发送消息(org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue,这个暂时不详表),MessageQueue包含的信息有topic、brokerName、queueId,但是producer发送的时候得知道broker的ip:port信息,而且一个brokerName对应的是一个broker set,并不能确定具体的broker,所以接下来应该找到具体的broker

// org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish
public String findBrokerAddressInPublish(final String brokerName) {
    // 上面updateTopicRouteInfoFromNameServer方法将broker set下的broker地址信息保存到brokerAddrTable
    // 再次重申:一个broker set下的broker的brokerName相同
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        // 没有花样,就是直接返回brokerId时MixAll.MASTER_ID的broker的ip:port信息
        // 前面说过master的brokerId就是MixAll.MASTER_ID,所以获取到的broker是broker set中的master
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

终于真相大白,producer只会向是master的broker发送消息,也就是一个broker set中brokerId是0的broker。

producer只能发送消息到master,而不能发送到slave,这也说明了master负责读“写”,而slave只负责读(当然,这里只说明了“写”的部分,关于master 和slave的“读”下一篇介绍)。

总结

本篇介绍了RocketMQ究竟做了什么来实现作为一个消息队列中间件的高可用,由于篇幅会偏长,所以分为两篇文章来说明,下一篇说明文中遗留下的另一个问题——RocketMQ源码 — 六、 RocketMQ高可用(2)



参考:

关于高可用的系统

RocketMQ Architecture

RocketMQ源码 — 三、 Producer消息发送过程

原文地址:https://www.cnblogs.com/sunshine-2015/p/8994705.html

时间: 2024-07-28 22:17:11

RocketMQ源码 — 六、 RocketMQ高可用(1)的相关文章

RocketMQ 源码分析(三) —— 高可用

概述 本文主要解析 Namesrv.Broker 如何实现高可用,Producer.Consumer 怎么与它们通信保证高可用. Namesrv 高可用 启动多个 Namesrv 实现高可用. 相较于 Zookeeper.Consul.Etcd 等,Namesrv 是一个超轻量级的注册中心,提供命名服务. 2.1 Broker 注册到 Namesrv ?? 多个 Namesrv 之间,没有任何关系(不存在类似 Zookeeper 的 Leader/Follower 等角色),不进行通信与数据同步

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡.容错的一些处理,4.3以上支持消息事务,有管理控制台.命令行工具,底层namesrv与broker.client与server交互netty实现. 源码解析 创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.r

RocketMQ 源码分析(二) —— Message 存储

CommitLog 结构 CommitLog.MappedFileQueue.MappedFile 的关系如下: CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N. 反应到系统文件如下: ··· Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd /Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l t

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理.首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中. 官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT.ROLLBACK.PREPARED,在事务消息未开源之前网上对于事务消息的"声音"基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的: TRANSACTION_P

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQ Broker如何处理事务消息提交.回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest: OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerCont

hadoop源码解读namenode高可靠:HA;web方式查看namenode下信息;dfs/data决定datanode存储位置

点击browserFilesystem,和命令查看结果一样 当我们查看hadoop源码时,我们看到hdfs下的hdfs-default.xml文件信息 我们查找${hadoop.tmp.dir}这是引用变量,肯定在其他文件有定义,在core-default.xml中查看到,这两个配置文件有个共同点: 就是不要修改此文件,但可以复制信息到core-site.xml和hdfs-site.xml中修改 usr/local/hadoop 是我存放hadoop文件夹的地方 几个关于namenode的重要文

Python源码剖析(高清版)PDF

Python源码剖析(高清版)PDF百度网盘链接:https://pan.baidu.com/s/1v0nalmMRYTJn1VTw-AHjxw 提取码:kjb4 复制这段内容后打开百度网盘手机App,操作更方便哦内容简介 · · · · · · 作为主流的动态语言,Python不仅简单易学.移植性好,而且拥有强大丰富的库的支持.此外,Python强大的可扩展性,让开发人员既可以非常容易地利用C/C++编写Python的扩展模块,还能将Python嵌入到C/C++程序中,为自己的系统添加动态扩展