RabbitMQ中文文档PHP版本(七)--发布者确认

2019年12月10日10:07:12

原文:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

注意这里目前没有PHP版本只有java版本

发布者确认

发布者确认 是实现可靠发布的RabbitMQ扩展。在通道上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着它们已在服务器端处理。

(使用Java客户端)

先决条件

本教程假定RabbitMQ 在标准端口(5672)的本地主机上安装并运行。如果您使用其他主机,端口或凭据,则连接设置需要进行调整。

在哪里获得帮助

如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。

总览

在本教程中,我们将使用发布者确认来确保发布的消息已安全到达代理。我们将介绍几种使用发布者确认并解释其优缺点的策略。

在频道上启用发布者确认

发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下未启用它们。发布者确认在通道级别使用confirmSelect方法启用:

Channel channel = connection.createChannel();
channel.confirmSelect();

必须在希望使用发布者确认的每个频道上调用此方法。确认仅应启用一次,而不是对每个已发布的消息都启用。

策略1:分别发布消息

让我们从使用确认发布的最简单方法开始,即发布消息并同步等待其确认:

while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.waitForConfirmsOrDie(5_000);
}

在前面的示例中,我们像往常一样发布一条消息,并等待通过Channel#waitForConfirmsOrDie(long)方法对其进行确认。确认消息后,该方法立即返回。如果未在超时时间内确认该消息或该消息没有被确认(这意味着代理出于某种原因无法处理该消息),则该方法将引发异常。异常的处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库有不同的方式来同步处理发布者的确认,因此请确保仔细阅读所使用客户端的文档。

该技术非常简单,但也有一个主要缺点:由于消息的确认会阻止所有后续消息的发布,因此它会大大减慢发布速度。这种方法不会提供每秒超过数百条已发布消息的吞吐量。但是,对于某些应用程序来说这可能已经足够了。

发布者确认异步吗?

我们在一开始提到代理是异步地确认发布的消息,但是在第一个示例中,代码同步等待直到消息被确认。客户端实际上异步接收确认,并相应地取消阻止对waitForConfirmsOrDie的调用 。可以将waitForConfirmsOrDie视为依赖于幕后异步通知的同步帮助器。

策略2:批量发布消息

为了改进前面的示例,我们可以发布一批消息,并等待整个批次被确认。以下示例使用了100个批次:

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
    byte[] body = ...;
    BasicProperties properties = ...;
    channel.basicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize) {
        ch.waitForConfirmsOrDie(5_000);
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0) {
    ch.waitForConfirmsOrDie(5_000);
}

与等待确认单个消息相比,等待一批消息被确认可以极大地提高吞吐量(对于远程RabbitMQ节点,这最多可以达到20-30次)。一个缺点是我们不知道发生故障时到底出了什么问题,因此我们可能必须将整个批处理保存在内存中以记录有意义的内容或重新发布消息。而且该解决方案仍然是同步的,因此它阻止了消息的发布。

策略3:处理发布者异步确认

代理异步确认已发布的消息,只需在客户端上注册一个回调即可收到这些确认的通知:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed
});

有两种回调:一种用于确认消息,另一种用于小消息(代理可以认为丢失的消息)。每个回调都有2个参数:

  • 序列号:标识已确认或未确认消息的数字。我们很快将看到如何将其与已发布的消息相关联。
  • 整数:这是一个布尔值。如果为false,则仅确认/取消一条消息;如果为true,则将确认/不添加序列号较低或相等的所有消息。

可以 在发布之前使用Channel#getNextPublishSeqNo()获得序列号:

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

将消息与序列号关联的一种简单方法是使用映射。假设我们要发布字符串,因为它们很容易变成要发布的字节数组。这是一个使用映射将发布序列号与消息的字符串主体相关联的代码示例:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

