ActiveMQ Cluster (ActiveMQ 集群) 配置

构建高可用的ActiveMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现

AMQ实现高可用部署有三种方案: 
1、Master-Slave 
2、SharedFile System Master Slave 
3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛; 
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库; 
第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。

多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

Apache ActiveMQ单点基本配置的原配置内容:

<persistenceAdapter> 
            <kahaDB directory="${activemq.data}/kahadb"/> 
</persistenceAdapter>

修改为:

<persistenceAdapter> 
             <kahaDB directory="D:\\ActiveMQ Cluster\\shareBrokerData" enableIndexWriteAsync="true"  enableJournalDiskSyncs="false"/> 
</persistenceAdapter>

在D:\\ActiveMQ Cluster目录先创建shareBrokerData文件夹。

注意:

1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。 
2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统) 
3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。

下面为在一台设备上部署两个AMQ示例: 
ActiveMQ A 
1.activemq.xml修改监听端口:

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors>

2.jetty.xml修改监听端口:

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8166" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property>

ActiveMQ B 
1.activemq.xml修改监听端口:

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors>

2.jetty.xml修改监听端口:

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8166" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property>

Java测试程序代码: 
1.Producer:

import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ProducerTool { 
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageProducer producer = null;    
   
    // 初始化 
    private void initialize() throws JMSException, Exception {    
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);    
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
         
    }    
   
    // 发送消息    
    public void produceMessage(String message) throws JMSException, Exception {    
        initialize();    
        TextMessage msg = session.createTextMessage(message);    
        connection.start();    
        System.out.println("Producer:->Sending message: " + message);    
        producer.send(msg);    
        System.out.println("Producer:->Message sent complete!");    
    }    
   
    // 关闭连接     
    public void close() throws JMSException {    
        System.out.println("Producer:->Closing connection");    
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
   }    
}

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ConsumerTool implements MessageListener {      
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageConsumer consumer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
            
    }    
   
    // 消费消息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    
            
        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    
   
    // 关闭连接   
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    // 消息处理函数  
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}

2.Consumer:

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ConsumerTool implements MessageListener {      
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageConsumer consumer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
            
    }    
   
    // 消费消息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    
            
        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    
   
    // 关闭连接   
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    // 消息处理函数  
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}

3.Main

import javax.jms.JMSException; 
   
public class Test {    
   
    /**   
     * @param args   
     */   
    public static void main(String[] args) throws JMSException, Exception { 
    
        
        ConsumerTool consumer = new ConsumerTool();    
        ProducerTool producer = new ProducerTool();    
        // 开始监听    
        consumer.consumeMessage();    
            
        // 延时500毫秒之后发送消息    
        Thread.sleep(500);    
        producer.produceMessage("Hello, world!");    
        producer.close();    
            
        // 延时500毫秒之后停止接受消息    
        Thread.sleep(500);    
        consumer.close();    
    
    }    
}

ActiveMQ A 启动界面:

ActiveMQ B 启动界面:

AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。

运行Java Test程序日志:

10:22:43.745 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616 
Consumer:->Begin listening... 
10:22:45.623 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0 
Producer:->Sending message: Hello, world! 
Producer:->Message sent complete! 
Producer:->Closing connection 
Consumer:->Received: Hello, world! 
Consumer:->Closing connection

ActiveMQ A 管理界面:

异常处理:

配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示: 
Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。

1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。 
2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。 
3.把此服务关了后再启动ActvieMQ成功了。

时间: 2024-10-10 20:38:43

ActiveMQ Cluster (ActiveMQ 集群) 配置的相关文章

Activemq 安装与集群配置

1. 新建文件夹activemq/server mkdir  server 2.授权 chmod 777 server 3.下载activeMQ安装包,拷贝到/activemq/server目录下 apache-activemq-5.9.0-bin.tar.gz,下载地址: http://activemq.apache.org/download.html 4.解压文件到运行目录/activemq/server tar -xzvf  apache-activemq-5.9.0-bin.tar.gz

Haproxy+mysql cluster( MySQL 集群) 配置

一.准备 1.准备服务器 建立有2个节点的MySQL CLuster体系,使用6台服务器建立Haproxy+mysql cluster( MySQL 集群) 体系 节点配置说明 节点 对应的IP和端口 Haproxy负载均衡(1个) centos 6.3 1.1.1.11  管理节点(1个) centos 6.3  1.1.1.30  SQL节点 (2个) centos 6.3  1.1.1.21  1.1.1.22  数据节点 (2个) centos 6.3  1.1.1.31  1.1.1.

基于zookeeper的activemq的主从集群配置

项目,要用到消息队列,这里采用activemq,相对使用简单点.这里重点是环境部署. 0. 服务器环境 RedHat710.90.7.210.90.7.1010.90.2.102 1. 下载安装zookeeper 地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz zookeeper的安装,采用一台机器装3个实例,伪集群.其实,搭建真集群,也是没问题的.在7

ActiveMQ 高可用集群安装、配置(ZooKeeper + LevelDB)

1.ActiveMQ 集群部署规划: 环境: JDK7 版本:ActiveMQ 5.11.1 ZooKeeper 集群环境:10.14.0.1:2181,10.14.0.2:2182,10.14.0.3:2183(ZooKeeper 集群部署请参考<ZooKeeper 集群的安装.配置>) 主机 集群端口 消息端口 管控台端口 节点安装目录 192.168.1.11 63631 53531 8361 /opt/aijia/activemq/node-01 192.168.1.12 63632

[转]ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置. Queue consumer clusters 此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下: Broker clusters 此种配置是一个消费者连接到多个bro

ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置. Queue consumer clusters 此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下: Broker clusters 此种配置是一个消费者连接到多个bro

ActiveMq+zookeeper+levelDB集群整合配置

ActiveMq+zookeeper+levelDB集群整合配置 环境:linux系统,jdk1.7  三台linux系统电脑.我这里使用一台window,分别远程3台linux电脑.三台电脑的ip分别为10.0.88.10 ,10.0.88.11 ,10.0.88.12 第一步:下载activemq 和zookeeper,levelDB会activemq自带有,所以不需要下载,把下载好的压缩包发送到所有的linux上,(我这里是把下载好的压缩包放在当前window下的tomcat,root目录

基于Keepalived构建高可用集群配置实例(HA Cluster)

什么是集群 简单的讲集群(cluster)就是一组计算机,它们作为一个整体向用户提供一组网络资源.这些单个的计算机系统就是集群的节点(node).一个理想的集群是,用户从来不会意识到集群系统底层的节点,在他/她们看来,集群是一个系统,而非多个计算机系统.并且集群系统的管理员可以随意增加和删改集群系统的节点. 关于更详细的高可用集群我们在后面再做详解,先来说说Keepalived Keepalived是什么 Keepalived是集群管理中保证集群高可用的一个服务软件,其功能类似于heartbea

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

MySQL Cluster(MySQL 集群) 初试(转)

作/译者:叶金荣(imysql#imysql.com>),来源:http://imysql.com,欢迎转载. 作/译者:叶金荣(Email: ),来源:http://imysql.cn,转载请注明作/译者和出处,并且不能用于商业用途,违者必究. MySQL Cluster 是MySQL适合于分布式计算环境的高实用.高冗余版本.它采用了NDB Cluster 存储引擎,允许在1个 Cluster 中运行多个MySQL服务器.在MyQL 5.0及以上的二进制版本中.以及与最新的Linux版本兼容的