[转]ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等。企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置。

Queue consumer clusters

此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下:

Broker clusters

此种配置是一个消费者连接到多个broker集群的中的一个broker,当该broker出问题时,消费者自动连接到其他一个正常的broker。消费者使用 failover:// 协议来连接broker。

failover:(tcp://localhost:61616,tcp://localhost:61617)

failover官网介绍 http://activemq.apache.org/failover-transport-reference.html

broker之间的通过静态发现(static discovery)和动态发现(dynamic discovery)来维持彼此发现,下面来介绍静态发现和动态发现的机制:

静态发现:

静态发现通过配置固定的broker uri来发现彼此,配置语法如下:

static:(uri1,uri2,uri3,...)?options

例如:

static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100

  

更多静态发现介绍,见ActiveMQ官网 http://activemq.apache.org/static-transport-reference.html

动态发现:

动态发现机制是在各个broker启动时通过Fanout transport来发现彼此,配置举例如下:

 <broker name="foo">
   <transportConnectors>
     <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
   </transportConnectors>
   ...
 </broker>

更多动态发现机制介绍,见官网 http://activemq.apache.org/discovery-transport-reference.html

Networks of brokers

多个broker组成集群,当其中一个broker的消费者出问题导致消息堆积无法消费掉时,通过ActiveMQ支持的Network of Broker方案可将该broker堆积的消息转发到其他有消费者的broker。

该方案主要有以下两种配置方式:

1、为broker配置文件配置networkConnector元素

2、使用发现机制互相探测broker

Here is an example of using the fixed list of URIs:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://activemq.org/config/1.0">

  <broker brokerName="receiver" persistent="false" useJmx="false">
    <networkConnectors>
      <!-- Static discovery -->
      <networkConnector uri="static:(tcp://localhost:62001)"/>
      <!-- MasterSlave Discovery -->
      <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> -->
    </networkConnectors>

    <persistenceAdapter>
      <memoryPersistenceAdapter/>
    </persistenceAdapter>

   <transportConnectors>
      <transportConnector uri="tcp://localhost:62002"/>
    </transportConnectors>
  </broker>

</beans>

  

This example uses multicast discovery:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://activemq.org/config/1.0">

  <broker name="sender" persistent="false" useJmx="false">
    <networkConnectors>
      <networkConnector uri="multicast://default"/>
    </networkConnectors>

    <persistenceAdapter>
      <memoryPersistenceAdapter/>
    </persistenceAdapter>

  <transportConnectors>
      <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>
  </broker>

</beans>

  

Master Slave

通过部署多个broker实例,一个master和多个slave关系的broker来达到高可用性,有三种方案:

1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;
第二种方案同样支持N个AMQ实例组网,基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

Master Slave方案当其中一个broker启动并拿到独占锁时自动成为master,其他后续的broker则一直等待锁,当master宕机释放锁时其他slave拿到独占锁则自动成为master,部署结构如下:

第二种方案的配置只需修改config文件夹下activemq.xml文件,修改消息持久化使用的方案:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file">
  ...
    <persistenceAdapter>
            <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
    </persistenceAdapter>
    ...
</broker>

消息生产者代码:

public class P2PSender {
    private static final String QUEUE = "client1-to-client2";

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(
                "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(QUEUE);
            // 获取session,FirstQueue是一个服务器的queue destination = session.createQueue("FirstQueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息
            sendMessage(session, producer);
            // session.commit();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= 1; i++) {
            Date d = new Date();
            TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i + "  " + new Date());
            System.out.println("发送消息:ActiveMQ发送的消息" + i + "  " + new Date());
            producer.send(message);
        }
    }
}

消息消费者代码:

