ActiveMQ学习第六篇:Destination的特性

Wildcards(通配符)

Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
??ActiveMQ支持以下三种通配符:

  • ".":用于作为路径上名字间的分隔符
  • ">":用于递归的匹配任何以这个名字开始的Destination(目的地)
  • "*":用于作为路径上任何名字。
    ??举例来说,如有以下两个Destination:
    ??PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格)
    ??PRICE.COMPUTER.TMALL.APPLE(苹果电脑在天猫上的价格)
  1. PRICE.> :匹配任何产品的价格变动
  2. PRICE.COMPUTER.> :匹配任何电脑产品的价格变动
  3. PRICE.COMPUTER.JD.*:匹配任何在京东上的电脑的价格变动
  4. PRICE.COMPUTER.*.APPLE:匹配苹果电脑京东或天猫上的价格变动
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 创建生产者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 1; i <= 10; i++) {
    TextMessage textMessage = session.createTextMessage(message);
    messageProducer.send("Mac Air价格:"  + i * 1000);
    System.out.println("发送消息 - " + textMessage.getText());
}
session.commit();
// 实例化连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61626)?randomize=false");
// 通过连接工厂获取连接
Connection connection = connectionFactory.createConnection();
// 启动连接
connection.start();
// 创建session
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Destination destination = session.createQueue("PRICE.COMPUTER.>");
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener(){
  @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

通配符中是为消费者服务的。即:通配符只能配置在消费端。

Composite Destinations(组合列队)

组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过compositedestinations在一个操作中同时向多个queue发送消息。

客户端实现的方式:

? 在composite destinations中,多个destination之间采用“,”分割。例如:

        //创建一个队列
       // Destination destination = session.createQueue("test,test1");
        Queue queue = new ActiveMQQueue("test,test1");
        //创建生产者
//        MessageProducer producer = session.createProducer(destination);
        MessageProducer producer = session.createProducer(queue);

如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

Queue queue = new ActiveMQQueue("test,topic://192.168.100.155::61616");

在xml配置实现的方式:

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
        <!-- 虚拟的queue的名字-->
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
            <!-- 实际发送的名称 -->
          <queue physicalName="my-queue" />
          <queue physicalName="my-queue2" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

使用filtered destinations,在xml配置如下:

<destinationInterceptors>
       <virtualDestinationInterceptor>
              <virtualDestinations>
                     <compositeQueue name="MY.QUEUE">
                            <forwardTo>
                                 <filteredDestination selector="odd='yes'" queue="FOO"/>
                                 <filteredDestination selector="i = 5" topic="BAR" />
                            </forwardTo>
                     </compositeQueue>
              </virtualDestinations>
       </virtualDestinationInterceptor>
</destinationInterceptors>

避免在network连接到broker,出现重复消息:

<networkConnectors>
<networkConnector uri= "static://(tcp://localhost:61616) " >
<excludedDestinations>
    <queue physicalName="Consumer.*VirtualTopic.> " />
</ excludedDestinations>
</ networkConnector>
</ networkConnectors>

在ActiveMQ启动时候就创建Destination

<broker xmlns="http://activemq.apache.org/schema/core">
       <destinations>
              <queue physicalName="FOO.BAR" />
              <queue physicalName="SOME.TOPIC" />
       </destinations>
</broker>

Delete Inactive Destinations (删除无用的队列)

可以通过web控制台或是JMX方式来删除掉,通过配置文件,自动探测无用的队列并删除掉,回收响应资源,配置如下:
? SchedulePeriodForDestinationPurge:设置多长时间检查一次。
? inactiveTimeoutBeforeGC:设置当destination为空后,多长时间被删除,这里是30s,默认为60
? gcInactiveDestinations:设置删除掉不活动队列,默认为false

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="10000">
       <destinationPolicy>
              <policyMap>
                <policyEntries>
                     <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="30000" />
                </policyEntries>
              </policyMap>
       </destinationPolicy>
</broker>

Destination options (队列选项)

队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable。
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0。
3:consumer.noLocal :默认false。
4:consumer.dispatchAsync :是否异步分发 ,默认true。
5:consumer.retroactive:是否为回溯消费者 ,默认false。
6:consumer.selector:Jms的Selector,默认null。
7:consumer.exclusive:是否为独占消费者 ,默认false。
8:consumer.priority:设置消费者的优先级,默认0。

queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
consumer = session.createConsumer(queue);

Visual Destinations

前面也说到了两个虚拟主题,虚拟Destinations和组合Destinations
??ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:
??(1) 同一应用内consumer端负载均衡的问题:即同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。

(2)同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。
??Activemq可以实现虚拟的Topic来解决这两个问题。

使用虚拟主题:

??对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Mobile。示例:

Topic destination = session.createTopic("VirtualTopic.Mobille");

??对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

??如Consumer.A.VirtualTopic.Mobille,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Mobille说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此Topic,则可以实现上面两个功能。

??又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。代码示例如下:

public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建虚拟主题,加前缀VirtualTopic
        Topic topic = session.createTopic("VirtualTopic.TestTopic");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for (int i = 0; i < 30; i++) {
            TextMessage textMessage = session.createTextMessage("topic消息===" + i);
            producer.send(textMessage);
        }
        session.commit();
        connection.close();
    }
 public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Destination destination = session.createQueue("Consumer.A.VirtualTopic.TestTopic");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Consumer.A.接收到得消息:" + textMessage.getText());
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Destination destination = session.createQueue("Consumer.B.VirtualTopic.TestTopic");
        final MessageConsumer consumer = session.createConsumer(destination);
        final MessageConsumer messageConsumer = session.createConsumer(destination);
        //模拟多个consumer消费一个queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->consumer接收到消息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    messageConsumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->messageConsumer接收到消息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

