ActiveMQ(七)——ActiveMQ的Network

一、在一台服务器上启动多个Broker

  • 步骤如下(为集群做准备):
    1:把整个conf文件夹复制一份,比如叫做conf2
    2:修改里面的activemq.xml文件
    (1)里面的brokerName不能跟原来的重复
    (2)数据存放的文件名称不能重复,比如:
    <hahaDB directory=”${activemq.data}/kahadb_2”/>
    (3)所有设计的transportConnectors的端口,都要跟前面的不一样
    3:修改jetty.xml,主要就是修改端口,比如:
    <property name=”port” value=”8181”/>端口必须和前面的不一样
    4:到bin下面,复制一个activemq,比如叫做activemq2;
    (1)修改程序的id,不能和前面的重复
    ACTIVEMQ_PIDFILE=”$ACTIVEMQ_DATA/activemq2-hostname.pid”
    (2)修改配置文件路径
    ACTIVEMQ_CONF=”$ACTIVEMQ_BASE/conf2”
    (3)修改端口,里面有个tcp的61616的端口,要改成不一样的,最好跟 activemq.xml里面的ctp的端口一致
    (4)然后就可以执行了。

二、ActiveMQ的静态网络链接

  • ActiveMQ的networkConnector是什么
    在某些场景下,需要多个ActiveMQ的Broker做集群,那么久涉及到Broker的通信,这个被称为ActiveMQ的networkConnector。
        ActiveMQ的networkConnector默认是单向的,一个Broker在一端发送消息,另一个Broker在另一端接收消息。这就是所谓的“桥接”。ActiveMQ也支持双向链接,创建一个双向的通道对于两个Broker,不仅发送消息而且也能从相同的通道来接收消息,通常作为duplex connector来映射,如下:
  • “discovery”的概念
        一般情况下,discovery是被用来发现远程的服务,客户端通常想去发现所有可利用的brokers;另一层意思,它是基于现有的网络Broker去发现其他可用的Brokers。
        有两种配置Client到Broker的链接方式,一种方式:Client通过Statically配置的方式去连接Broker,另一种方式:Client通过discovery agents来dynamically发现Brokers
  • Static networks
        Static networkConnector是用于创建一个静态的配置对于网络中的多个Broker。这种协议用于复合url,一个复合url包括多个url地址。格式如下:
    static:(uri1,uri2,uri3,...)?Key=value
    1:配置示例如下(activemq.xml--注释掉persistenceFactory节点):
    <networkConnectors>
    <networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/>
    </networkConnectors>
  • Static networkConnector的基本原来示意图:

        上图中,两个Brokers是通过一个static的协议来网络链接的。一个Consumer链接到BrokerB的一个地址上,当Producer在BrokerA上以相同的地址发送消息时,此时它将被转移到BrokerB上。也就是,BrokerA会转发消息到BrokerB上。
  • networkConnector配置的可用属性
    1、name:默认是bridge
    2、dynamicOnly:默认是false,如果为true,持久订阅被激活时才创建对应的网络持久订阅。默认是启动时激活
    3、decreaseNetworkConsumerPriority:默认是false。设定消费者优先权,如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0
    4、networkTTL:默认是1,网络中用于消息和订阅消费的broker数量
    5、messageTTL:默认是1,网络中用于消息的broker数量
    6、consumerTTL:默认是1,网络中用于消费的broker数量
    7、conduitSubscriptions:默认true,是否把同一个broker的多个consumer当做一个来处理
    8、dynamicallyIncludedDestinations:默认为空,要包括的动态消息地址,类似于excludedDestinations,如
    <dynamicallyIncludedDestinations>
    <queue physicalName="include.test.foo"/>
    <topic physicalName="include.test.bar"/>
    </dynamicallyIncludedDestinations>

    9、staticallyIncludedDestinations:默认为空,要包括的静态消息地址。类似于excludedDestinations,如:

    <staticallyIncludedDestinations>
    <queue physicalName="always.include.queue"/>
    </staticallyIncludedDestinations>

    10、excludedDestinations:默认为空,指定排除的地址,示例如下:

    11、duplex:默认false,设置是否能双向通信
    12、prefetchSize:默认是1000,持有的未确认的最大消息数量,必须大于0,因为网络消费者不能自己轮询消息
    13、suppressDuplicateQueueSubscriptions:默认false,如果为true,重复的订阅关系一产生即被阻止
    14、bridgeTempDestinations:默认true,是否广播advisory messages来创建临时destination
    15、alwaysSyncSend:默认false,如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker
    16、staticBridge:默认false,如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理

