ActiveMQ配置策略

1.消息发送

1.异步发送

消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者,这个确认消息暗示生产者 broker已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中。这个过程通常称为同步发送。但有一个例外,当发送方法在一个事物上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。

同步发送持久消息能够提供更好的可靠性,但这潜在地影响了程序的相应速度,因为在接受到 broker 的确认消息之前应用程序或线程会被阻塞。如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。如果想启动异步传送可以把 connector uri 的 jms.useAsyncSend 选项设为 true,如下所示:

tcp://localhost:61616?jms.useAsyncSend=true

从 ActiveMQ 5 开始可以控制异步发送流。也就是说,在受到 broker 的确认应答之前,生产者能够传送消息给 broker 的最大信息量。即使是异步发送消息,生产者也是在收到 broker 的确认应答后才把下一条消息传送给 broker。当使用异步传送的时候,可以设置jms.producerWindowSize(单位为字节)属性,当生产者中等待发送的信息量到达设置的值时,即使没有收到 broker 的应答消息,生产者同样会把这些消息发给 broker。如下面的示例设置:

tcp://localhost:61616?jms.useAsyncSend=true&jms.producerWindowSize=1024000


org.apache.activemq.ActiveMQConnectionFactory中引入:

<property
name="useAsyncSend" value="true" />  <!-- false:同步 true:异步 -->

2. ActiveMQ讯息策略


只对Topic有效


DispatchPolcicy:
转发策略


RoundRobinDispatchPolicy


轮询


StrictOrderDispatchPolicy


严格有序


PriorityDispatchPolicy


基于“property”权重对“订阅者”排序


SimpleDispatchPolicy(默认值)


按照当前“订阅者”列表的顺序


SubscriptionRecoveryPolicy: 恢复策略


FixedSizedSubscriptionRecoveryPolicy


保存一定size的消息


FixedCountSubscriptionRecoveryPolicy


保存一定条数的消息


LastImageSubscriptionRecoveryPolicy


只保留最新的一条数据


QueryBasedSubscriptionRecoveryPolicy


符合置顶selector的消息都将被保存


TimedSubscriptionRecoveryPolicy


保留最近一段时间的消息


NoSubscriptionRecoveryPolicy(默认值)


关闭“恢复机制”


PendingMessageLimitStrategy:
消息限制策略(面向Slow Consumer)


ConstantPendingMessageLimitStrategy


保留固定条数的消息


PrefetchRatePendingMessageLimitStrategy


保留prefetchSize倍数条消息


MessageEvictionStrategy: 消息剔除策略(面向Slow Consumer)


OldestMessageEvictionStrategy(默认值)


移除旧消息


OldestMessageWithLowestPriorityEvictionStrategy


旧数据中权重较低的消息,将会被移除


UniquePropertyMessageEvictionStrategy


移除具有指定property的旧消息


PendingSubscriberMessageStoragePolicy: 待消息转存策略(针对非耐久)


vmCursor
(默认值)


将消息转存到基于内存


storeCursor


将消息转存到storeEngine中


fileCursor


将消息转存到临时文件中


PendingSubscriberMessageStoragePolicy: 待消息转存策略(针对耐久)


vmDurableCursor


将消息转存到基于内存


storeDurableSubscriberCursor


将消息转存到storeEngine中


fileDurableSubscriberCursor


将消息转存到临时文件中


只对Queue有效


PendingQueueMessageStoragePolicy:待消息转存策略


vmQueueCursor(默认值)


将消息转存到基于内存


storeCursor


将消息转存到storeEngine中


fileQueueCursor


将消息转存到临时文件中


对Topic和Queue都有效


DeadLetterStrategy:“死信”策略


IndividualDeadLetterStrategy


把DeadLetter放入各自的死信通道中


SharedDeadLetterStrategy(默认值)


将所有的DeadLetter保存在一个共享的队列中


DiscardingDeadLetterStrategy


broker将直接抛弃DeadLeatter


SlowConsumerStrategy:慢速消费者策略


AbortSlowConsumerStrategy


中断慢速消费者


AbortSlowConsumerStrategy