在接收消息之前,应该先运行一下consumer客户端,将消费者注册到Broker中。
xml配置:
默认虚拟主题的前缀是: VirtualTopic.>
自定义消费虚拟地址默认格式: Consumer.*.VirtualTopic.>
自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

<destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name=">" prefix="自定义前缀.*." selectorAware="false"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
</destinationInterceptors>

Mirrored Queules(镜像队列)

??ActiveMQ中每个queue中的消息只能被一一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue来把消息转发到多个queues中。但是为系统中每个queue都进行如此的配置可能会很麻烦。
??ActiveMQ支持Mirrored Queues。 Broker 会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。 为了启用Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirrortopic的前缀, 缺省是:“VirtualTopic. Mirror."

<destinationInterceptors>
    <mirroredQueue copyMessage = "true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>

这样发送之后会自动存放到一个topic里面。需要定于那个topic就可以监听到消息了。

原文地址:https://www.cnblogs.com/yangk1996/p/11667699.html

时间: 2024-08-27 07:22:55

ActiveMQ学习第六篇:Destination的特性的相关文章

ActiveMQ学习笔记(六)——JMS消息类型

1.前言 ActiveMQ学习笔记(四)--通过ActiveMQ收发消息http://my.oschina.net/xiaoxishan/blog/380446 和ActiveMQ学习笔记(五)--使用Spring JMS收发消息http://my.oschina.net/xiaoxishan/blog/381209   中,发送和接受的消息类型都是TextMessage,即文本消息(如下面的代码所示).显然消息类型只有文本类型是不能满足要求的. //发送文本消息  session.create

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors ??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. ??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费

ActiveMQ(14):Destination高级特性

一.Wildcards Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展.ActiveMQ支持以下三种wildcards: 1:"." 用于作为路径上名字间的分隔符 2:"*" 用于匹配路径上的任何名字 3:">" 用于递归地匹配任何以这个名字开始的destination 示例,设想你有如下两个destinations PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价) P

[dart学习]第六篇:流程控制语句

经过前面的基础知识了解学习,我们今天可以进入语句模块啦. dart主要有以下流程控制语句: if-else for循环 while和do-while循环 break和continue switch-case assert 当然,你还可以使用 try-catch或throw (一)if-else dart的if(或者else if)的条件表达式必须为bool表达式,不能使用其他类型.dart的if-else用法与C语言类似,不再细述. int a = 6; if(a<0) { print("

ActiveMQ学习第五篇:ActiveMq伪集群学习

启动多实例 # 1.将conf文件夹复制一份 cp -r conf/ conf-1/ #主要是修改conf-1目录activemq.xml # 2.修改Broker名称 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost-1" dataDirectory="${activemq.data}"> #3.数据存储如果使用的是kahaDB,

ActiveMQ学习第八篇:Messaage

Messaage Properties: ??ActiveMQ支持很多消息属性,具体可以参考 http://activemq.apache.org/activemq-message-properties.html ??常见得一些属性说明: queue得消息默认是持久化得 消息得优先级默认是4. 消息发送时设置了时间戳. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略. 如果消息是重发的,将会被标记出来. JMSReplyTo标识响应消息发送到哪个queue. JM

activemq 学习系列(六) activemq 集群

activemq 集群 activemq 是可以通过 networkConnectors 来实现集群的. 首页准备多台 activemq 1:端口号:8161,服务端口号:61616 2:端口号:8162,服务端口号:61617 3:端口号:8163,服务端口号:61618 然后在任意一台的 /conf/activemq.xml 的 broker 节点中加入一个子节点 <networkConnectors> <networkConnector uri="static:(tcp:

python 学习 第六篇 mysql

安装mysql拓展 yum install python-devel pip install MySQL-python 2.在mysql中创建库 create database reboot10 default character set utf8; 3.创建表 create table users( id int AUTO_INCREMENT primary key ,name varchar(20) not null comment '用户名' ,name_cn varchar(50) no

IOC容器特性注入第六篇:利用MVC注入点,把容器启动

这里是利用MVC三个注入点中的:IDependencyResolver 进行注入 在全局类中Global.asax代码如下: #region MVC Inject System.Web.Mvc.DependencyResolver.SetResolver(new DaHua.Sites.DependencyResolve(DaHua.Common.Runtime.EngineContext.Current, System.Web.Mvc.DependencyResolver.Current));