ActiveMQ(13):ActiveMQ的集群

一、简介

1.1 消费者集群(Queue consumer clusters)

ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。

如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。

因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接。

1.2 Broker clusters

大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。

实现以上行为需要用failover协议作为Client。

如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。

这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。

特别注意:

ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在

netWorkConnector配置的时候,设置duplex=true 就可以了。

操作,服务端搭建静态网络连接与消息回流

消息者1:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61676");
    Connection connection = cf.createConnection();
    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    for (int i = 0; i < 1; i++) {
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
	    @Override
	    public void onMessage(Message message) {
	        TextMessage m = (TextMessage) message;
	        try {
	            System.out.println("===收到11111111:" + m.getText());
		    session.commit();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }    
	    }
	});
    }
}

消息者2:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61616");
    Connection connection = cf.createConnection();
    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    for (int i = 0; i < 1; i++) {
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
	    @Override
	    public void onMessage(Message message) {
	        TextMessage m = (TextMessage) message;
	        try {
	            System.out.println("===收到222222222:" + m.getText());
		    session.commit();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }    
	    }
	});
    }
}

生产者:

public void test1() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    for (int i = 0; i < 30; i++) {
        TextMessage message = session.createTextMessage("message--" + i);
	Thread.sleep(1000);
	producer.send(message);
    }
    session.commit();
    session.close();
    connection.close();
}

效果:

二、主从节点的集群(Master Slave)

在5.9的版本里面,废除了Pure Master Slave的方式,目前支持:

1:Shared File System Master Slave:

基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了,

某个抢到文件锁的slave变成master

2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master

3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的

具体的可以到官方察看: http://activemq.apache.org/masterslave.html

注意:这里可以不要静态连接与回流了

2.1 JDBC Master Slave的方式

2.1.1 简介

利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获得独有锁,其它Slaves Broker则等待获取独有锁。

推荐客户端使用Failover来链接Brokers。

具体如下图所示:

Master失败

如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。

同时,Client将停止链接之前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示

Master重启

任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示

2.1.2 JDBC Master Slave的配置

使用<jdbcPersistenceAdapter/>来配置消息的持久化,自动就会使用JDBC Master Slave的方式。

参考:ActiveMQ消息存储持久化 里的jdbc

去掉静态连接:参考ActiveMQ的静态网络链接

去掉回流:参考集群下的消息回流功能

注意:在配置JDBC时,注意配置useDatabaseLock="true",如下

    <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="true" />

必需设置,不然在保存数据时会报数据主键重复异常

2.1.3 测试

生产30个消息:

public void test1() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    for (int i = 0; i < 30; i++) {
        TextMessage message = session.createTextMessage("message--" + i);
	Thread.sleep(1000);
	producer.send(message);
    }
    session.commit();
    session.close();
    connection.close();
}

消费3个消息:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = cf.createConnection();
    connection.start();

    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageConsumer consumer = session.createConsumer(destination);
    int i=0;
    while(i<3) {
        i++;
	TextMessage message = (TextMessage) consumer.receive();
	session.commit();
	System.out.println("收到消 息:" + message.getText());
    }
    session.close();
    connection.close();
}

然后关闭61616这台mq,进入到61676界面:

时间: 2024-10-11 11:33:06

ActiveMQ(13):ActiveMQ的集群的相关文章

ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置. Queue consumer clusters 此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下: Broker clusters 此种配置是一个消费者连接到多个bro

[转]ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置. Queue consumer clusters 此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下: Broker clusters 此种配置是一个消费者连接到多个bro

activemq+Zookeper高可用集群方案配置

在高并发.对稳定性要求极高的系统中,高可用的是必不可少的,当然ActiveMQ也有自己的集群方案.从ActiveMQ 5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper + LevelDB 的 Master-Slave 实现方式. 相关文章:范例项目: http://wosyingjun.iteye.com/blog/2312553 ActiveMQ的简单实用:http://wosyingjun.iteye.com/blog/2314

基于Docker搭建ActiveMQ的高可用集群

最近刚开始玩Docker和ActiveMQ刚好学习到ActiveMQ集群的搭建,就将其记录了下来给有需要的人,也可以跟大家交流交流. 这里先感谢慕课网和http://blog.csdn.net/lifetragedy/article/details/51869032,在学习ActiveMQ有很大的帮助. 一.docker坏境的搭建. 这里重点不是docker,而是基于docker搭建的ActiveMQ集群,docker了解的也可以参考http://www.docker.org.cn/.     

ActiveMQ(七)_伪集群和主从高可用使用

一.本文目的 介绍如何在同一台虚拟机上搭建高可用的Activemq服务,集群数量包含3个Activemq,当Activemq可用数>=2时,整个集群可用. 本文Activemq的集群数量为3个,分别命名为mq1,mq2,mq3   二.概念介绍 1.伪集群 集群搭建在同一台虚拟机上,3个Activemq分别使用不同的端口提供服务,启用1个为Master,其它2个为Slaver,同一时间仅Master队列提供服务 2.高可用 3个Activemq服务,同一时间仅Master队列提供服务,当Mast

ActiveMQ之brokers network集群

一.集群配置方式类型 master-slave <networkConnectors> <networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors> uri列表中对应的顺序就是MASTER,SLAVE1,SLAVE2...SLAVE; master-slave的作用就是brokers之间的主从,同一时刻

CentOS7.6 使用ceph-deploy部署mimic 13.2.8集群(一)

一.部署环境规划 1.1 各版本规划如下表 到今目前为止各软件的支持情况  https://docs.ceph.com/docs/master/releases/general/ 1.2 查看磁盘信息 # lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT vda 253:0 0 50G 0 disk └─vda1 253:1 0 50G 0 part / vdb 253:16 0 20G 0 disk vdc 253:32 0 20G 0 disk vd

13.5.SolrCloud集群使用手册之数据导入

转载请出自出处:http://www.cnblogs.com/hd3013779515/ 1.使用curl命令方式 SolrCloud时会根据路由规则路由到各个shard. 删除所有数据 curl http://192.168.137.171:8080/solr-cloud/myc_shard1_replica1/update?commit=true -H "Content-Type: text/xml" --data-binary "<delete><qu

架构设计:系统间通信(26)——ActiveMQ集群方案(下)

(接上文<架构设计:系统间通信(26)--ActiveMQ集群方案(上)>) 3.ActiveMQ热备方案 ActiveMQ热备方案,主要保证ActiveMQ的高可用性.这种方案并不像上节中我们主要讨论的ActiveMQ高性能方案那样,同时有多个节点都处于工作状态,也就是说这种方案并不提高ActiveMQ集群的性能:而是从集群中的多个节点选择一个,让其处于工作状态,集群中其它节点则处于待命状态.当主要的工作节点由于各种异常情况停止服务时,保证处于待命的节点能够无缝接替其工作. 3-1.Acti

ActiveMQ学习第五篇:ActiveMq伪集群学习

启动多实例 # 1.将conf文件夹复制一份 cp -r conf/ conf-1/ #主要是修改conf-1目录activemq.xml # 2.修改Broker名称 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost-1" dataDirectory="${activemq.data}"> #3.数据存储如果使用的是kahaDB,