[Original] "Learning RocketMQ from Scratch": Cluster Setup

Using two servers, we will build a dual-master, dual-slave RocketMQ cluster with no single point of failure. Assume the two servers' IP addresses are 192.168.50.1 and 192.168.50.2.

Contents

1. Start the NameServer Cluster

2. Start the Broker Cluster

3. RocketMQ Web Console: rocketmq-console

4. Cluster Test

1. Start the NameServer Cluster

Start a NameServer on each of the two servers. Together they form a NameServer service with no single point of failure, reachable at 192.168.50.1:9876 and 192.168.50.2:9876.
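The start command is the same as in the single-node article; below is a sketch, assuming the 4.3.2 binary release is unpacked under /usr/local as in the transcripts later in this article:

```shell
# Run on each of the two servers. ROCKETMQ_HOME is an assumption taken
# from the install path used elsewhere in this article; adjust if needed.
ROCKETMQ_HOME=/usr/local/rocketmq-all-4.3.2-bin-release
if [ -f "$ROCKETMQ_HOME/bin/mqnamesrv" ]; then
    cd "$ROCKETMQ_HOME"
    nohup sh bin/mqnamesrv > /dev/null 2>&1 &
    sleep 3
    # a healthy NameServer logs "The Name Server boot success"
    tail -n 5 ~/logs/rocketmqlogs/namesrv.log
else
    echo "mqnamesrv not found under $ROCKETMQ_HOME"
fi
```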

2. Start the Broker Cluster

Edit the Broker configuration files so that each server runs one Master Broker and one Slave Broker.
The configuration files live under the conf directory. Since we are building a synchronous double-write cluster, we modify the files in the 2m-2s-sync directory:

[root@157-89 ~]# cd /usr/local/rocketmq-all-4.3.2-bin-release/conf/
[root@157-89 conf]# ls
2m-2s-async  2m-2s-sync  2m-noslave  broker.conf  logback_broker.xml  logback_namesrv.xml  logback_tools.xml
[root@157-89 conf]# cd 2m-2s-sync/
[root@157-89 2m-2s-sync]# ls
broker-a.properties  broker-a-s.properties  broker-b.properties  broker-b-s.properties

1) On 192.168.50.1, edit broker-a.properties so it acts as a Master Broker:

namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a/abort

2) On 192.168.50.2, edit broker-b.properties so it acts as a Master Broker:

namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b/abort

3) On 192.168.50.1, edit broker-b-s.properties so it acts as the Slave of broker-b:

namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.1
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-b-s/abort

4) On 192.168.50.2, edit broker-a-s.properties so it acts as the Slave of broker-a:

namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
brokerClusterName=rocketMqCluster
brokerIP1=192.168.50.2
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=120
mapedFileSizeConsumeQueue=500000
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
storePathRootDir=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s
storePathCommitLog=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/commitlog
storePathConsumeQueue=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/consumequeue
storePathIndex=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/index
storeCheckpoint=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/checkpoint
abortFile=/usr/local/rocketmq-all-4.3.2-bin-release/data/broker-a-s/abort

When several Brokers run on one server, each must listen on its own port. Also remember to open the NameServer and Broker ports in the firewall.
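With firewalld, the opening step can be sketched as follows. The port list is an assumption based on the configs above: besides the NameServer port 9876 and each broker's listenPort, a broker also binds listenPort-2 (VIP channel) and listenPort+1 (HA replication).

```shell
# 9876: NameServer; 10911/10921: broker listen ports;
# 10909/10919: VIP channel (listenPort-2); 10912/10922: HA (listenPort+1)
PORTS="9876 10909 10911 10912 10919 10921 10922"
for p in $PORTS; do
    if command -v firewall-cmd > /dev/null 2>&1; then
        firewall-cmd --permanent --add-port="${p}/tcp"
    else
        echo "open ${p}/tcp manually"
    fi
done
if command -v firewall-cmd > /dev/null 2>&1; then
    firewall-cmd --reload
fi
```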

Start the four Brokers, substituting each Broker's properties file for broker_config_file:

nohup sh bin/mqbroker -c broker_config_file &
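Spelled out for this layout, the step looks roughly like the sketch below (paths assumed from the transcripts above; each server starts its own master file plus the slave file of the other group):

```shell
# on 192.168.50.1: broker-a.properties (master a) + broker-b-s.properties (slave of b)
# on 192.168.50.2: broker-b.properties (master b) + broker-a-s.properties (slave of a)
CONF=/usr/local/rocketmq-all-4.3.2-bin-release/conf/2m-2s-sync
cd /usr/local/rocketmq-all-4.3.2-bin-release 2> /dev/null || true
for f in broker-a.properties broker-b-s.properties; do
    if [ -f "$CONF/$f" ]; then
        nohup sh bin/mqbroker -c "$CONF/$f" > /dev/null 2>&1 &
    else
        echo "missing $CONF/$f"
    fi
done
```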

3. RocketMQ Web Console: rocketmq-console

Install it on 192.168.50.1 only; the console itself does not need to be clustered.

[root@153-215 local]# git clone https://github.com/apache/rocketmq-externals.git
Cloning into 'rocketmq-externals'...
remote: Enumerating objects: 10, done.
remote: Counting objects: 100% (10/10), done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 9425 (delta 2), reused 1 (delta 0), pack-reused 9415
Receiving objects: 100% (9425/9425), 11.86 MiB | 232.00 KiB/s, done.
Resolving deltas: 100% (4235/4235), done.
[root@153-215 local]# cd rocketmq-externals/
[root@153-215 rocketmq-externals]# ls
dev  README.md  rocketmq-console  rocketmq-docker  rocketmq-flink  rocketmq-flume  rocketmq-hbase  rocketmq-iot-bridge  rocketmq-jms  rocketmq-mysql  rocketmq-php  rocketmq-redis  rocketmq-sentinel  rocketmq-serializer  rocketmq-spark
[root@153-215 rocketmq-externals]# git branch
* master
[root@153-215 rocketmq-externals]# git fetch origin release-rocketmq-console-1.0.0
From https://github.com/apache/rocketmq-externals
 * branch            release-rocketmq-console-1.0.0 -> FETCH_HEAD
[root@153-215 rocketmq-externals]# git checkout -b release-1.0.0 origin/release-rocketmq-console-1.0.0
Branch 'release-1.0.0' set up to track remote branch 'release-rocketmq-console-1.0.0' from 'origin'.
Switched to a new branch 'release-1.0.0'
[root@153-215 rocketmq-externals]# ls
README.md  rocketmq-console
[root@153-215 rocketmq-externals]# ls rocketmq-console/
doc  LICENSE  NOTICE  pom.xml  README.md  src  style
[root@153-215 rocketmq-externals]# vim rocketmq-console/src/main/resources/application.properties

Edit application.properties:

server.contextPath=/rocketmq
server.port=8080
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=192.168.50.1:9876;192.168.50.2:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true

Move the rocketmq-console directory out on its own, then build and start it:

[root@153-215 rocketmq-console]# mv /usr/local/rocketmq-externals/rocketmq-console /usr/local/rocketmq-console
[root@153-215 rocketmq-console]# cd /usr/local/rocketmq-console/
[root@153-215 rocketmq-console]# ls
doc  LICENSE  NOTICE  pom.xml  README.md  src  style
[root@153-215 rocketmq-console]# mvn clean package -Dmaven.test.skip=true
........
[INFO] Building jar: /usr/local/rocketmq-console/target/rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:54 min
[INFO] Finished at: 2019-01-11T17:02:34+08:00
[INFO] ------------------------------------------------------------------------
[root@153-215 rocketmq-console]# ls
doc  LICENSE  NOTICE  pom.xml  README.md  src  style  target
[root@153-215 rocketmq-console]# ls target/
checkstyle-cachefile  checkstyle-checker.xml  checkstyle-result.xml  classes  generated-sources  maven-archiver  maven-status  rocketmq-console-ng-1.0.0.jar  rocketmq-console-ng-1.0.0.jar.original  rocketmq-console-ng-1.0.0-sources.jar
[root@153-215 rocketmq-console]# java -jar target/rocketmq-console-ng-1.0.0.jar
.......
[2019-01-11 17:04:15.980]  INFO Initializing ProtocolHandler ["http-nio-8080"]
[2019-01-11 17:04:15.991]  INFO Starting ProtocolHandler [http-nio-8080]
[2019-01-11 17:04:16.232]  INFO Using a shared selector for servlet write/read
[2019-01-11 17:04:16.251]  INFO Tomcat started on port(s): 8080 (http)
[2019-01-11 17:04:16.257]  INFO Started App in 6.594 seconds (JVM running for 7.239)

4. Cluster Test

Producer test code:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import com.alibaba.fastjson.JSON;

public class SyncProducerTest {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test_group");
        producer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        try {
            producer.start();
            // send 100 messages synchronously to topic_test
            for (int i = 0; i < 100; i++) {
                Message message = new Message("topic_test", "tag_test",
                        ("Hello World" + i).getBytes("UTF-8"));
                SendResult sendResult = producer.send(message);
                System.out.println(JSON.toJSON(sendResult));
            }
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer test code:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SyncConsumerTest {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test_group");
        consumer.setNamesrvAddr("192.168.50.1:9876;192.168.50.2:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe("topic_test", "*");
            // concurrent push-mode listener; ack each batch as consumed
            consumer.registerMessageListener((MessageListenerConcurrently) (messageList, context) -> {
                System.out.println(Thread.currentThread().getName() + " Receive New Message: " + messageList);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

SyncProducerTest run log:

SyncConsumerTest run log:

The logs show that both producing and consuming work normally. Next, open the web console at http://192.168.50.1:8080/rocketmq:

The console confirms that the dual-master, dual-slave Broker cluster is healthy, and it also shows how each Broker is processing messages.
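The same health check works from the command line via mqadmin; its clusterList subcommand prints every broker in the cluster with its role (BID 0 is the master, BID 1 the slave). A sketch, with paths assumed as above:

```shell
NAMESRV="192.168.50.1:9876;192.168.50.2:9876"
BIN=/usr/local/rocketmq-all-4.3.2-bin-release/bin
if [ -f "$BIN/mqadmin" ]; then
    # expect four rows: broker-a and broker-b, each with BID 0 and BID 1
    sh "$BIN/mqadmin" clusterList -n "$NAMESRV"
else
    echo "mqadmin not found under $BIN"
fi
```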

Source: https://www.cnblogs.com/mengyi/p/10296599.html

Time: 2024-10-08 23:29:40
