ActiveMQ学习第九篇:Consumer

Exclusive Consumer:

??独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。例如:你可能不希望在插入订单操作结束之前执行更新这个订单的操作。
  ActiveMQ从4.x版本开始支持Exclusive Consumer。Broker会从多个Consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其他的consumer。可以通过destination options来创建一个Exclusive Consumer,如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

还可以给consumer设置优先级,以便针对网络情况进行优化,如下:

queue = new  ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

Consumer Dispatche Async:

??在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者。可以设置dispatchAsync属性,默认是true,通常情况下这是最佳的。
你也可以通过如下几种方式修改:

  1. 在ConnectionFactory层设置
 ActiveMQConnectionFactory.setDispatchAsync(false);
  1. 在Conection上设置,这个设置将会覆盖ConnectionFactory上的设置
 ActiveMQConnetion.setDispatchAsync(false);
  1. 在Consumer上设置

      queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
      consumer = session.createConsumer(queue);

    Consumer Priority:

    ??JMS JMSPriority定义了十个消息优先级值,0是最低优先级,9是最高优先级,另外,客户端应当将0-4看作普通优先级,5-9看作加急优先级。
    自定义Consumer Priority优先级。配置如下:

      queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
      consumer = session.createConsumer(queue);

    ??Consumer的Priority的划分为0~127个级别,127是最高的级别,0是最低的也是ActiveMQ默认的。这种配置可以让Broker根据consumer的优先级来发送消息到较高的优先级的Consumer上,如果某个较高的Consumer的消息转载慢,则Broker会把消息发送到仅次于它优先级的Consumer上。

    Manage Durable Subscribers:

    ??消息持久化,保证了消费者离线之后,再次进入系统,不会错过消息,但是这也会消耗很多的资源,从5.6开始,可以对持久化进行如下管理:
     Removing inactive subscribers:我们还希望可以删除那些不活动的订阅者,如下:

    <broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
  2. offlineDurableSubscriberTimeout:离线多长时间就过期删除,缺省是-1,就是不删除。
  3. offlineDurableSubscriberTaskSchedule: 多长时间检查一次,缺省300000,单位毫秒。

    Message Groups:

    ??Message Goups就是对消息分组,它是Exclusive Consumer功能的增强。
      逻辑上Message Groups可以看成是一种并发的Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS消息属性的JMSXGroupID用来区分message group.
      Message Group特性保证所有具有相同JMSXGroupID的消息 都会被分发到相同的consumer(只要这个consumer保持在线)。
      另一方面,Message Groups特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group.如果没有,那么broker会选择一个consumer,并将它关联到这个message group.此后,这个consumer会接收到这个message group的所有消息,直到:

  4. consumer被关闭
  5. Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1
    创建一个Message Groups,只需要在message对象上设置属性即可,如下:
 message.setStringProperty("JMSXGroupID","GroupA");

关闭一个Message Groups,只需要在message对象上设置属性即可,如下:

  message.setStringProperty("JMSXGroupID","GroupA");
  message.setIntProperty("JMSXGroupSeq",-1);

Message Selectors:

??JMS Selectors 用在获取消息的时候,可以基于消息属性和Xpath语法对消息进行过滤。JMS Selectors有SQL92语义定义。以下是个Selectors的例子:

 consumer = session.createConsumer(destination, "JMSType='car' AND weight > 2500");
  1. JMS Selectors表达式中,可以使用IN, NOT IN, LIKE等
  2. 需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的Long型毫秒值。
  3. 表达式中的属性不会自动进行类型转换,例如:
    myMessage.setStringProperty("NumberOfOrders","2");
    那么此时“NumberOfOrders  > 1” 的结果就是会false
  4. Message Groups虽然可以保证具有相同的message group的消息会被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理,在某些情况下,Message Groups可以和JMS Selector一起工作。
    ??例如:设想三个consumers分别是A,B,C,你可以在producer中为消息设置三个message groups分别为“A","B","C"。然后令Consumer A使用JMSXGroupID=‘A‘作为selector,c和b也同理,这样就保证了message group A的消息只会被A处理,需要注意的是,这种做法有以下缺点:
      (1) producer必须直到当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
      (2) 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

    Redelivery Policy:

    ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息:

  5. Client用了transactions,且在Session中调用了rollback();
  6. Client用了transactions,且在调用commit()之前关闭。
  7. Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover();
      可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略,可用的Redelivery属性如下:
      1). collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值是0.15
      2). maximumRedeliveries:最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为
      3) . maximumRedeliveryDelay:传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1.
      4). initialRedeliveryDelay:初始重发延迟时间,默认1000L
      5). redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L
      6). useCollisionAvoidance:启用防止冲突功能,默认false
      7). useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false
      8). backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效,默认是5;
    在接收的client可以如下设置:
  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randowize=false");
  RedeliveryPolicy policy = new RedeliveryPolicy();
  policy.setMaximumRedeliveries(3);
  cf.setRedeliveryPolicy(policy);

