activeMq消息转投rabbitMq研究

在研究activemq转投消息到rabbitmq的过程中还是发现了很多有趣的细节。    消息发送端分为PERSISTENT与NON_PERSISTENT,该类型表示是否持久化消息到数据库中。

1. Activemq默认使用kahaDB。我大Q9使用的也是kahaDB。当然也是支持mysql等数据库的。

具体配置在${activemq.base}/conf/activemq.xml中。

<persistenceAdapter>
   <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>

<!--
   The systemUsage controls the maximum amount of space the broker will
   use before slowing down producers. For more information, see:

http://activemq.apache.org/producer-flow-control.html
   -->
<systemUsage>
   <systemUsage>
      <memoryUsage>
         <memoryUsage limit="4 gb"/>
      </memoryUsage>
      <storeUsage>
         <storeUsage limit="15 gb"/>
      </storeUsage>
      <tempUsage>
         <tempUsage limit="100 mb"/>
      </tempUsage>
   </systemUsage>
</systemUsage>

和大家理解不一样的地方是NON_PERSISTENT是会使用文件作为存储介质的。主要是为了防止内存挤爆。当发送者发送过快或者接受者处理过慢都会导致使用大量内存。此时将消息临时存储在临时文件中(swap)。

2.    对于PERSISTENT与NON_PERSISTENT区别在于是否在mq服务器重启后能够正常发送消息。PERSISTENT的消息在服务器重启后依然能够将message发送出去。

如果服务端的topic没有订阅者该消息将被直接丢弃。

3.       消费者的持久化则有一定区别。当为queue的时候,若客户端不在线等到某个客户端消费了该消息时则会将该消息删除。当为topic时,若客户端未设置subscriptionDurable,则该客户端必须要在线才能收到订阅。当客户端设置subscriptionDurable为true时,则服务器会保存该消息直到被所有的订阅者均消费一次(消费是指服务器收到ack回复)

消息发送端 消息接收端 可靠性及因素
PERSISTENT queue receiver/durable subscriber 消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。
PERSISTENT non-durable subscriber 最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。
NON_PERSISTENT queue receiver/durable subscriber 最多消费一次。这是由于服务器的宕机会造成消息丢失
NON_PERSISTENT non-durable subscriber 最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定

特别注意:经过实验证明当消息发送为NON_PERSISTENT,即使是durable subscriber再次连接上也是无法送到消息。期间服务器未重启

服务端可以根据clientId及durableSubscriptionName来辨别指定的订阅者以便将该订阅者尚未消费的消息供消费。

记得在设置subscriptionDurable时候也需要设置durableSubscriptionName如下注释。否则该名称会变成listener的名称。

/**  * Set whether to make the subscription durable. The durable subscription name  * to be used can be specified through the "durableSubscriptionName" property.  * <p>Default is "false". Set this to "true" to register a durable subscription,  * typically in combination with a "durableSubscriptionName" value (unless  * your message listener class name is good enough as subscription name).  * <p>Only makes sense when listening to a topic (pub-sub domain).  * @see #setDurableSubscriptionName  */

如下为一配置示例

<bean id="jmsbillChoiceOfAviationContainer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="expressJmsFactory" />    <property name="destination" ref="expressDssScanTopicDestination" />    <property name="messageListener" ref="billChoiceOfAviationTopicListener" />    <property name="durableSubscriptionName" value="billChoiceOfAviation" />    <property name="clientId" value="billChoiceOfAviationClient" />    <property name="subscriptionDurable" value="true" /> </bean>

请注意在配置destination属性是topic。该类还有一个属性为destinationName。配置该属性时会默认为queue。

在设置destination时执行如下,可以看到还有setPubSubDomain(true);才会表示为订阅模式否则为点对点模式