如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。

         

3. 消息重发策略

1.在使用事务的Session中,调用rollback()方法;

2.在使用事务的Session中,调用commit()方法之前就关闭了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了recover()方法。


<!--
重发策略 -->

<bean
id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">

<!--重发次数,默认为6次 这里设置为1次 -->

<property
name="maximumRedeliveries" value="4"></property>

<!--重发时间间隔,默认为1秒 -->

<property
name="initialRedeliveryDelay"
value="2000"></property>

</bean>

class="org.apache.activemq.ActiveMQConnectionFactory"中引入:

<property
name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"
/>

org.springframework.jms.listener.DefaultMessageListenerContainer中引入:

<property
name="sessionTransacted" value="true" />

4.消息持久化与非持久化

采用持久传输时,传输的消息会保存到磁盘中(messages
are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。

采用非持久传输时,发送的消息不会存储到磁盘中。

采用持久传输时,当Borker宕机 恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。比如,当生产者将消息投递给Broker后,Broker将该消息存储到磁盘中,在Broker将消息发送给Subscriber之前,Broker宕机了,如果采用持久传输,Broker重启后,从磁盘中读出消息再传递给Subscriber;如果采用非持久传输,这条消息就丢失了。


org.springframework.jms.core.JmsTemplate中引入:

<property
name="explicitQosEnabled" value="true" /> <!-- 是否启用下列配置 -->

<property
name="deliveryMode" value="1"/>  <!-- 1:非持久化2:持久化 -->

<property
name="deliveryPersistent" value="false" />   <!-- true:持久化false:非持久化 -->

其中deliveryPersistent写不写均可

5.死信队列

DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息.缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。默认情况下持久消息过期,会被送到DLQ,非持久消息不会送到DLQ

消息进入死信队列条件:

1.给ActiveMQConnectionFactory配上重发机制;

2.给DefaultMessageListenerContainer配置事务;


在activemq.xml的policyEntries节点中增加如下策略配置。

<policyEntry
queue=">">

<deadLetterStrategy>

<individualDeadLetterStrategy
useQueueForQueueMessages="true"
processNonPersistent="true"

queuePrefix="DLQ." processExpired="true"/>

</deadLetterStrategy>

</policyEntry>

useQueueForQueueMessages:true队列;false主题

processNonPersistent:true非持久化的消息放入死信队列

processExpired:true过期消息放入死信队列

queuePrefix:队列名称前缀, DLQ.+队列名称

6. Consumer特性详解与优化


asyncDispatch(默认为true)


broker端是否允许使用异步转发


alwaysSessionAsync(默认为true)


客户端session是否使用异步转发


maxThreadPoolSize

(默认Integer.MAX_VALUE)


Session异步转发线程池大小,Connection下所有session共享


useDedicatedTaskRunner(默认为false)


为每个session设置单独的线程池


messagePrioritySupported(默认为true)


Consumer是否开启消息权重,即是否支持消息优先级的属性设置


Priority(默认为0)


消费者权重,broker会优先级把消息转发给权重更大的Consumer


prefetchSize(默认为1000)


Broker批量发送prefetchSize条消息给consumer


noLocal(默认为false)


consumer是否接受本地消息,本地消息,就是为同一个connection创建的producer发送的消息


exclusiveConsumer

(仅对Queue有效, 默认值为false)


如果Queue中,有任意一个Consumer是“排他的”,那么消息只会转发给“exclusiveConsumer=true”的消费者;如果全部的消费者都是“排他的”,那么最新创建的consumer将会获取消息


allConsumersExclusiveByDefault


如果此参数为true,那么当前Queue中的所有消费者默认为exclusive


Retroactive

(仅对Topic有效, 默认值为false)


retroactive类型的订阅者可以获取这些原本不属于自己但broker上还保存的旧消息, 如果此订阅者不是durable(耐久的),它可以获取最近发布的一些消息;如果是durable,它可以获取存储器中尚未删除的所有的旧消息


selector (选择器)


Broker通过selector在筛选满足条件的Consumer, 如果你使用了selector,你一定要让全局中所有的selector覆盖所有的消息,或者至少有一个没有selector的consumer


Group(消息分组)(仅对Queue有效)


Producer在发送消息时为多条消息设定group,它们将会被同一个consumer消费


Duplicate(重复消息)


consumer在将消息消费之前,都会检测消息是否重复,对于重复消息,将不会消费而是直接发送poisonAck(毒丸),broker端会将消息直接删除。

7.慢速消费者(Slow
Consumer)

慢速,是相对于producer而言;简单来说,producer不断产生新的消息,broker端在内存中已经积压的足够多(比如cacheLimit已满),但是在转发给某个consumer时,发现此consumer仍然有大量的消息尚没有消费(ACK),broker会认为此consumer是慢速的。

在Queue中,如果已发送(dispatched)但没有消费(unAck)的消息条数 > prefetchSize时,此consumer被标记为Slow。

在Topic中,如果cacheLimit已满,但是需要向此订阅者发送的消息量 >
prefetchSize时,此订阅者被标记为Slow。

简单描述为: 快速的producer生产的消息,不能被消费者及时的消费,而导致在broker端积压。

解决方案:

1) 关闭Slow Consumer