??当消息试图被传递的次数超多配置中的maximumRedeliveries属性的值时,那么,broker会认定该消息是一个死消息,并会把该消息发送到死队列中。默认activeMQ中死队列被声明为”ActiveMQ.DLQ",所有不能消费的消息都被传递到该死队列中。你可以在activemq.xml中配置individualDeadLetterStrategy属性,示例如下:

<policyEntry queue=">">
    <dealLetterStrategy>
        <individualDeadLetterStategy queuePrefix="DLQ."
            useQueueForQueueMessage="true"/>
    </dealLetterStrategy>
</policyEntry>

? 自动删除过期消息:有时需要直接删除过期的消息而不需要发送到死队列中,可以使用属性processExpired=false来设置,示例如下:

<policyEntry queue=">">
    <dealLetterStrategy>
        <sharedDeadLetterStategy processExpired="false"/>
    </dealLetterStrategy>
</policyEntry>

??存放非持久消息到死队列中:默认情况下,ActiveMQ不会把非持久的死消息发送到死队列中。如果你想非持久的消息 发送到死队列中,需要设置属性processNonPersistent="true",示例如下:

<policyEntry queue=">">
    <dealLetterStrategy>
        <sharedDeadLetterStategy processNonPersistent="true"/>
    </dealLetterStrategy>
</policyEntry>

??RedeliveryPolicy per Destination:在5.7之后,你可以为每一个Destination配置一个Redelivery Policy,示例如:

ActiveMQConnection connection ...  // Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);

RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);

// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);

Slow Consumer Handling:

??Prefetch机制:ActiveMQ通过Prefetch机制来提供性能,方式是在客户端得内存里可能缓存一定数量得消息。缓存消息得数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,知道consumer像broker发送消息的确认,确认后的消息将会从缓存中去掉。
  可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或destination options来配置。例如:

  tcp://localhost:61616?jms.prefetchPolicy.all=50
  tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

  或

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue)

等方式配置
  prefetch size缺省的值如下:

  • persistent queues (default value: 1000)
  • non-persistent queues (default value: 1000)
  • persistent topics (default value: 100)
  • non-persistent topics (default value: Short.MAX_VALUE - 1)

慢Consumer处理
  慢消费者会在非持久的topics上导致问题,一旦消息积压起来,会导致broker把大量消息保存到内存中,broker也会因此而变慢,目前,ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限之后,新消息到来时会丢弃旧的消息。
  通过在配置文件的destination map中配置pendingMessageLimitStrategy,可以为不同的topic message配置不同的策略。
  Pending Message Limit Strategy(等待消息限制策略),目前有以下两种“

  1. Constant Pending Message Limit Strategy
      Limit 可以设置0, > 0, -1三种方式:0表示:不额外的增加其预存大小,> 0表示:在额外的增加其预存大小,-1表示:不增加预存也不丢弃旧的消息,这个策略使用常量限制,配置如下:
 <constantPendingMessageLimitStrategy limit="50"/>
  1. Prefetch Rate Pending Message LimitStrategy
    这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
  1. 说明:在以上两种方式中,如果设置了0,意味着除了prefetch之外不再缓存消息,如果设置了-1意味着禁止丢弃消息。
      配置消息的丢弃策略,目前有三种方式:
      oldestMessageEvictionStrategy:这个策略丢弃最旧的消息。
      oldestMessageWithLowestPriorityEvictionStrategy: 这个策略丢弃最旧的,而且具有最低优先级的消息。  
      uniquePropertyMessageEvictionStrategy:从5.6开始,可以根据自定义的属性来进行抛弃,比如 <uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>表示要抛弃属性名称为Stock的消息。
      配置方式:
      

