(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack。那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理。

在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care。他只是将自己的状态publish给上层,由上层的逻辑去处理。如果Message没有被正确处理,可能会导致某些状态丢失。但是由于提供了其他强制刷新全部状态的机制,因此这种异常情况的影响也就可以忽略不计了。

对于某些异步操作,比如客户端需要创建一个FileSystem,这个可能需要比较长的时间,甚至要数秒钟。这时候通过RPC可以解决这个问题。因此也就不存在Publisher端的确认机制了。

那么,有没有一种机制能保证Publisher能够感知它的Message有没有被处理的?答案肯定的。在这里感谢笑天居士同学:他在我的《RabbitMQ消息队列(三):任务分发机制》文后留言一起讨论了问题,而且也查找了一些资料。在这里我整理了一下他转载和一篇文章和原创的一篇文章。衔接已经附后。

1. 事务机制 VS Publisher Confirm

如果采用标准的 AMQP 协议,则唯一能够保证消息不会丢失的方式是利用事务机制 -- 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动作。在这种方式下,事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。

为了使能 confirm 机制,client 首先要发送 confirm.select 方法帧。取决于是否设置了 no-wait 属性,broker 会相应的判定是否以 confirm.select-ok 进行应答。一旦在 channel 上使用 confirm.select方法,channel 就将处于 confirm 模式。处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。
    一旦 channel 处于 confirm 模式,broker 和 client 都将启动消息计数(以 confirm.select 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。delivery-tag 域的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被 broker 正确的处理了。

在异常情况中,broker 将无法成功处理相应的消息,此时 broker 将发送 basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义与 basic.ack 中相应各域含义是相同的,同时 requeue 域的值应该被忽略。通过 nack 一或多条消息,broker 表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish 。

在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack 。

2. 消息在什么时候确认

broker 将在下面的情况中对消息进行 confirm :

  • broker 发现当前消息无法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 basic.return)
  • 非持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
  • 持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
  • 持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)

broker 会丢失持久化消息,如果 broker 在将上述消息写入磁盘前异常。在一定条件下,这种情况会导致 broker 以一种奇怪的方式运行。例如,考虑下述情景:

1.  一个 client 将持久消息 publish 到持久 queue 中
   2.  另一个 client 从 queue 中 consume 消息(注意:该消息具有持久属性,并且 queue 是持久化的),当尚未对其进行 ack
   3.  broker 异常重启
   4.  client 重连并开始 consume 消息

在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。

3. 编程实现

首先要区别AMQP协议mandatory和immediate标志位的作用。

mandatory和immediate是AMQP协议中basic.pulish方法中的两个标志位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。具体区别在于:
1. mandatory标志位
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
2. immediate标志位
当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

具体的代码参考请参考参考资料1.

参考资料:

1. http://blog.csdn.net/jiao_fuyou/article/details/21594205

2. http://blog.csdn.net/jiao_fuyou/article/details/21594947

3.  http://my.oschina.net/moooofly/blog/142095

时间: 2024-10-25 05:18:51

(转)RabbitMQ消息队列(九):Publisher的消息确认机制的相关文章

RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)

1.1本部分内容简介 这部分我们将要发送一个消息到多个Consumer,这部分称之为"publish/subscribe" 我们实现的方式就是发送端,发送一个消息,与此同时,多个接收端将同时接收到消息并打印在屏幕上面. 1.2exchange简介 在前面的博文中,我们的讲解是:发送端发送消息至消息队列,接收端从消息队列获取消息.现在我们来介绍一下rabbitmq的完整消息传送模型. >Producer:用来发送消息的应用程序 >queue:用来存储消息的缓存 >Con

全面理解Handler第一步:理解消息队列,手写消息队列