public class P2PReceiver {
    private static final String QUEUE = "client1-to-client2";

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)");
        try {
            // 得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建Queue
            destination = session.createQueue(QUEUE);
            consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

  

转自 ActiveMQ的几种集群配置

时间: 2024-10-29 19:11:35

[转]ActiveMQ的几种集群配置的相关文章

ActiveMQ的几种集群配置

ActiveMQ是一款功能强大的消息服务器,它支持许多种开发语言,例如Java, C, C++, C#等等.企业级消息服务器无论对服务器稳定性还是速度,要求都很高,而ActiveMQ的分布式集群则能很好的满足这一需求,下面说说ActiveMQ的几种集群配置. Queue consumer clusters 此集群让多个消费者同时消费一个队列,若某个消费者出问题无法消费信息,则未消费掉的消息将被发给其他正常的消费者,结构图如下: Broker clusters 此种配置是一个消费者连接到多个bro

Activemq 安装与集群配置

1. 新建文件夹activemq/server mkdir  server 2.授权 chmod 777 server 3.下载activeMQ安装包,拷贝到/activemq/server目录下 apache-activemq-5.9.0-bin.tar.gz,下载地址: http://activemq.apache.org/download.html 4.解压文件到运行目录/activemq/server tar -xzvf  apache-activemq-5.9.0-bin.tar.gz

activemq+Zookeper高可用集群方案配置

在高并发.对稳定性要求极高的系统中,高可用的是必不可少的,当然ActiveMQ也有自己的集群方案.从ActiveMQ 5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper + LevelDB 的 Master-Slave 实现方式. 相关文章:范例项目: http://wosyingjun.iteye.com/blog/2312553 ActiveMQ的简单实用:http://wosyingjun.iteye.com/blog/2314

java:redis(redis的集群配置)

服务器集群作用: 服务器集群就是指将很多服务器集中起来一起进行同一种服务,在客户端看来就象是只有一个服务器 集群可以利用多个计算机进行并行计算从而获得很高的计算速度,也可以用多个计算机做备份,从而使得任何一个机器坏了整个系统还是能正常运行.一旦在服务器上安装并运行了群集服务,该服务器即可加入群集.群集化操作可以减少单点故障数量,并且实现了群集化资源的高可用性. redis的集群配置: (.编辑network文件 HOSTNAME=redis(自己定义的hostname) vi /etc/sysc

nginx+tomcat集群配置(1)---根目录设定和多后端分发配置

前言: 对于javaer而言, nginx+tomcat集群配置, 已然成了web应用部署的主流. 大公司如此, 小公司亦然. 对于个人开发者而言, 资源有限, 往往多个web应用混部于一台服务器(云主机), 如何隔离访问这些服务资源? 彼此又不影响呢? nginx来为你排忧解难, ^_^. 本文将介绍tomcat的简单配置和部署, 以及nginx作为反向代理, 如何分流. 基础架构: 不成文的约定: 1). 一个tomcat容器部署一个webapp应用实例 2). url根目录访问webapp

Hadoop-2.2.0中文文档——MapReduce 下一代 -——集群配置

目的 这份文档描写叙述了怎样安装.配置和管理从几个节点到有数千个节点的Hadoop集群. 玩的话,你可能想先在单机上安装.(看单节点配置). 准备 从Apache镜像上下载一个Hadoop的稳定版本号. 安装 安装一个Hadoop集群,一般包含分发软件到全部集群中的机器上或者是安装RPMs. 一般地,集群中的一台机器被唯一地设计成NameNode,还有一台机器被设置成ResourceManager.这是master(主). 集群中剩下的机器作为DataNode 和 NodeManager.这些是

MySql 集群配置

MYSQL CLUSTER方案介绍 本文的大致框架来自罗志威.黄川的报告, 在它的基础上进行简化和修改一些bug并且添加了主从复制的章节,最后做出该文档 MySQL Cluster 是MySQL适合于分布式计算环境的高实用.高冗余版本.它采用了NDB Cluster 存储引擎,允许在1个 Cluster 中运行多个MySQL服务器.现在mysql cluster 被独立出来, 作为一个专门的产品进行运营, mysql-server-5.6+ 就不在存在对mysql cluster的支持,需要独立

Quartz集群配置

Quartz集群配置(100%成功) 先看看quartz的持久化基本介绍: 引用 1 大家都清楚quartz最基本的概念就是job,在job内调用具体service完成具体功能,quartz需要把每个job存储起来,方便调度,quartz存储job方式就分三种,我们最常用的也是quartz默认的是RAMJobStore,RAMJobStore顾名思义就是把job的相关信息存储在内存里,如果用spring配置quartz的job信息的话,所有信息是配置在xml里,当spirng context启动

基于Keepalived构建高可用集群配置实例(HA Cluster)

什么是集群 简单的讲集群(cluster)就是一组计算机,它们作为一个整体向用户提供一组网络资源.这些单个的计算机系统就是集群的节点(node).一个理想的集群是,用户从来不会意识到集群系统底层的节点,在他/她们看来,集群是一个系统,而非多个计算机系统.并且集群系统的管理员可以随意增加和删改集群系统的节点. 关于更详细的高可用集群我们在后面再做详解,先来说说Keepalived Keepalived是什么 Keepalived是集群管理中保证集群高可用的一个服务软件,其功能类似于heartbea