原文地址:https://www.cnblogs.com/yangk1996/p/11667705.html

时间: 2024-08-09 19:46:45

ActiveMQ学习第九篇:Consumer的相关文章

OpenCV学习第九篇:图像模糊二

1.中值滤波 2.双边滤波 中值滤波 统计排序滤波器 中值对椒盐噪声有很好的抑制作用 中值滤波的意思是绿色框框区域内的像素点的值,从小到大排序,去中间的值给正中间的值,也就是用124替换150,最大值和最小值滤波也是同样的道理,用最大值或者最小值替换中间的值! 双边滤波 均值模糊无法克服边缘像素信息丢失缺陷,原因是均值滤波是基于平均权重 高斯模糊部分克服了该缺陷,但是无法完全避免,应为没有考虑像素值的不同 高斯双边模糊-是边缘保留的滤波方法,避免了边缘信息丢失,保留了图像轮廓不变 API: 中值

学习java随笔第九篇:java异常处理

在java中的异常处理和c#中的异常处理是一样的都是用try-catch语句. 基本语法如下 try { //此处是可能出现异常的代码 } catch(Exception e) { //此处是如果发生异常的处理代码 } finally语句 try { //此处是可能出现异常的代码 } catch(Exception e) { //此处是如果发生异常的处理代码 } finally { //此处是肯定被执行的代码 } 抛出异常 使用thows和throw语句抛出异常 public static vo

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors ??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. ??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

第九篇 Integration Services:控制流任务错误

本篇文章是Integration Services系列的第九篇,详细内容请参考原文. 简介在前面三篇文章,我们创建了一个新的SSIS包,学习了脚本任务和优先约束,并检查包的MaxConcurrentExecutables属性.我们检查.演示并测试优先约束赋值为"成功"."完成"."失败"时对工作流的影响.我们学习了SSIS变量和表达式,并将它们应用到优先约束.这一篇,we introduce fault tolerance by examinin

第九篇 SQL Server代理了解作业和安全

本篇文章是SQL Server代理系列的第九篇,详细内容请参考原文 在这系列的前几篇,学习了如何在SQL Server代理作业步骤启动外部程序.你可以使用过时的ActiveX系统,运行批处理命令脚本,甚至自己的程序.你最好的选择是使用PowerShell的子系统运行PowerShell脚本.PowerShell脚本将允许你处理几乎所有方面的Windows和SQL Server问题.在这一篇,你会深入到SQL Server代理安全.安全是个令人困惑的话题,它值得一些明确的考虑.这系列有两个不同方面

[老老实实学WCF] 第九篇 消息通信模式(上) 请求应答与单向

老老实实学WCF 第九篇 消息通信模式(上) 请求应答与单向 通过前两篇的学习,我们了解了服务模型的一些特性如会话和实例化,今天我们来进一步学习服务模型的另一个重要特性:消息通信模式. WCF的服务端与客户端在通信时有三种模式:单向模式.请求/应答模式和双工模式. 如果选用了单向模式,调用方在向被调用方进行了调用后不期待任何回应,被调用方在执行完调用后不给调用方任何反馈.如客户端通过单向模式调用了一个服务端的操作后,就去干别的了,不会等待服务端给他任何响应,他也无从得知调用是否成功,甚至连发生了

ActiveMQ学习笔记(五)——使用Spring JMS收发消息

ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试. 本例中,消息的收发都写在了一个工程里. 1.使用maven管理依赖包 <dependencies> <depend

第九篇 Replication:复制监视器

本篇文章是SQL Server Replication系列的第九篇,详细内容请参考原文. 复制监视器允许你查看复制配置组件的健康状况.这一篇假设你遵循前八篇,并且你已经有一个合并发布和事务发布.启动复制监控器复制监视器不是SSMS的一部分,它是一个独立的可执行文件(SqlMonitor.exe).在一个标准的SQL Server安装中开始菜单下找不到复制监视器.启动复制监视器的最简单方法是:SSMS对象资源管理器下连接到发布服务器,右击你的发布选择"启动复制监视器",如图9.1所示:图