RocketMq 集群方式搭建 步骤教学包教包会

mq集群方式搭建

有段时间没写这些技术文章了, 今天抽空写一点,不然自己都快忘记了
这篇文章记录了rocketmq 集群方式搭建的过程, 也是自己半天的成果记录吧! 感兴趣的朋友点个赞在走呗!

好了,废话不多,下面开搞。

本文章参考https://blog.csdn.net/qq_35400008/article/details/82467562#comments 这个博客文章编写

准备工作

第一步:关闭要搭建的所有机器的防火墙
第二步:每台机器执行下如下步骤

[[email protected] ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[[email protected]~]# setenforce 0
[[email protected]~]# getenforce

第三步:所有机器装好jdk, maven , zip , unzip , ssh 免密登录

配置crt连接: https://blog.csdn.net/cmqwan/article/details/61932792
安装maven参考老哥博客:  https://www.cnblogs.com/clicli/p/5866390.html
安装zip,unzip参考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch=
安装ssh参考: https://blog.csdn.net/m0_37590135/article/details/74275859
jdk自己百度哈, 很多参考博客的!

第四步: 如下命令是ssh机器之间copy用的命令

scp -r /home/administrator/test/ [email protected]:/root/


第五步: 下载完成后, 解压

unzip  rocketmq-all-4.4.0-bin-release.zip 

第六步:进入解压后的文件夹rocketmq-bin4.4.0 , 在文件夹里面新建logs , data/store, data2/store 目录

第七步:安装顺序修改bin下面的几个启动文件, 因默认配置内存空间太大,本地启动会报错

1. vim runbroker.sh    对应地方更改为  -server -Xms512m -Xmx512m -Xmn256m
2. vim runserver.sh (同样的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m
3. vim tools.sh           -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m

第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改这四个文件

注: 这里说明下哈, 我是用了三台机器,所有配置了130, 131和132一样的,你们2台机器完全可以用,131和132配置一台就可以了哈,ip自行更改哈。

第九步: 130主机器修改如下配置文件, broker-a.properties broker-b-s.properties两个文件 内容分别如下
broker-a.properties

brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
##Broker 对外服务的监听端口
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
brokerIP1=192.168.175.130
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

broker-b-s.properties

brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.130
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

第十步: 131, 132 机器只修改 broker-b.properties 和broker-a-s.properties 内容分别如下:
broker-b.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#nameserver地址,分号分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存300W条,根据业务情况调整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

broker-a-s.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=10921
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog
# 消费队列存储路径存储路径
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint
#abort 文件存储路径
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88

第十一步: 启动


三台都执行:

nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 &

130机器执行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 &

131, 132 机器执行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties  -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &

执行之后,jps结果,有两个brokerstartup就行了, 如果报错的化,看下自己建的logs文件夹日志

#### 好了,到此rocketmq 基础配置就搭建起来了,下面在讲一讲实战代码

导入依赖包

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.5.9</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

mq消息发送方


import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 消息发送者
 * @author LELE
 *
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        // 声明并初始化一个producer
        // 需要一个producer group名字作为构造方法的参数,这里为producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        producer.setVipChannelEnabled(false);
        // 设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
        // NameServer的地址必须有
        // producer.setClientIP("xxxx");
        // producer.setInstanceName("Producer");
        producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

        // 调用start()方法启动一个producer实例
        producer.start();

        // 发送1条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        try {
            for(int i=0;i<30000;i++) {
                // 封装消息
                Message msg = new Message("TopicTest", // topic
                        "TagA", // tag
                        ("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                // 调用producer的send()方法发送消息
                // 这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);
                // 打印返回结果
                System.out.println(sendResult);
            }

        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        // 发送完消息之后,调用shutdown()方法关闭producer
        System.out.println("send success");
        producer.shutdown();
    }

}

消息消费者


import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 消息接收者, 需要服务器启动mq服务
 * @author LELE
 *
 */
public class Consumer {

    public static void main(String[] args) throws MQClientException {

        // 声明并初始化一个consumer
        // 需要一个consumer group名字作为构造方法的参数,这里为consumer1
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        // consumer.setVipChannelEnabled(false);
        // 同样也要设置NameServer地址
        consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

        // 这里设置的是一个consumer的消费策略
        // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");

        // 设置一个Listener,主要进行消息的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                MessageExt msg = msgs.get(0);

                if (msg.getTopic().equals("TopicTest")) {

                    // 执行TopicTest1的消费逻辑

                    System.out.println("TagA:" + new String(msg.getBody()));

                }

                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()
                        + "----------------------------------------------------------------------------------");
                // 返回消费状态
                // CONSUME_SUCCESS 消费成功
                // RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 调用start()方法启动consumer
        consumer.start();
        System.out.println("Consumer Started.");

    }
}

启动开始尽情玩耍吧,少年, 记得点赞哦!

原文地址:https://www.cnblogs.com/eian/p/11478472.html

时间: 2024-08-30 05:52:50

RocketMq 集群方式搭建 步骤教学包教包会的相关文章

双RocketMq集群的搭建

一.双Master RocketMq集群的搭建 1.服务器环境: 序号 IP 用户名 角色 模式 1 192.168.211.128 root nameServer1,brokerServer1 Master1 2 192.168.211.129 root nameServer2,brokerServer2 Master1 2.添加hosts信息 vim /etc/hosts IP NAME 192.168.211.128 rocketmq-nameserver1 192.168.211.128

高可用集群 corosync 搭建步骤

实验环境:    OS :    CentOS 6.6     corosync: corosync-1.4.7-1.el6.x86_64    pacemaker:pacemaker-1.1.12-4.el6.x86_64    crmsh:crmsh-2.1-1.6.x86_64.rpm     pssh:    pssh-2.3.1-2.el6.x86_64.rpm node1:        hostname: node2.1inux.com        IP     :172.16.

redis集群redis-cluster搭建

redis集群搭建--参考微信公众号(诗情画意程序员):https://mp.weixin.qq.com/s/s5eJE801TInHgb8bzCapJQ 这是来自redis官网的一段介绍,大概意思就是: Redis是一个开源(BSD许可)的内存数据结构存储,用作数据库.缓存和消息代理.它支持诸如字符串.散列.列表.集.带范围查询的排序集.位图.hyperloglogs.带半径查询和流的地理空间索引等数据结构.Redis具有内置的复制.Lua脚本.LRU清除.事务和不同级别的磁盘持久性,并通过R

Kafka集群环境搭建

Kafka介绍 在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算. KAFKA + STORM +REDIS 1.Apache Kafka是一个开源消息系统,用Scala写成. 2.Kafka是一个分布式消息队列:生产者.消费者的功能.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现. 3.Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者成为Consumer,此外Kafka集群由多个Ka

Kafka 完全分布式集群环境搭建

思路: 先在主机s1上安装配置,然后远程复制到其它两台主机s2.s3上, 并分别修改配置文件server.properties中的broker.id属性. 1. 搭建前准备 示例共三台主机,主机IP映射信息如下: 192.168.32.101 s1 192.168.32.102 s2 192.168.32.103 s3 搭建ZooKeeper 集群,搭建步骤参考: https://www.cnblogs.com/jonban/p/zookeeper.html 2.下载 Kafka 下载地址: h

rocketmq那些事儿之集群环境搭建

上一篇入门基础部分对rocketmq进行了一个基础知识的讲解说明,在正式使用前我们需要进行环境的搭建,今天就来说一说rockeketmq分布式集群环境的搭建 前言 之前已经介绍了rocketmq的入门基础,相信各位已经基本了解,今天进行一个分布式集群的搭建,其实可以通过本地源码来进行源码的使用和学习,但是作为开发维护人员还是需要去了解分布式集群的部署流程,方便后面集群的调试和测试 配置参数 注意官方给的配置文件都是默认的,最简单的版本,线上的环境需要根据自己需求来进行配置,在这里说明下其中的部分

RocketMQ集群搭建及安装rocketmq-console

RocketMQ集群搭建-4.2.0版本https://juejin.im/post/5a911ea16fb9a0633f0e36a1 直接在官网下载的二进制包,编译容易出现问题 启动broker,报错:rocketmq Cannot allocate memory 可根据机器内存大小,配置jvm参数 http://www.cnblogs.com/quchunhui/p/8350904.html 安装rocketmq-console - 大墨垂杨 - 博客园https://www.cnblogs

Linux环境下HDFS集群环境搭建关键步骤

Linux环境下HDFS集群环境搭建关键步骤记录. 介质版本:hadoop-2.7.3.tar.gz 节点数量:3节点. 一.下载安装介质 官网下载地址:http://hadoop.apache.org/releases.html 二.服务器规划 MASTER:NAMENODE, DATANODENODE1:DATANODENODE2:SECONDARY NAMENODE, DATANODE 三.配置hostname和hosts 192.168.13.4 master192.168.13.5 n

Linux环境下SolrCloud集群环境搭建关键步骤

Linux环境下SolrCloud集群环境搭建关键步骤. 前提条件:已经完成ZooKeeper集群环境搭建. 一.下载介质 官网下载地址:http://www.apache.org/dyn/closer.lua/lucene/solr/7.3.1 历史版本下载:http://archive.apache.org/dist/lucene/solr/ 二.上传介质 通过工具将下载好的安装介质上传至服务器目录. 三.解压安装 解压即可完成安装. unzip solr-5.5.5.zip 四.修改配置文