RocketMQ(二)集群配置

Broker集群部署方式主要有以下几种:(Slave 不可写,但可读)

单个Master

这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。


多Master模式

一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master。

优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

先启动 NameServer,例如机器 IP 为:192.168.1.101:9876

nohup sh mqnamesrv &
  • 在机器 A,启动第一个 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
  • 在机器 B,启动第二个 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

多Master多Slave模式,异步复制

每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。

缺点:Master宕机,磁盘损坏情况,会丢失少量消息。

#先启动 NameServer,例如机器 IP 为:192.168.1.101:9876

nohup sh mqnamesrv &
  • 在机器 A,启动第一个 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
  • 在机器 B,启动第二个 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
  • 在机器 C,启动第一个 Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
  • 在机器 D,启动第二个 Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

多Master多Slave模式,同步双写

每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。

优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

先启动 NameServer,例如机器 IP 为:192.168.1.101:9876

nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
  • 在机器 A,启动第一个 Master
nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
  • 在机器 B,启动第二个 Master
nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
  • 在机器 C,启动第一个 Slave
nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
  • 在机器 D,启动第二个 Slave
nohup sh mqbroker -n 192.168.1.1:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是

0,Slave 的BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个

Slave 通过指定不同的 BrokerId 来区分。

除此之外,nameserver也需要集群。

下面以配置一主一备(同步),2个nameserver为例测试。

1、环境两台机器:

  • 192.168.36.101 为主
  • 192.168.36.102 为备

同时在2台机器个启动一个nameserver。安装RocketMq请参考:

http://blog.csdn.net/zhu_tianwei/article/details/40948447

2、修改配置

(1)创建目录

mkdir /usr/local/alibaba-rocketmq/logs   #创建日志目录
mkdir -p /usr/local/alibaba-rocketmq/data/store/commitlog  #创建数据存储目录

更改日志目录

cd /usr/local/alibaba-rocketmq/conf
sed -i ‘s#${user.home}#${user.home}/alibaba-rocketmq#g‘ *.xml

(2)修改主配置

vi  ./conf/2m-2s-sync/broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.36.189:9876;192.168.36.54:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88  

storePathRootDir=/usr/local/alibaba-rocketmq/data/store
storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000  

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128  

brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH 

(3)修改备配置

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.36.189:9876;192.168.36.54:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88  

storePathRootDir=/usr/local/alibaba-rocketmq/data/store
storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000  

checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128  

brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH  

实例:

1.生产者Producer.java ,TransactionMQProducer使用

package cn.somnus.rocketmq.cluster;  

import java.util.concurrent.TimeUnit;  

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;  

//生产者
public class Producer {  

    public static void main(String[] args) throws MQClientException,
            InterruptedException {
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ProducerGroupName需要由应用来保证唯一,一类Producer集合的名称,这类Producer通常发送一类消息,且发送逻辑一致<br>
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
        //nameserver服务
        producer.setNamesrvAddr("192.168.36.189:9876;192.168.36.54:9876");
        producer.setInstanceName("Producer");  

        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();
        //服务器回调Producer,检查本地事务分支成功还是失败
        producer.setTransactionCheckListener( new TransactionCheckListener() {  

            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                System.out.println("checkLocalTransactionState --"+new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });  

        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */  

        for (int i = 0; i < 10; i++) {
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                            "TagA",// tag
                            "OrderID001",// key消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)
                            ("Hello MetaQA").getBytes());// body
                    SendResult sendResult = producer.sendMessageInTransaction(msg,new Producer().new MyTransactionExecuter(),"
$$$");
                    System.out.println(sendResult);
                }  

                {
                    Message msg = new Message("TopicTest2",// topic
                            "TagB",// tag
                            "OrderID0034",// key 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)
                            ("Hello MetaQB").getBytes());// body
                    SendResult sendResult = producer.sendMessageInTransaction(msg,new Producer().new MyTransactionExecuter(),"
$$$");
                    System.out.println(sendResult);
                }  

                {
                    Message msg = new Message("TopicTest3",// topic
                            "TagC",// tag
                            "OrderID061",// key
                            ("Hello MetaQC").getBytes());// body
                    SendResult sendResult = producer.sendMessageInTransaction(msg,new Producer().new MyTransactionExecuter(),"
$$$");
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }  

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        // producer.shutdown();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));
        System.exit(0);
    }  

    //执行本地事务,由客户端回调
    public class MyTransactionExecuter implements LocalTransactionExecuter{  

        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
            System.out.println("executeLocalTransactionBranch--msg="+new String(msg.getBody()));
            System.out.println("executeLocalTransactionBranch--arg="+arg);
            return LocalTransactionState.COMMIT_MESSAGE;
        }  

    }
}  

2、消费者Consumer.java ,采用主动拉取方式消费。

package cn.somnus.rocketmq.cluster;  

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;  

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;  

//消费者 pull
public class Consumer {  

    // Java缓存
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();  

    /**
     * 主动拉取方式消费
     *
     * @throws MQClientException
     */
    public static void main(String[] args) throws MQClientException {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ConsumerGroupName需要由应用来保证唯一 ,最好使用服务的包名区分同一服务,一类Consumer集合的名称,
         * 这类Consumer通常消费一类消息,且消费逻辑一致
         * PullConsumer:Consumer的一种,应用通常主动调用Consumer的拉取消息方法从Broker拉消息,主动权由应用控制
         */
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");
        // //nameserver服务
        consumer.setNamesrvAddr("192.168.36.189:9876;192.168.36.54:9876");
        consumer.setInstanceName("Consumber");
        consumer.start();
        // 拉取订阅主题的队列,默认队列大小是4
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    List<MessageExt> list = pullResult.getMsgFoundList();
                    if (list != null && list.size() < 100) {
                        for (MessageExt msg : list) {
                            System.out.println(new String(msg.getBody()));
                        }
                    }
                    System.out.println(pullResult.getNextBeginOffset());
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }  

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }  

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null) {
            System.out.println(offset);
            return offset;
        }
        return 0;
    }
}  
时间: 2024-11-08 19:00:02

