RabbitMQ 可靠投递

RabbitMQ 可靠投递


  • 背景
  • confirmCallback 确认模式
  • returnCallback 未投递到 queue 退回模式
  • shovel-plugin 跨机房可靠投递

背景

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两个选项用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 则会返回一个 confirmCallback
messageexchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。

confirmCallback 确认模式

在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我们来看下 ConfirmCallback 接口。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。

发送的时候创建一个 CorrelationData 对象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id 。当然这里是纯粹 demo,真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback

broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

returnCallback 未投递到queue退回模式

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。

同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我们希望将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。我们先开启 rabbit_node1shovel-plugin

先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,如果有的话直接开启。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:[email protected]/my-vhost

如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 rabbit_node2 broker


我们来看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 创建了两个 tcp 连接,端口 39544 连接是用来消费 plen.queue 里的消息,端口 55706 连接是用来推送消息给 rabbit_node2

我们来看下 rabbit_node1 tcp 连接状态:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 连接状态:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

为了验证 shovel-plugin 稳定性,我们将 rabbit_node2 下线。

然后再发送消息,发现消息会现在 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 连接恢复将消费 rabbit_node1 plen.queue 消息,然后投递给 rabbit_node2 plen.queue

作者:王清培 (沪江集团资深JAVA架构师)

原文地址:https://www.cnblogs.com/wangiqngpei557/p/9381478.html

时间: 2024-11-04 12:04:43

RabbitMQ 可靠投递的相关文章

【RabbitMQ】如何进行消息可靠投递【上篇】

说明 前几天,突然发生线上报警,钉钉连发了好几条消息,一看是RabbitMQ相关的消息,心头一紧,难道翻车了? [橙色报警]?应用[xxx]在[08-15?16:36:04]发生[错误日志异常],alertId=[xxx].由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发. 应用xxx?可能原因如下 服务名为: ?异常为:org.springframework.amqp.rabbit.lis

杂项之rabbitmq

杂项之rabbitmq 本节内容 rabbitmq简介 AMQP协议 rabbitmq使用 应用举例 rabbitmq简介 介绍rabbitmq之前,先介绍一下AMQP协议,因为rabbitmq就是基于AMQP协议实现的一个服务程序.(目前为止应该也是唯一实现了AMQP协议的服务) AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. arbbitmq使用erlan

RabbitMQ的应用场景以及基本原理介绍

1.背景 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. 2.应用场景 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西. (2)并行方式:将注册信息写入数据库后,发

RabbitMQ应用场景及原理

转载自:http://blog.csdn.net/whoamiyang/article/details/54954780 1.背景 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. 2.应用场景 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,邮件

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

RabbitMQ及Spring集成

部分转载自https://blog.csdn.net/whoamiyang/article/details/54954780 1.背景 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. 2.应用场景 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

消息队列RabbitMQ基础知识详解

一: 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序或者模块对模块的通信方法.MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息. 常见的消息队列有RabbitMQ和kafka.下面详细介绍一下RabbitMQ的适用场景和基本概念. 二: 适用场景 2.1 并发处理 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,邮件,短

RabbitMQ学习(二)_AMQP简介

AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. 消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers). 由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上. AMQP 0-9