三、“丢失”的消息
     存在这样的场景,broker1和broker2通过networkConnector连接,一些consumers连接到broker1,消费broker2上的消息。消息先被broker1从broke2上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是又一部分他们还没有消费的消息被broker2已经分发到broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。
     从5.6版起,在destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发,示例如下:

<destinationPolicy>
    <policyMap>
        <policyEntry queue=">" enableAudit="false">
            <networkBridgeFilterFactory>
                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
            </networkBridgeFilterFactory>
        </policyEntry>
    </policyMap>
</destinationPolicy>

四、容错的连接

  • Failover Protocol
        之前的都是Client配置链接到指定的broker上。但是,如果Broker的链接失败怎么办?此时,Client有两个选项:要么立刻死掉,要么去连接到其它的broker上。
        Failover协议实现了自动重新链接的逻辑。这里有两种方式提供了稳定的brokers列表对于Client链接。
    第一种:提供一个static的可用的Brokers列表
    第二种:提供一个dynamic发现的可用Brokers
  • Failover Protocol的默认方式
    failover:(uri1,...,uriN)?key=value或者failover:uri1,...,uriN
    & Failover Protocol的默认配置
    默认情况下,这种协议用于随机的去选择一个链接去链接,如果链接失败了,那么会链接到其它的Broker上。默认的配置定义了延迟重新链接,意味着传输将会在10秒后自动的去重新链接可用的broker。所有的重新链接参数都是可以根据应用的需要而配置的。
  • Failover Protocol的使用示例,在客户端程序里面
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory   ("failover:(tcp://localhost:61616,tcp://localhost:61617)
    ?randomize=false");
  • Failover Protocol可用的配置参数:
    1、initialReconnectDelay:第一次尝试重连之前等待的时间(毫秒)默认10
    2、maxReconnectDelay:最长重连的时间间隔(毫秒),默认30000
    3、useExponentialBackOff:重连时间间隔是否以指数形式增长,默认true
    4、backOffMultiplier:递增倍数,默认2.0
    5、maxReconnectAttempts:默认-1|0,自版本5.6起,-1为默认值,代表不限重连次数;0代表从不重试(只尝试连接一次,并不重连),5.6版本之前,0为默认值,代表不限重试次数;大于0的数,代表最大重试次数。
    6、startupMaxReconnectAttempts:初始化时的最大重连次数。一旦连接上,将使用maxReconnectAttempts的配置,默认0
    7、Randomize:使用随机连接,以达到负载均衡的目的,默认true
    8、Backup:提前初始化一个未使用连接,以便进行快速失败转移,默认false
    9、timeout:设置发送操作的超时时间(毫秒),默认-1
    10、trackMessages:设置是否缓存[故障发生时]尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺序传送,默认false
    11、maxCacheSize:当trackMessages启用时,缓存的最大字节,默认为128*1024 字节
    12、updateURIsSupported:设定是否可以动态修改broker rui(自版本5.4起),默认true

五、动态网络连接(纯理论)
多播协议multicast
    ActiveMQ使用Multicast协议将一个Service和其他的Broker的Service连接起来。IP multicast是一个被用于网络中传输数据到其他一组接收者的技术。
    IP multicast传统的概念成为组地址。组地址是ip地址在224.0.0.0到239.255.255.255之间的ip地址。ActiveMQ broker使用multicast协议去建立服务与远程的broker的服务的网络连接。

  • 基本的格式配置
    multicast://ip address:port?transportOptions
    1、group:表示唯一的组名称,缺省值default
    2、minmumWireFormatVersion:被允许的最小的wireformat版本,缺省为0
    3、trace:是否追踪记录日志,默认false
    4、useLocalHost:表示本地机器的名称是否为localhost,默认true
    5、datagramSize:特定的数据大小,默认值41024
    6、timeToLive:消息的声明周期值,默认值-1
    7、loopBackMode:是否启用loopback模式,默认false
    8、wireFormat:默认用wireFormat命名
    9、wireFormat.
    :前缀是wireFormat
  • 配置示例
    1:默认配置,默认情况下是不可靠的多播,数据包可能会丢失
    multicast://default
    2:特定的ip和端口
    multicast://224.1.2.3:6255
    3:特定的ip和端口以及组名
    multicast://224.1.2.3:6255?group=mygroupname
  • ActiveMQ使用multicast协议的配置格式如下:
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.data}/data">
    <networkConnectors>
        <networkConnector name="default-nc" uri="multicast://default"/>
    </networkConnectors>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
    </transportConnectors>
    </broker>

    说明:
    1:uri="multicast://default"中的default是activemq默认的ip,默认动态 的寻找地址
    2:"discoveryUri"是指在transport中用multicast的default的地址传递
    3:"uri"指动态寻找可利用的地址
    4:如果防止自动的寻找地址?
    (1)名称为openwire的transport,移除discoveryUri="multicast:
    //default"即可。传输连接用默认的名称openwire来配置broker的tcp多点连接,这将允许其他broker能够自动发现和连接到可用的broker中。
    (2)名称为"default-nc"的networkConnector,注释掉或者删除即可。
    ActiveMQ默认的networkConnector基于multicast协议的连接的默认名称是default-nc,而且自动的去发现其他broker。去停止这种行为,只需要注销或者删除掉default-nc网络连接。
    (3)使brokerName的名字唯一,可以唯一识别Broker的实例,默认是localhost

  • multicast协议和普通的tcp协议
    它们是差不多的,不同的是multicast能够自动的发现其他broker,从而替代了使用static功能列表brokers。用multicast协议可以在网络中频繁的添加和删除ip不会有影响。
    multicast协议
    好处:能够适应动态变化的地址。
    缺点:自动连接地址会过渡的消耗网络资源