现在,发布代码使用地图跟踪出站邮件。我们需要在确认到达时清理此地图,并做一些类似在消息不足时记录警告的操作:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
    if (multiple) {
        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
          sequenceNumber, true
        );
        confirmed.clear();
    } else {
        outstandingConfirms.remove(sequenceNumber);
    }
};

channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
    String body = outstandingConfirms.get(sequenceNumber);
    System.err.format(
      "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
      body, sequenceNumber, multiple
    );
    cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

前面的示例包含一个回调,当确认到达时,该回调将清除地图。请注意,此回调处理单个确认和多个确认。确认到达时使用此回调(作为Channel#addConfirmListener的第一个参数 )。缺少邮件的回调将检索邮件正文并发出警告。然后,它重新使用先前的回调来清理未完成确认的映射(无论消息是已确认还是未确认,都必须删除映射中的相应条目)。

如何跟踪未完成的确认?

我们的示例使用ConcurrentNavigableMap来跟踪未完成的确认。由于以下几个原因,此数据结构很方便。它允许轻松地将序列号与消息相关联(无论消息数据是什么),还可以轻松清除条目直到给定序列ID(以处理多个确认/提示)。最后,它支持并发访问,因为在客户端库拥有的线程中调用了确认回调,该线程应与发布线程保持不同。

除了使用复杂的映射实现之外,还有其他跟踪未完成确认的方法,例如使用简单的并发哈希映射和变量来跟踪发布序列的下限,但是它们通常涉及更多且不属于教程。

综上所述,处理发布者异步确认通常需要执行以下步骤:

  • 提供一种将发布序列号与消息相关联的方法。
  • 在通道上注册一个确认侦听器,以便在发布者确认/通知到达后执行相应操作(例如记录或重新发布未确认的消息)时收到通知。序列号与消息的关联机制在此步骤中可能还需要进行一些清洗。
  • 在发布消息之前跟踪发布序列号。

重新发布nack-ed消息?

从相应的回调中重新发布一个nack-ed消息可能很诱人,但是应该避免这种情况,因为确认回调是在不应执行通道的I / O线程中分派的。更好的解决方案是将消息放入由发布线程轮询的内存队列中。诸如ConcurrentLinkedQueue之类的类 将是在确认回调和发布线程之间传输消息的理想选择。

摘要

在某些应用程序中,确保将发布的消息发送到代理非常重要。发布者确认是RabbitMQ功能,可以帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法可以实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:

  • 分别发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。
  • 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但可以正确实施。

全部放在一起

PublisherConfirms.java 类包含了我们所覆盖的技术代码。我们可以对其进行编译,按原样执行并查看它们各自的性能:

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出将如下所示:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台计算机上,则计算机上的输出应看起来相似。单独发布消息的效果不理想,但是与批处理发布相比,异步处理的结果有些令人失望。

Publisher确认非常依赖于网络,因此我们最好尝试使用远程节点,因为客户端和服务器通常不在生产中的同一台计算机上,所以这是更现实的选择。 PublisherConfirms.java可以轻松更改为使用非本地节点:

static Connection createConnection() throws Exception {
    ConnectionFactory cf = new ConnectionFactory();
    cf.setHost("remote-host");
    cf.setUsername("remote-user");
    cf.setPassword("remote-password");
    return cf.newConnection();
}

重新编译该类,再次执行,然后等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们现在看到单独发布的效果非常好。但是,通过客户端和服务器之间的网络,批处理发布和异步处理现在的执行方式类似,对于发布者确认的异步处理来说,这是一个很小的优势。

请记住,批量发布很容易实现,但是在发布者否定确认的情况下,不容易知道哪些消息无法发送给代理。处理发布者确认异步涉及更多的实现,但是提供更好的粒度和更好地控制在处理发布的消息时执行的操作。

原文地址:https://www.cnblogs.com/zx-admin/p/12015007.html

时间: 2024-11-05 22:48:47

RabbitMQ中文文档PHP版本(七)--发布者确认的相关文章

RabbitMQ中文文档PHP版本(二)--发布/订阅

2019年12月10日10:01:00 原文:https://www.rabbitmq.com/tutorials/tutorial-three-php.html 工作队列 (使用php-amqplib) 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. 在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息.在这一部分

RabbitMQ中文文档PHP版本(一)--打印Hello World

2019年12月10日09:54:28 原文:https://www.rabbitmq.com/tutorials/tutorial-one-php.html 介绍 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. RabbitMQ是消息代理:它接受并转发消息.您可以将其视为邮局:将要发布的邮件放在邮箱中时,可以确保Mailp

RabbitMQ中文文档PHP版本(六)--远程过程调用(RPC)

2019年12月10日10:05:54 原文:https://www.rabbitmq.com/tutorials/tutorial-six-php.html 远程过程调用(RPC) (使用php-amqplib) 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. 在第二篇教程中,我们学习了如何使用工作队列在多个工作人员之间分

RabbitMQ中文文档PHP版本(五)--主题

2019年12月10日10:05:11 原文:https://www.rabbitmq.com/tutorials/tutorial-five-php.html 话题 (使用php-amqplib) 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. 在上一教程中,我们改进了日志记录系统.我们没有使用只能进行虚拟广播的扇出交换机

Apache Storm 1.1.0 中文文档 | ApacheCN

前言  Apache Storm 是一个免费的,开源的,分布式的实时计算系统. 官方文档: http://storm.apache.org 中文文档: http://storm.apachecn.org ApacheCN 最近组织了翻译 Storm 1.1.0 中文文档 的活动,整体 翻译进度 为 96%. 感谢大家参与到该活动中来 感谢无私奉献的 贡献者,才有了这份 Storm 1.1.0 中文文档 感谢一路有你的陪伴,我们才可以做的更好,走的更快,走的更远,我们一直在努力 ... 网页地址:

Bottle 中文文档

译者: smallfish ([email protected]) 更新日期: 2009-09-25 原文地址: http://bottle.paws.de/page/docs (已失效) 译文地址: http://pynotes.appspot.com/static/bottle/docs.htm (需翻墙) 这份文档会不断更新. 如果在文档里没有找到答案,请在版本跟踪中提出 issue. 基本映射 映射使用在根据不同 URLs 请求来产生相对应的返回内容. Bottle 使用 route()

Core 中文文档

ASP.NET Core 中文文档 第二章 指南(1)用 Visual Studio Code 在 macOS 上创建首个 ASP.NET Core 应用程序 原文:Your First ASP.NET Core Application on a Mac Using Visual Studio Code作者:Daniel Roth.Steve Smith 以及 Rick Anderson翻译:赵志刚校对:何镇汐.刘怡(AlexLEWIS) 本节将展示如何在 macOS 平台上创建首个 ASP.N

Visual Studio Code中文文档

Visual Studio Code中文文档 Visual Studio Code是一个轻量级但是十分强大的源代码编辑器,重要的是它在Windows, OS X 和Linux操作系统的桌面上均可运行.Visual Studio Code内置了对JavaScript, TypeScript和Node.js语言的支持,并且为其他语言如C++, C#, Python, PHP等提供了丰富的扩展库和运行时. 一.Visual Studio Code实际应用(一)快速强大的编码功能:    能够快速捕捉程

MyBatis Generator中文文档

MyBatis Generator中文文档 MyBatis Generator中文文档地址: http://mbg.cndocs.tk/ 该中文文档由于尽可能和原文内容一致,所以有些地方如果不熟悉,看中文版的文档的也会有一定的障碍,所以本章根据该中文文档以及实际应用,使用通俗的语言来讲解详细的配置. 本文中所有节点的链接都是对应的中文文档地址,可以点击查看详细信息. 下载本文档的PDF版本 注:本文后面提到的MBG全部指代MyBatis Generator. 运行MyBatis Generato