前言 Handler机制这个话题,算是烂大街的内容.但是为什么偏偏重拿出来"炒一波冷饭"呢?因为自己发现这"冷饭"好像吃的不是很明白.最近在思考几个问题,发现以之前对Handler机制的了解是在过于浅显.什么问题? Handler机制存在的意义是什么?能否用其他方式替换? Looper.loop();是一个死循环,为什么没有阻塞主线程?用什么样的方式解决死循环的问题? 如果透彻的了解Handler,以及线程的知识.是肯定不会有这些疑问的,因为以上问题本身就存在问题.

消息队列属性及常见消息队列介绍

什么是消息队列?消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个队列的消息可以同时被多个消息消费者消费.分布式消息服务DMS则是分布式的队列系统,消息队列中的消息分布存储,且每条消息存储多个副本,以实现高可用性,如下图所示. 一般来说,消息队列具有如下属性: 消息顺序普通队列支持"分区有序"和"全局队列"两种模式,ActiveMQ队列和Kafka队列均为分区有序. 分区有序的队列通过分布式处理,支持更高的并发,但由于队列的分布式特性,

Kafka 消息队列系列之分布式消息队列Kafka

介绍 ApacheKafka®是一个分布式流媒体平台.这到底是什么意思呢?我们认为流媒体平台具有三个关键功能:它可以让你发布和订阅记录流.在这方面,它类似于消??息队列或企业消息传递系统.它允许您以容错方式存储记录流.它可以让您在发生记录时处理记录流.什么是卡夫卡好?它被用于两大类的应用程序:构建可在系统或应用程序之间可靠获取数据的实时流数据管道构建实时流应用程序,可以转换或响应数据流要了解卡夫卡如何做这些事情,让我们深入探索卡夫卡的能力.首先几个概念:Kafka作为一个或多个服务器上的集群运行

消息队列(一)——消息的简单发送与接收

背景 开发者经常遇到需要异步执行操作的情况(即过程不等到操作完成就开始).消息队列提供一个中心位置或池,您可以在其中放置或从中提取数据,从而满足了这一要求.一个应用程序能够把消息存放在队列中,然后继续自己的业务,另一个应用程序在运行时再提取这些数据. 简单理解 感觉这里的消息队列还是一个典型的"buffer"思想:即就像喝水一样,如果有一杯水,我可能就直接喝掉了:但是如果有一壶水,我可能要先把水倒进杯子,然后再从杯子里喝水. 消息队列在这里起到了个杯子的作用. 代码示例 建立消息队列并

jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换

redis支持发布/订阅的消息队列机制,jedis提供了java访问redis的客户端,本文将描述如何用jedis实现简单的消息队列,并传输对象. redis支持发布.订阅的功能,基本的命令有publish.subscribe等.在jedis中,有对应的java方法,并且只能发布字符串消息.为了传输对象,需要将对象进行序列化,并封装成字符串进行处理.将对象序列化后,只能成为字节流,如何封装成字符串是一个难点,具体可参考下面的代码. 实现三个类,一个对应publish.一个对应subscribe.

php消息队列之 think queue消息队列初体验

使用thinkphp 5的  消息队列 think queue ● php think queue:listen --queue queuename ● php think queue:work --daemon --queue xwyqueue 使用这两个命令进行消息队列的监控,在整个Linux操作界面关闭以后,发现就无法运行了. 原因就是这个进程没有常驻在系统后台.那么就需要用到liunx操作系统的 supervisor 来保证进程常驻 在百度搜索 supervisor 的安装 使用 然后配

消息队列总结

原文:消息队列总结 前言:关于消息队列应该大家都不陌生,在实际的项目中消息队列也无处不在,今天我和大家分享一下关于消息队列的问题. 1.消息队列定义 消息队列大家又经常称为MQ(message queue),从字面的含义来看就是一个存放消息的容器. 2.消息队列应用场景 2.1.异步处理 2.2.系统解耦 2.3.流量削峰 3.消息队列顺序性 提到mq那么我们必然会讨论mq顺序性问题,比如生产者发送消息1,2,3...对于消费者必须按照1,2,3...这样的顺序来消费,那么消息队列应该怎么样去考

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message