rabbit--消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。 
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。

queue的持久化

queue的持久化是通过durable=true来实现的。

1 using (var connection = factory.CreateConnection())
2             {
3                 using (var channel = connection.CreateModel())
4                 {
5                     channel.QueueDeclare(queue:"hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
6
7                 }
8             }

参数说明:

queue:queue名称

exclusive:排他队列;如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。

        这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;

                 2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;

               3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

quereDeclear相关的4种方法,如下:

1 Queue.DeclareOk queueDeclare() throws IOException;
2 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
3                                  Map<String, Object> arguments) throws IOException;
4 void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
5                             Map<String, Object> arguments) throws IOException;
6 Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

其中需要说明的是queueDeclarePassive(String queue)可以用来检测一个queue是否已经存在

如果该队列存在,则会返回true;如果不存在,就会返回异常,但是不会创建新的队列。

消息的持久化

  如果将queue的持久化标识为durable设置true,则代表是一个持久化的队列,那么在服务重启后,也会存在;因为服务会把持久化的queue存放在硬盘上。当服务重启的时候,会重新设置之前持久化的queue。

  队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置

  也就是说重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。 
如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

  设置消息的持久化:

channel.BasicPublish(exchange: "test", routingKey: "task_queue", basicProperties: MessageProperties.PERSISTENT_TEXT_PLAIN, body: body);

参数解析:

  exchange:exchange名字

  routingKey:routingKey名字

  body:表示发送的消息体

  basicProperties:首先看下BasicProperties的定义

 1 public BasicProperties(
 2             String contentType,//消息类型如:text/plain
 3             String contentEncoding,//编码
 4             Map<String,Object> headers,
 5             Integer deliveryMode,//1:nonpersistent 2:persistent
 6             Integer priority,//优先级
 7             String correlationId,
 8             String replyTo,//反馈队列
 9             String expiration,//expiration到期时间
10             String messageId,
11             Date timestamp,
12             String type,
13             String userId,
14             String appId,
15             String clusterId)

deliveryMode=1表示不持久化;deliveryMode=2则表示持久化。

那么MessageProperties.PERSISTENT_TEXT_PLAIN又是什么鬼?

1 public static final BasicProperties PERSISTENT_TEXT_PLAIN =
2     new BasicProperties("text/plain",
3                         null,
4                         null,
5                         2,
6                         0, null, null, null,
7                         null, null, null, null,
8                         null, null);

可以看到这其实就是讲deliveryMode设置为2的BasicProperties的对象,为了方便编程而出现的一个东东

另一种实现:

1 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
2 builder.deliveryMode(2);
3 AMQP.BasicProperties properties = builder.build();
4 channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());

设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

exchange的持久化

上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

channel.ExchangeDeclare(exchange: "test", type: "direct/topic/header/fanout", durable: true);即在声明的时候讲durable字段设置为true即可。

 1 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
 2 Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
 3                                    Map<String, Object> arguments) throws IOException;
 4 Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
 5 Exchange.DeclareOk exchangeDeclare(String exchange,
 6                                           String type,
 7                                           boolean durable,
 8                                           boolean autoDelete,
 9                                           boolean internal,
10                                           Map<String, Object> arguments) throws IOException;
11 void exchangeDeclareNoWait(String exchange,
12                            String type,
13                            boolean durable,
14                            boolean autoDelete,
15                            boolean internal,
16                            Map<String, Object> arguments) throws IOException;
17 Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

引申的问题

1.将queue,exchange, message等都设置了持久化之后就能保证100%保证数据不丢失了嚒? 

答案是否定的。 
  首先,从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为true(方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck).

  其次,关键的问题是消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。

  那么这个怎么解决呢?首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端。RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

2.消息什么时候刷到磁盘? 

  写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。 
  有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。 
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

时间: 2024-11-13 06:55:43

rabbit--消息持久化的相关文章

RabbitMQ(三):消息持久化策略

原文:RabbitMQ(三):消息持久化策略 一.前言 在正常的服务器运行过程中,时常会面临服务器宕机重启的情况,那么我们的消息此时会如何呢?很不幸的事情就是,我们的消息可能会消失,这肯定不是我们希望见到的结果.所以我们希望AMQP服务器崩溃了也可以将消息恢复,这称之为消息持久化.RabbitMQ自然存在这种策略可以帮助我们完成这件事情. 二.持久化的消息 当RabbitMQ服务器重启后,原先的队列和交换器会随同里面的消息一同消失.原因在于每个队列和交换器都有durable属性,该属性默认是fa

rabbitmq 消息持久化

rabbitmq 消息持久化 2016-02-18 11:19 224人阅读 评论(0) 收藏 举报  分类: 综合(15)  版权声明:本文为博主原创文章,未经博主允许不得转载. 二: 任务分发 &消息持久化 启用多个接收端的时候如果某一个receive 关闭要保证消息有反馈是否收到 send端 #-*- coding: UTF-8 -*-import pikacred = pika.PlainCredentials('zxl','pwd') #账号密码params = pika.Connec

ActiveMQ消息持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的. 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试. 消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. >>

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

【转】 使用Redis的Pub/Sub来实现类似于JMS的消息持久化

http://blog.csdn.net/canot/article/details/52040415 关于个人对Redis提供的Pub/Sub机制的认识在上一篇博客中涉及到了,也提到了关于如何避免Redis的Pub/Sub的一个最大的缺陷的思路-消息的持久化(http://blog.csdn.net/canot/article/details/51975566).这篇文章主要是关于其思路(Redis的Pub/Sub的消息持久化)的代码实现: Pub/Sub机制中最核心的Listener的实现:

理解JMS规范中消息的传输模式和消息持久化

JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式.发送者可以通过如下类似的代码进行设置 TopicPublisher publihser = session.createPublisher(topic); // 设置持久化传输 publihser.setDeliveryMode(DeliveryMode.PERSISTENT); 这种方式对publisher发送的所有消息都有效,相当于是一个全局的效果.如果只是想设置某一个消息的传输模式,可以通过以下代码设置消息头的属性来实现 Te

ActiveMQ的消息持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的. 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试. 消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. 1. J

Rabbitmq消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢--消息持久化.?为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化. queue的持久化 queue的持久化是通过durable=true来实现的.?一般程序中这么使用: /** * amqp_queue_declare * * @param [in] state connection state – TCP连接 * @param

RabbitMQ原理与相关操作(三)消息持久化

现在聊一下RabbitMQ消息持久化: 问题及方案描述 1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间.在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了. 这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情. 2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的.当RabbitMQ死掉了或者重启了,上次创建的队列.消息都不会保存. 这种情况可以使用RabbitMQ提供的消

4、RabbitMQ-消息应答与消息持久化

消息应答( Message acknowledgment) 1. Message acknowledgment(消息应答) 执行任务可能需要几秒钟.你可能想知道如果其中一个消费者开始一项长期任务并且只是部分完 成而死亡会发生什么.使用我们当前的代码,一旦RabbitMQ向消费者发送消息,它立即将其标 记为删除.在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息.我们还将丢 失分发给这个特定工作者但尚未处理的所有消息. 但我们不想失去任何任务.如果工人死亡,我们希望将任务交付给另一名工人