ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors

??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作。这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态。
??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费者,当消费者处于不活跃状态下,切换使用Cursors来处理消息的发送。当消息消费者处于活跃状态并且处理能力比较强时,被持久存储的消息直接被发送到与消费者关联的发送队列,如下图
??当消息已经出现积压,消费者再开始活跃;或者消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,并发送与消费者关联的发送队列。见下图
Store- -based:
??从activemq5.0开始,默认使用此种类型的cursor,其能够满足大多数场景的使用要求。同时支持非持久消息的处理,Store-based内 嵌了File-based的模式,非持久消息直接被Non-Persistent Pendi ng Cursor 所处理。工作模式见下图

VM:
??相关的消息引用存储在内存中,当满足条件时,消息直接被发送到消费者与之相关的发送队列,处理速度非常快,但出现慢消费者或者消费者长时间处于不活跃状态的情况下,无法适应。工作模式见下图:

File-based:
??当内存设置达到设置的限制,消息被存储到磁盘中的临时文件中。工作模式见下图:
??在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
对Topic subscripbers

<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <deadLetterStrategy>
              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
            </deadLetterStrategy>
            <pendingSubscriberPolicy>
                <vmCursor />
            </pendingSubscriberPolicy>
            <pendingDurableSubscriberPolicy>
                <vmDurableCursor/>
            </pendingDurableSubscriberPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>

对于queue,有效的类型是storeCursor,vmQueueCursor和fil eQueueCursor

<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="org.apache.>">
            <deadLetterStrategy>
              <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
            </deadLetterStrategy>
            <pendingQueuePolicy>
                <vmQueueCursor />
            </pendingQueuePolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
 </destinationPolicy>

Asvnc Sends(消息异步发送)

??AciveMQ支持异步和同步发送消息,是可以配置的。通常对于快的消费者,是直接把消息同步发送过去,但对于一个Slow Consumer, 你使用同步发送消息可能出现Producer堵塞等现象,慢消费者适合使用异步发送。

异步发送的配置方式有以下几种:

  1. ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false。
  2. 在Connection URI级别来配置使用Async Sen
  cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
  1. 在ConnectionFactory级别来配置使用Async Send
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
  1. 在Connection级别来配置使用Async Send
((ActiveMQConnection)connection).setUseAsyncSend(true);

消息确认:

??ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:

  1. 在Connection URI 上禁止启用Optimized Acknowledgements。
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
  1. 在ConnectionFactory 上禁止启用Optimized Acknowledgements。
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
  1. 在Connection上禁止启用Optimized Acknowledgements。
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

Dispatch Policies(分发策略):

??通常ActiveMQ会保证topic consumer 以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer 以相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer 可能会以不同的顺序接收来自不同producer的消息。


使用两个生产者和一个消费者。可以看到顺序是正常的,下面测试两个生产者使用不同session的多个消费者接受消息。

消费者

  public static void main(String[] args) {
        ConnectionFactory cf=new ActiveMQConnectionFactory("tcp://192.168.100.155:61616");
        for(int i =0;i<2;i++){
            Thread t=nw MyT(cf);
            t.start();
        }
    }


这样就可以看到顺序已经乱了。想要按照顺序分发需要在xml配置,但是会损失性能

<policyEntry topic="ORDERS.>">
    <dispatchPolicy>
        <strictOrderDispatchPolicy/>
     </dispatchPolicy>
</policyEntry>   


这样再次测试就可以看到顺序了。queue的配置

<policyEntry queue=">" stricOrderDispatch="false"/>

轮训分发策略:

??ActiveMQ的prefetch缺省参数,是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大,而且缺省的dispatche policies会尝试尽可能快的填满缓冲。
??然而有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个边的consumer上,这样就会因为负载的不均衡而导致处理时间的增加。
??Round Robin Dispatch Policy会尝试平均分发消息,以下是一个例子:

<policyEntry topic="FOO.>">
    <dispatchPolicy>
        <roundRobinDispatchPolicy/>
    </dispatchPolicy>
</policyEntry>   

Optimized Acknowledgetment

??ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能,如果希望在应用程序中禁止经过优化的确认方式,可以采用以下几种方式:

  1. 在Connection的URI上启用Optimized Acknowledgements
ActiveMQConnectionFactory  factory = new  ActiveMQConnectionFactory("tcp://localhost:61616?jms.optimizedAcknowledge=true");
  1. 在ActiveMQConnectionFacrory上启用Optimized Acknowledgements
 factory.setOptimizedAcknowledge(true);
  1. 在Connection上启用Optimized Acknowledgements
 ActiveMQConnection.setOptimizedAcknowledge();
  1. 在5.6 以后的版本,还可以在Connection URI上设置setOptimizedAcknowledgeTimeOut参数,默认值为300ms,你还可以设置自己要用的值,0表示禁用。

    Producer Flow Control(生产者流量控制)

    ??流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞知道资源可以继续使用,或者抛出一个JMSException,可以通过来配置。
    ??同步发送消息的producer会自动使用producer flow control;对于异步发送消息的producer,要使用producer flow control,你先要为connection配置一个ProducerWindowSize参数,如下:

 ActiveMQConnectionFactory.setProducerWindowSize(1024000);

??ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前,能够发送消息的最大字节数。
??可以禁用producer flow control,以下是ActiveMQ配置文件的一个例子。

<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue=">" producerFlowControl="false"/>
        </policyEntries>
            </policyMap>
        </destinationPolicy>