Discovery协议
    Discovery是在multicast协议的功能上定义的。功能类似与Failover功能。它将动态的发现multicast协议的broker的连接并且随机的连接其中一个broker。

  • 基本配置如下:
    discovery:(discoveryAgentURI)?transportOptions
    1、reconnectDelay:再次寻址等待时间,缺省值10
    2、initialReconnectDelay:初始化设定再次寻址等待时间,缺省值10
    3、maxReconnectDelay:最大寻址等待时间,缺省值30000
    4、useExponentialBackOff:是否尝试BackOff重连接,默认是true
    5、backOffMultiplier:尝试Backoff的次数,默认是2
    6、maxReconnectAttempts:如果异常,最大的重新连接个数,默认是0
    7、Group:组唯一的地址,默认是default
    示例:
    discovery:(multicast://default)?initialReconnectDelay=100
  • Discovery协议的配置示例
    <broker name="foo">
    <transportConnectors>
        <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>
    </broker>   

Peer协议
    ActiveMQ提供了peer transport connector提供了更加容易的去嵌入broker网络中。它创建一个优于vm连接的p2p网络连接。默认格式如下:
peer://peergroup/brokerName?Key=value

  • Peer协议的基本使用
        当我们启用了peer协议时,应用将自动的启动内嵌broker,也将会自动的去配置其它broker来建立连接,前提是必须要有一个组。配置如下:
    peer://groupa/broker1?Persistent=false
        另外,生产者和消费者都各自连接到嵌入到自己应用的broker,并且在本地的同一个组名中相互访问数据。
  • Peer协议的基本原理

    在本地机器断网的情况下,本地的client访问本地brokerA将依然正常。
    在断网的情况下发送消息到本地brokerA,然后网路连接正常后,所有的消息将重新发送并连接到brokerB。
    Fanout协议

Fanout协议是同时连接到多个broker,默认的格式如下:
    fanout:(fanoutURI)?key=value
示例:fanout:(static:(tcp://host1:61616,tcp://host2:61616))
表示client将试图连接到两个static列表中定义的三个URI

  • Fanout协议的配置参数如下:
    1、initialReconnectDelay:重新连接的等待时间,默认是10
    2、maxReconnectDelay:最大重新连接的等待时间,默认是30000
    3、useExponentialBackOff:是否尝试BackOff重连接,默认是true
    4、backOffMultiplier:尝试Backoff的次数,默认是2
    5、maxReconnectAttempts:如果异常,最大的重新连接个数,默认是0
    6、fanOutQueues:是否将topic消息转换queue消息,默认false
    7、minAckCount:Broker连接的最小数,默认是2
    配置示例:
    fanout:(static:(tcp://localhost:61616,tcp://remotehost:61616))?
    initialReconnectDelay=100
    注意:
    ActiveMQ并不推荐Consumer使用fanout协议。当Provider发送消息到多个broker中,测试Consumer可能受到重复的消息。

原文地址:https://blog.51cto.com/mazongfei/2415600

时间: 2024-10-10 20:38:36

ActiveMQ(七)——ActiveMQ的Network的相关文章

【分布式系列之ActiveMq】ActiveMq入门示例

前言 github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01 下载ActiveMQ :http://activemq.apache.org/download.html 放到自己的目录,大致目录如下: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的是说明文档 examples存放的是简单的实例 lib存放的是activemq所需jar包 webapps用于存放项目的

JMS学习(七)-ActiveMQ消息的持久存储方式之KahaDB存储

一,介绍 自ActiveMQ5.4以来,KahaDB成为了ActiveMQ默认的持久化存储方式.相比于原来的AMQ存储方式,官方宣称KahaDB使用了更少的文件描述符,并且提供了更快的存储恢复机制. 二,KahaDB存储配置 在 conf/activemq.xml 中配置如下: <broker brokerName="broker" ... > <persistenceAdapter> <kahaDB directory="activemq-da

ActiveMQ(七)_伪集群和主从高可用使用

一.本文目的 介绍如何在同一台虚拟机上搭建高可用的Activemq服务,集群数量包含3个Activemq,当Activemq可用数>=2时,整个集群可用. 本文Activemq的集群数量为3个,分别命名为mq1,mq2,mq3   二.概念介绍 1.伪集群 集群搭建在同一台虚拟机上,3个Activemq分别使用不同的端口提供服务,启用1个为Master,其它2个为Slaver,同一时间仅Master队列提供服务 2.高可用 3个Activemq服务,同一时间仅Master队列提供服务,当Mast

学习ActiveMQ(七):JMS消息的事务管理

Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理.这将允许JMS应用利用Spring的事务管理特性.JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中.JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作. 在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话

[ActiveMQ]初识ActiveMQ

初识ActiveMQ ActiveMQ介绍 官方网站:http://activemq.apache.org/ 最新版本:ActiveMQ 5.14.1(2016-10-28) 最新版本下载链接:http://activemq.apache.org/download.html 历史版本下载链接:http://archive.apache.org/dist/activemq/ Getting Started Guide链接:http://activemq.apache.org/getting-sta

activemq下activemq.bat不能启动

今天下载了一个apache-activemq-5.5.0-bin.rar解压缩后双击/bin目录下的activemq.bat批处理文件发现启动窗口一闪而过无法启动,最后找到原因是因为在环境变量-系统变量中classpath.JAVA_HOME.Path这些系统变量中没有添加jdk相关的安装路径. classpath下添加“.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar” JAVA_HOME下添加 “C:\Program Files\Java\jdk1.6.

ActiveMQ Cluster (ActiveMQ 集群) 配置

构建高可用的ActiveMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现 AMQ实现高可用部署有三种方案: 1.Master-Slave 2.SharedFile System Master Slave 3.JDBCMaster Slave 第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛: 第三种方案支持N个AM

ActiveMQ(2)---ActiveMQ原理分析之消息发送

持久化消息和非持久化消息的发送策略 消息同步发送和异步发送 ActiveMQ支持同步.异步两种发送模式将消息发送到broker上.同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理.这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能 异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高.但是可能会出现消息丢失的情况.所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况. 默认情况下,非

ActiveMQ(3)---ActiveMQ原理分析之消息持久化

持久化消息和非持久化消息的存储原理 正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的.能够存储的最大消息数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点 ,SystemUsage配置设置了一些系统内存和硬盘容量 <systemUsage> <systemUsage> <memoryUsage> //该子标记设置整个ActiveMQ节点的“可用内存限制”.这个值不能超过ActiveMQ本身设置

杭电多校第七场 Traffic Network in Numazu

Problem Description Chika is elected mayor of Numazu. She needs to manage the traffic in this city. To manage the traffic is too hard for her. So she needs your help. You are given the map of the city -- an undirected connected weighted graph with N