ActiveMQ(22):Consumer高级特性之消息分组(Message Groups)

一、简介

Message Groups就是对消息分组,它是Exclusive Consumer功能的增强。

逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group。Message Groups特性保证所有具有相同JMSXGroupID的消息会被分发到相同的consumer(只要这个consumer保持active)。

另外一方面,Message Groups特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:

1:Consumer被关闭

2:Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1

二、操作

2.1 创建一个Message Groups

创建一个Message Groups,只需要在message对象上设置属性即可,如下:

message.setStringProperty("JMSXGroupID","GroupA");

2.2 关闭一个Message Groups

关闭一个Message Groups,只需要在message对象上设置属性即可,如下:

message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq", -1);

发送:

public void test4() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  Destination destination = session.createQueue("test-queue4");
  MessageProducer producer = session.createProducer(destination);
  for (int i = 0; i < 3; i++) {
      TextMessage message = session.createTextMessage("messageGroupA--" + i);
	message.setStringProperty("JMSXGroupID", "GroupA");
	producer.send(message);

	TextMessage message2 = session.createTextMessage("GroupB--" + i);
	message.setStringProperty("JMSXGroupID", "GroupB");
	producer.send(message2);
  }
  session.commit();
  session.close();
    connection.close();
}

接收:

public void test4() throws Exception {
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");

  Connection connection = cf.createConnection();
  connection.start();

  final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  Destination destination = session.createQueue("test-queue4");
  for (int i = 0; i < 2; i++) {
      MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(new MessageListener() {
        @Override
      public void onMessage(Message message) {
	      TextMessage msg = (TextMessage) message;
	    try {
	        System.out.println(consumer + "收到消息:" + msg.getText());
		  session.commit();
	    } catch (Exception e) {
	        e.printStackTrace();
	    }
	  }
    });
    }
}

效果:

时间: 2024-10-10 09:47:33

ActiveMQ(22):Consumer高级特性之消息分组(Message Groups)的相关文章

ActiveMQ(十二)——Consumer高级特性

一.独有消费者    Queue中的消息是按照顺序被分发到consumers的,然而,当有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个多线程并发的处理.有的时候,保证消息按照顺序处理是很重要的,但是你可能不希望在插入订单操作结束之前执行更新这个订单的操作.    ActiveMQ从4.x版本起开始支持Exclusive Consumer.Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证消息的

ActiveMQ(23):Consumer高级特性之Message dispatche async、Consumer Priority与Message Selectors

一.Message dispatche async(消息异步分发) 在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者.可以设置dispatchAsync 属性,默认是true,通常情况下这是最佳的. 修改:可以通过如下几种方式 1:在ConnectionFactory层设置 ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false); 2:在Connection上设置 这个设置将会

ActiveMQ(24):Consumer高级特性之Slow Consumer Handling(慢消费者的处理)

一.Prefetch机制 ActiveMQ通过Prefetch机制来提高性能,方式是在客户端的内存里可能会缓存一定数量的消息.缓存消息的数量由prefetch limit来控制.当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认,确认后的消息将会从缓存中去掉. 可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQ

Consumer高级特性

Queue队列的消息一般是按照顺序各个队列依次获取消息,每次获取一个.所以假设有两个队列queue1,queue2,发送的消息为1.2.3.4.5.则默认情况下queue1获取到的消息为1.3.5,queue2获取到的消息为2.4. 1.Exclusive Consume 用于队列消息 独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按

ActiveMQ(19):Consumer高级特性之独有消费者(Exclusive Consumer)

一.简介 Queue中的消息是按照顺序被分发到consumers的.然而,当你有多个consumers同时从相同的queue中提取消息时, 你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按照顺序处理是很重要的. 如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作. 二.使用 ActiveMQ从4.x版本起开始支持Exclusive Consumer. Broker会从多个consumers中挑选一个consumer来处理queue中 所有的消息,从而保证了消息

ActiveMQ(20):Consumer高级特性之重新投递(Redelivery Policy)

一.简介 ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息: 1:Client用了transactions,且在session中调用了rollback() 2:Client用了transactions,且在调用commit()之前关闭 3:Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover() 二.定制想要的再次传送策略 可以通过设置ActiveMQConnectionFactory和ActiveMQConnect

ActiveMQ(21):Consumer高级特性之管理持久化订阅(Manage Durable Subscribers)

消息的持久化,保证了消费者离线后,再次进入系统,不会错过消息,但是这也会消耗很 多的资源.从5.6开始,可以对持久化消息进行如下管理: 我们还可能希望删除那些不活动的订阅者,如下: <broker name="localhost"      offlineDurableSubscriberTimeout="600000"     offlineDurableSubscriberTaskSchedule="30000"> 1.offli

ActiveMQ中的Destination高级特性(一)

---------------------------------------------------------------------------------------- Destination高级特性----->Composite Destinations 组合队列Composite Destinations : 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发

ActiveMQ(14):Destination高级特性

一.Wildcards Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展.ActiveMQ支持以下三种wildcards: 1:"." 用于作为路径上名字间的分隔符 2:"*" 用于匹配路径上的任何名字 3:">" 用于递归地匹配任何以这个名字开始的destination 示例,设想你有如下两个destinations PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价) P