/**  * Set the destination to receive messages from.  * <p>Alternatively, specify a "destinationName", to be dynamically  * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.  * <p>Note: The destination may be replaced at runtime, with the listener  * container picking up the new destination immediately (works e.g. with  * DefaultMessageListenerContainer, as long as the cache level is less than  * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!  * @see #setDestinationName(String)  */ public void setDestination(Destination destination) {    Assert.notNull(destination, "‘destination‘ must not be null");    this.destination = destination;    if (destination instanceof Topic && !(destination instanceof Queue)) {       // Clearly a Topic: let‘s set the "pubSubDomain" flag accordingly.       setPubSubDomain(true);    } }

因此在设置destinationName属性时如果是topic需要增加

<property name="pubSubDomain" value="true"/>

4.      rabbitMQ和activeMq很大的不同在于rabbitMq的消息发送完全基于queue。

在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

l  消费者是无法订阅或者获取不存在的MessageQueue中信息。

l  消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的

当rabbitmq的生产者发送消息出来后该消息会发送到指定的exchange中。Exchange分为如下几种常用类型:direct, fanout,topic

1)    fanout
所有bind到此exchange的queue都可以接收消息

通常此处routingkey为””

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

2)    direct
通过routingKey和exchange决定的那个唯一的queue可以接收消息

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

3)    topic
所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey.*");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Rabbitmq和activemq区别很大的一点是在于当生产者发送消息给topic时,activemq是将该消息广播至该处所有的订阅者(包括离线持久订阅者),而rabbitmq的消息在发送时不一样是还需要配合routingkey。只有符合表达式的订阅者才会被转发。二订阅者依旧是关注被转发的queue,符合该表达式的消息会被转发至对应的queue中,这样客户端消费者才可以消费到。

因此想要持久化订阅topic在rabbitmq中还需要对应在exchange中增加一个名称唯一的queue来进行转发。

如下如果多个listener需要订阅该topic,则需要每个listener对应一个不同的queue,以便转发。

<rabbit:topic-exchange  id="expressDssScanTopicExchange" name="expressDssScanTopicExchange" durable="true" >       <rabbit:bindings>           <rabbit:binding queue="express.scan" pattern="express.dssScan"/>    <rabbit:binding queue="express.dss" pattern="express.dssScan"/>       </rabbit:bindings>   </rabbit:topic-exchange>
时间: 2024-08-02 19:45:25

activeMq消息转投rabbitMq研究的相关文章

消息队列原理及ActiveMQ、RocketMQ、RabbitMQ、Kafka区别总结

消息队列 为什么写这篇文章? 博主有两位朋友分别是小A和小B: 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑.再不然就是和运营聊聊天,写几个SQL,生成下报表.又或者接到客服的通知,某某功能故障了,改改数据,然后下班部署上线.每天过的都是这种生活,技术零成长. 小B,工作于某国企,虽然能接触到一些中间件技术.然而,他只会订阅/发布消息.通俗点说,就是调调API.对为什么使用这些中间件啊?如何保证高可用啊?没有充分的认识. 庆幸的是两位朋友都很有

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

消息队列之 RabbitMQ

关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了. 市面上的消息队列产品有很多,比如老牌的 ActiveMQ.RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年底阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能.总之这块知名的产品就有十几种,就我自己的使用经验和兴趣只打算谈谈 RabbitMQ.Kafka 和 A

转 消息队列之 RabbitMQ

转 https://www.jianshu.com/p/79ca08116d57 消息队列之 RabbitMQ 预流 2017.05.06 16:03* 字数 4884 阅读 80990评论 18喜欢 236赞赏 1 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了. 市面上的消息队列产品有很多,比如老牌的 ActiveMQ.RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年底

消息队列和 RabbitMQ

消息队列和 RabbitMQ 消息队列 来看这样一个例子.相信大家都用外卖软件点过外卖.当我们在手机上下单之后,都发生了什么呢? 首先,客户用手机下单,将请求发送给服务端.服务端当然要先安排订单系统确认我们的订单信息,只有当我们付款成功后,订单才会生成.订单生成后,需要做三件事(真实的系统要做的事情更多,这里只是简单讨论):分配骑士.告知商家和记录流水. 如果按照流程顺序,一步一步走,执行同步操作.我们假设订单系统要花费 500 ms,其他系统都要花费 300 ms.那么整套系统应该要花费 1.

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

ActiveMQ 消息服务(三)

想象场景: 有一条任务,需要在特定环境下进行.用ActiveMQ 来讲分两步,第一:发送者发布一条消息:第二:接收者接收到这条消息后需要干某些事情. 本文依然直接贴出demo代码! 1.项目结构图: 2.activeMQ的jar包依赖,部分pom.xml文件代码: <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core