ActiveMQ(15):Message Dispatch的消息游标与异步发送

一、消息游标

1.1 简介

ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,

在发送完一个批次的消息后,指针的标记位置指向下一批次待发送消息的位置,进行后续的发送操作。这是一种比较健壮

和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态。

因此,从ActiveMQ5.0.0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发

送系统直接把持久消息发送给消费者,当消费者处于不活跃状态下,切换使用Cursors(游标)来处理消息的发送。

当消息消费者处于活跃状态并且处理能力比较强时,被持久存储的消息直接被发送到与消费者关联的发送队列,见下图:

当消息已经出现积压,消费者再开始活跃;或者消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,

并发送与消费者关联的发送队列。见下图:

1.2 分类

Message Cursors分成三种类型:

1: Store-based

2:VM

3:File-based

1.2.1 Store-based

从activemq5.0开始,默认使用此种类型的cursor,其能够满足大多数场景的使用要求。同时支持非持久消息的处理,Store-based内

嵌了File-based的模式,非持久消息直接被Non-Persistent Pending Cursor所处理。工作模式见下图:

1.2.2 VM

相关的消息引用存储在内存中,当满足条件时,消息直接被发送到消费者与之相关的发送队列,处理速度非常快,但出现慢消费者或者

消费者长时间处于不活跃状态的情况下,无法适应。工作模式见下图 :

1.2.3 File-based

当内存设置达到设置的限制,消息被存储到磁盘中的临时文件中。工作模式见下图 :

1.3 配置

在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配

置Message Cursors,例如:

1.3.1 对Topic subscribers

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
                <deadLetterStrategy>
                    <individualDeadLetterStrategy topicPrefix="Test.DLQ." />
                </deadLetterStrategy>
                <pendingSubscriberPolicy>
                    <vmCursor />
                </pendingSubscriberPolicy>
                <pendingDurableSubscriberPolicy>
                    <vmDurableCursor/>
                </pendingDurableSubscriberPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

配置说明:

有效的Subscriber类型是vmCursor和fileCursor,缺省是store based cursor。有效的持久化Subscriber的cursor types是

storeDurableSubscriberCursor, vmDurableCursor 和fileDurableSubscriberCursor,缺省是store based cursor。

1.3.2 对于Queues的配置

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue="org.apache.>">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
                </deadLetterStrategy>
                <pendingQueuePolicy>
                    <vmQueueCursor />
                </pendingQueuePolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

配置说明:有效的类型是storeCursor, vmQueueCursor 和 fileQueueCursor

二、异步发送

2.1 简介

AciveMQ支持异步和同步发送消息,是可以配置的。通常对于快的消费者,是直接把消息同步发送过去,但对于一个Slow Consumer,

你使用同步发送消息可能出现Producer堵塞等现象,慢消费者适合使用异步发送。

2.1 配置使用

1:ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false

2:在Connection URI级别来配置使用Async Send

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

3:在ConnectionFactory级别来配置使用Async Send

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

4:在Connection级别来配置使用Async Send

((ActiveMQConnection)connection).setUseAsyncSend(true);
时间: 2024-07-30 10:43:56

ActiveMQ(15):Message Dispatch的消息游标与异步发送的相关文章

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors ??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. ??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

一.分发策略(Dispatch Policies) 1.1 严格顺序分发策略(Strict Order Dispatch Policy) 通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以 相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息. Strict order dispatch policy会保证每

ActiveMQ消息游标 --转载

转:http://blog.csdn.net/m13321169565/article/details/8081358 在Activemq以前的版本中,broker会把待发送的消息保存在内存中.这种方式的缺陷是当消费者消费的速度赶不上生产者的速度时,会在broker的内存中积攒大量的消息,当达到一个限额后,broker就不再接收消息.这时生产者就被阻塞了,直到broker将内存清理能保存消息后才能继续发送.     在5.0版本后,Activemq实现了一种新的内存模型来防止慢消费者阻塞快速生产

ActiveMQ(十)——Message Dispatch高级特性

一.消息游标二.异步发送三.严格分发策略四.轮询分发策略五.优化批量确认六.生产者流量控制 原文地址:https://blog.51cto.com/mazongfei/2415612

ActiveMQ持久化到mysql实现消息永不丢失

ActiveMQ持久化到mysql实现消息永不丢失 配置 1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml 2.打开activemq-jdbc-performance.xml,在persistenceAdapter节点后面添加dataSource="#mysql-ds" 并配置你的数据库 其实可以直接更改apache-activemq-5.15.2/conf/activemq.xml的per

Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送

摘要: 本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖.加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能! 内容: 对于消息中间件RabbitMQ,想必各位小伙伴没有用过.也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信.业务服务模块解耦.接口限流.消息分发等功能,在微

Java与微信不得不说的故事——消息的接收与发送

Java与微信的知识也是自学阶段,代码都是参照柳峰老师的.具体可以查看此博:http://blog.csdn.net/lyq8479/article/details/8949088. 下面说一下消息的接收和发送吧. 消息的推送:当普通用户向公众账号发送消息是,微信服务器将POST消息到填写的URL上.消息是一个xml包. 消息的回复:对于每一个POST请求,开发者在响应包中返回特定的xml包,对消息进行响应. 所以,需要有解析xml包和包装xml包的方法.于是,引进了dom4j.jar和xstr

线程消息通信与异步处理

转载请标明出处: http://blog.csdn.net/yujun411522/article/details/46444869 本文出自:[yujun411522的博客] 关于android内消息通信和handler的知识在之前的Handler中已经简要介绍过了,这里介绍在native层的实现机制. 相信大家都知道一个标准的Looper线程的写法: public MyLooperThread extends Thread{ private Handler mHandler; public

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产