??注意:自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。
??结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放到内存中,并在达到内存限制的时候停掉生产者,你需要配置,示例如下:

<policyEntry queue">" producerFolwControl="true" memoryLimit="10b">
    <pendingQueuePolicy>
        <vmQueueCursor>
    </pendingQueuePolicy>
 </policyEntry >

??上面的配置可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为10kb.也就是相当于如果发送的消息超过10kb之后,后面的消息就无法发送了。如果存在事务的情况下,比如一次性发送的条数超过限制之后才commit这样也是会阻塞的。
??配置客户端异常:为了应对代理空间不足,而导致的不确定的阻塞send()方法的一种替代方案,就是将其配置的成客户端抛出的一个异常,通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端,小面是一个配置例子:

<systemUsage>
            <systemUsage sendFailIfNoSpace="true">
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

??这么配置的好处是:客户端可以捕获javax.jms.ResourceAllocationException异常,稍等一下,并重试send()操作,而不是无限期的傻等下去。
??从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性才被加进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待了指定时间之后才触发。如果配置的等待时间过去之后,代理上的空间仍然没有释放,仅当这个时候send()方法才会失败,并且在客户端抛出异常。示例:

 <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

 可以通过元素的一些属性来减慢生产者,如下例子:

<systemUsage>
            <systemUsage sendFailIfNoSpace="true">
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

??可以为非持久化的消息设置内存限制,为持久化消息设置磁盘空间,以及为临时消息设置总的空间,broker将在减慢生产者之前使用这些空间。具体上述的默认设置,broker将会一直阻塞send()方法的调用,直至一些消息被消费,有了可用的空间。

原文地址:https://www.cnblogs.com/yangk1996/p/11667701.html

时间: 2024-10-08 11:40:51

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性的相关文章

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

一.分发策略(Dispatch Policies) 1.1 严格顺序分发策略(Strict Order Dispatch Policy) 通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以 相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息. Strict order dispatch policy会保证每

ActiveMQ(15):Message Dispatch的消息游标与异步发送

一.消息游标 1.1 简介 ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者, 在发送完一个批次的消息后,指针的标记位置指向下一批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮 和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. 因此,从ActiveMQ5.0.0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发 送系统直接把持久消息发送给消费者,当消费

ActiveMQ学习笔记(一)--认识消息JMS

1.面向消息的中间件 1.1 什么是MOM 面向消息的中间件,Message Oriented Middleware,简称MOM,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成.一个MOM系统,通常会包括客户端(Client).消息(Message)和MOM,客户端是发送或者接收消息的应用程序,消息封装了要传递的内容,MOM可以存储和转发消息. 1.2 MOM的好处 降低系统间的通信复杂度.有了MOM,系统间通信可以跨编程语言.不用考虑复杂的网络编程,只需

ActiveMQ(17):Message之消息属性与自身的系统消息地址

一.消息属性 ActiveMQ支持很多消息属性,具体可以参见 http://activemq.apache.org/activemq-message-properties.html 常见的一些属性说明 1:Queue的消息默认是持久化的 2:消息的优先级默认是4 3:消息发送时设置了时间戳 4:消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略 5:如果消息时重发的,将会标记出来 6:JMSReplyTo标识响应消息发送到哪个Queue 7:JMSCorelation

ActiveMQ学习第六篇:Destination的特性

Wildcards(通配符) Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展. ??ActiveMQ支持以下三种通配符: ".":用于作为路径上名字间的分隔符 ">":用于递归的匹配任何以这个名字开始的Destination(目的地) "*":用于作为路径上任何名字. ??举例来说,如有以下两个Destination: ??PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格) ?

ActiveMQ学习第八篇:Messaage

Messaage Properties: ??ActiveMQ支持很多消息属性,具体可以参考 http://activemq.apache.org/activemq-message-properties.html ??常见得一些属性说明: queue得消息默认是持久化得 消息得优先级默认是4. 消息发送时设置了时间戳. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略. 如果消息是重发的,将会被标记出来. JMSReplyTo标识响应消息发送到哪个queue. JM

activemq学习总结 (转)Java消息队列--ActiveMq 实战

转:https://www.cnblogs.com/jaycekon/p/6225058.html 感谢作者 ActiveMQ官网下载地址:http://activemq.apache.org/download.html ActiveMQ 提供了Windows 和Linux.Unix 等几个版本,楼主这里选择了Linux 版本下进行开发. 下载完安装包,解压之后的目录: 从它的目录来说,还是很简单的: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的

ActiveMQ学习第五篇:ActiveMq伪集群学习

启动多实例 # 1.将conf文件夹复制一份 cp -r conf/ conf-1/ #主要是修改conf-1目录activemq.xml # 2.修改Broker名称 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost-1" dataDirectory="${activemq.data}"> #3.数据存储如果使用的是kahaDB,

R语言学习 第七篇:列表

列表(List)是R中最复杂的数据类型,一般来说,列表是数据对象的有序集合,但是,列表的各个元素(item)的数据类型可以不同,每个元素的长度可以不同,是R中最灵活的数据类型.列表项可以是列表类型,因此,列表被认为是递归变量,与之相对,向量,数组,矩阵,数据框被认为是原子变量. 一,创建列表 列表由list()函数创建,每个参数使用逗号分割,用于指定列表项的内容,列表中的元素数量不限,类型不限.列表项的名称是可选的,可以创建无名的列表项:列表的每一个item,都可以显示命名,例如,name1=i