brorker端一旦发现slow consumer,就将它注册到慢速消费者列表中,此后将有额外的线程扫描并关闭它们,其中abortConnection表示,是否关闭底层的transport,默认为false,此时将会通过transport向client端发送一个指令(其中包括consumerId),当client端(Session)接收之后,将会调用consumer.close()方法;如果此值为true,将会导致底层的transport链接被关闭,这是很粗暴的办法,不过如果client端多个consumer共享一个connection的话,会导致所有的consumer被关闭,还是那句话:猪一样的队友,害了整个团队。


<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">

<slowConsumerStrategy>

<abortSlowConsumerStrategy abortConnection="false"/>

</slowConsumerStrategy>

</policyEntry>

2) 抛弃旧消息(仅对Topic有效,仅对nondurable订阅者有效)

topic支持支持3种移除策略:

(1) oldestMessageEvictionStrategy表示移除最旧的消息

(2) uniquePropertyMessageEvictionStrategy表示移除根据属性值筛选消息并移除最旧的

(3)
OldestMessageWithLowestPriorityEvictionStrategy表示在旧消息中移除权重最低的。

3) 写入临时文件

       对于Queue而言,支持storeCursor,vmQueueCursor , fileQueueCursor。其中storeCursor是一个“综合”策略,持久化消息使用fileQueueCurosr支持,非持久化消息使用vmQueueCursor支持。vmQueueCursor基于内存,fileQueueCursor表示将数据写入本地临时文件(由tempDataStore决定)。

对于Topic而言,我们也可以根据订阅者的类型,来决定如果处理那些滞留的非持久化消息。

4) offlineDurableSubscriberTimeout

对于durable订阅者,如果订阅者“离线”,那么Broker将会一直保存属于它的消息,因此消息也会以为它而不能删除,导致积压。通常,订阅者离线的时间是无法预估的,有可能此订阅者永远都不会再上线(可能因为durable订阅者本来应该cancel,但是开发者却忘记了),这对broker来说是致命的。我们需要限制durable订阅者离线的时间,如果超过时间,那么订阅者将会被移除,消息也会因此而删除。


<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">

8. Activemq消息确认机制

optimizeAck表示是否开启“优化ACK”, 只有在为true的情况下,prefetchSize(下文中将会简写成prefetch)以及optimizeAcknowledgeTimeout参数才会有意义, "optimizeAcknowledgeTimeout"选项只能在brokerUrl中配置, 在destinationUri中指定prefetchSize(预获取)选项,
其中brokerUrl参数选项是全局的,如果同时指定,brokerUrl中的参数选项值将会被覆盖;

当consumer端使用MessageListener异步获取消息时: prefetch>=1

当consumer端使用receive()方法同步获取消息时: prefetch>=0

prefetch=0, receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止;