RocketMQ(二)集群配置的相关文章

consui(二)集群配置

consul集群搭建:一.软件安装Linux 环境下载zip包然后直接解压,然后把解压的文mv consul /bin检验安装是否成功,查看版本[[email protected] ~]consul -vConsul v1.1.0Protocol 2 spoken by default, understands 2 to 3 (agent will automatically use protocol >2 when speaking to compatible agents)Consul 软件

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

WildFly8.1(JBoss)+mod_cluster(Apache)集群配置

继上次使用mod_jk进行了Apache+JBoss集群配置之后,由于JBoss5.1启动过于缓慢,所以开始尝试使用最新的WildFly8.1进行配置(WildFly就是JBoss,在JBoss7之后改名). 系统环境: Windows 7 SP1(Windows Server 2003已测试,WildFly会有未知问题) 软件环境: JDK 7+(一定要使用7以上,否则WildFly不支持) WildFly 8.1.0(http://wildfly.org/downloads/) mod_cl

MySql 集群配置

MYSQL CLUSTER方案介绍 本文的大致框架来自罗志威.黄川的报告, 在它的基础上进行简化和修改一些bug并且添加了主从复制的章节,最后做出该文档 MySQL Cluster 是MySQL适合于分布式计算环境的高实用.高冗余版本.它采用了NDB Cluster 存储引擎,允许在1个 Cluster 中运行多个MySQL服务器.现在mysql cluster 被独立出来, 作为一个专门的产品进行运营, mysql-server-5.6+ 就不在存在对mysql cluster的支持,需要独立

直接路由的高可用LVS集群配置

 直接路由的高可用LVS集群配置: 调度服务器IP:(106.3.43.240)192.168.11.100,节点服务器分别为:192.168.11.101,192.168.11.102 一.安装ipvsadmin: 1.yum -y install ipvsadmin(推荐用第一种方法) 2.下载http://www.linuxvirtualserver.org/software/,找到相应的版本: 注意对应自己的内核版本 ipvsadm-1.24.tar.gz tar zxvf ipvs

集群配置工具之conga:web配置简易RHCS

RHCS,红帽的集群套件,至于原理性能什么的我这里也不多说了,很复杂也很无趣,而且网上一百遍介绍RHCS的文章,通常也是只是在用同一种方式同一种语言同一种思路说同一件事,有的甚至拼音的错误也是一样的,很无聊,大家有兴趣可以自己搜.好了,吐槽完毕,开始今天的实验! 准备: 一台控制台(node1.xue.com),需要安装luci和ansible; 三台做集群,及被控制机(node2.xue.com.node3.xue.com.node4.xue.com),需要安装ricci: 配置好yum源:

apache + tomcat 负载均衡分布式集群配置

Tomcat集群配置学习篇-----分布式应用 现目前基于javaWeb开发的应用系统已经比比皆是,尤其是电子商务网站,要想网站发展壮大,那么必然就得能够承受住庞大的网站访问量:大家知道如果服务器访问量过大,就会出现服应用务器崩溃的情况,这个时候怎么办,难道就只能去重启服务器吗?好,如果是一般的小型公益网站到也无所谓,但如果是比如像大型航空公司售票等电子商务网站,每天每小时都有大量的订单业务,如果这些售票系统一旦崩溃后,再去重启,这些时间和客户的损失就直接会影响到航空公司的利益,这些损失如何去避

转载 Tomcat集群配置学习篇-----分布式应用

Tomcat集群配置学习篇-----分布式应用 现目前基于javaWeb开发的应用系统已经比比皆是,尤其是电子商务网站,要想网站发展壮大,那么必然就得能够承受住庞大的网站访问量:大家知道如果服务器访问量过大,就会出现服应用务器崩溃的情况,这个时候怎么办,难道就只能去重启服务器吗?好,如果是一般的小型公益网站到也无所谓,但如果是比如像大型航空公司售票等电子商务网站,每天每小时都有大量的订单业务,如果这些售票系统一旦崩溃后,再去重启,这些时间和客户的损失就直接会影响到航空公司的利益,这些损失如何去避

Redhat6.5下MySQL5.6集群配置完整版

1.准备三台服务器 2.为三台机器分别安装Linux操作系统(Oracle Linux / RHEL 6.5 x86_64bit) 3.分别IP地址 管理节点      192.168.1.110         (负责管理整个集群) SQL节点       192.168.1.111         (负责操作数据库) SQL节点       192.168.1.112         (负责操作数据库) 数据节点      192.168.1.111         (负责存储数据) 数据节

Linux系统运维之Zookeeper集群配置

一.简介 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效.功能稳定的系统提供给用户. 1. ZooKeeper的基本运转流程 1.选举Leader,选举机制大于1/2. 2.同步数据. 3.选举Leader过程中算法有很多,但要达到的选举标准是一致的. 4.Leader要具有最高的执行ID,类似root权限. 5