prefetch>0, broker端将会批量push给client 一定数量的消息(<=
prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中;

原文地址:https://www.cnblogs.com/jinloooong/p/9609420.html

时间: 2024-08-29 02:57:20

ActiveMQ配置策略的相关文章

BGinfo配置策略

策略:计算机配置-策略-Windows设置-脚本-启动 脚本:井号内为脚本内容 ############################################################ @echo off Del C:\Users\%username%\AppData\Local\Temp /Q net use z: \\hbgslz.com\netlogon\bginfo z: bginfo.exe bginfo.bgi /timer:00 /nolicprompt net u

SRX 配置策略与IDS日志

SRX配置策略日志set system syslog file policy_session user infoset system syslog file policy_session match RT_FLOW (关键字)set system syslog file policy_session archive size 1000kset system syslog file policy_session archive world-readableset system syslog fil

netsh导入导出IPSec配置策略

C:\Users\XYY>netsh ? //首先查看netsh 用法: netsh [-a AliasFile] [-c Context] [-r RemoteMachine] [-u [DomainName\]UserName] [-p Password | *] [Command | -f ScriptFile] 下列指令有效: 此上下文中的命令:? - 显示命令列表.add - 在项目列表上添加一个配置项目.advfirewall - 更改到 `netsh advfirewall' 上下

Spring集成ActiveMQ配置 --转

转自:http://suhuanzheng7784877.iteye.com/blog/969865 集成环境 Spring采用2.5.6版本,ActiveMQ使用的是5.4.2,从apache站点可以下载.本文是将Spring集成ActiveMQ来发送和接收JMS消息. 集成步骤 将下载的ActiveMQ解压缩后文件夹如下 activemq-all-5.4.2.jar是activemq的所有的类jar包.lib下面是模块分解后的jar包.将lib下面的 Java代码 /lib/activati

ActiveMQ配置

1.连接BrokerURI failover://(tcp://localhost:61613,tcp://localhost:61612) 2.Master-slave配置 ActiveMQ的主备有三种方式:纯Master/Slave.文件共享方式.数据库共享方式. 3.Load Balance #activemq.xml <networkConnectors>             <networkConnector uri="static:(tcp://localhos

ActiveMQ 配置jdbc主从

使用 jdbc 方式配置主从模式,持久化消息存放在数据库中. 在同一时刻,只有一个 master broker,master 接受客户端的连接,slave 不接受连接.当 master 因为关机而下线后,其中一个 slave 会提升为 master,然后接受客户端连接.但原来 master 的非持久消息丢失了,而持久消息保存在数据库中. broker xml 配置:使用 MySQL 数据源 <!-- Licensed to the Apache Software Foundation (ASF)

记录activemq 配置mysql 持久化入得坑

首先重点说明一下,百度找解决问题方法的时候一定要好好先看看,结合自己的问题一点点的改正,切忌全盘接受,然后一定要备份自己的代码,做到可以及时撤销更改 首先第一个坑我是避开了,习惯于用最新版本,所有我用的activemq是515,jdk是1.8,避免了版本冲突问题, 若activemq用的是10以上则必须用jdk1.8,. 第二个问题,完美入坑,重点记录,以下是database配置 <persistenceAdapter> <jdbcPersistenceAdapter dataSourc

Linux防火墙简介 – iptables配置策略

Netfilter/iptables简介 ????要想真正掌握Linux防火墙体系,首先要搞清楚Netfilter和iptables的关系,Netfilter和iptables包含在Linux2.4以后的内核中,可实现防火墙.NAT和数据包分割的功能.Netfilter采用模块化设计,具有良好的可扩展性.Netfilter是一个框架,iptables则是我们用户层的工具,通过iptables我们可以配置很多规则,这些规则加载到Netfilter框架中生效. ????Netfilter可以和协议栈

nginx负载均衡配置策略

Nginx的upstream目前支持的以下几种方式的分配,公司常用的有 weight,fair, bakcup1 轮询(默认)每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉,能自动剔除. 例如:upstream bakend {server 192.168.0.88 ;server 192.168.0.89;}2 weight指定轮询几率,weight和访问比率成正比,用于后端服务器性能不均的情况.例如:upstream bakend {server 192.168.0.