【RabbitMQ】6、rabbitmq生产者的消息确认

通过Publisher Confirms and Returns机制,生产者可以判断消息是否发送到了exchange及queue,而通过消费者确认机制,Rabbitmq可以决定是否重发消息给消费者,以保证消息被处理。

1.什么是Publisher Confirms and Returns?

Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms. 
地址:http://www.rabbitmq.com/confirms.html

根据RabbitMq官网定义,rabbitmq代理(broker)对发布者(publishers)的确认被称作发布者确认(publisher confirms),这种机制是Rabbitmq对标准Amqp协议的扩展。因此通过这种机制可以确认消息是否发送给了目标。

2.如何通过Spring amqp来使用Publisher Confirms and Returns机制?

Confirmed and returned messages are supported by setting the CachingConnectionFactory’s publisherConfirms and publisherReturns properties to ‘true’ respectively.When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections. 
http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html#cf-pub-conf-ret

通过Spring amqp文档可以看到,要使用这种机制需要将Template模版的设publisherConfirms 或publisherReturns 属性设置为true,此外ConnectionFactory要配置为CachingConnectionFactory。

    <bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="192.168.2.133" />
        <property name="port" value="5672" />
        <property name="username" value="sun" />
        <property name="password" value="123456" />
        <property name="publisherConfirms" value="true" />
        <property name="publisherReturns" value="true" />
    </bean>

2.1 ConfirmCallback的使用及触发的一种场景

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

/**
 * @author wangzhongqiu
 *         Created on 2017/10/31.
 * @description:继承RabbitTemplate.ConfirmCallback,消息确认监听器
 */
@Service
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
    private Logger log = LoggerFactory.getLogger(CommonProducer.class);

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("收到回调,成功发送到broker");
    }
}

使用场景:

2.2 ReturnCallback的使用及触发的一种场景

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * @author wangzhongqiu
 *         Created on 2017/10/31.
 * @description:继承RabbitTemplate.ReturnCallback,消息发送失败返回监听器
 */
@Service
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
    private Logger log = LoggerFactory.getLogger(CommonProducer.class);

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("收到回调");
        log.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }
}

使用场景:

时间: 2024-08-23 13:00:50

【RabbitMQ】6、rabbitmq生产者的消息确认的相关文章

rabbitMQ基础知识--消息确认机制之生产者端的确认机制

一:消息确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下生产者确认的情况. 生产者确认模式实现原理: 生产者将信道设置成conf

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ 消息确认与公平调度消费者

一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer = new QueueingConsumer(channel

rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者   一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer

RabbitMQ的消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 二:消息发送确认 (1)ConfirmCallback 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调. 使用该功能需要开启确认,spring-boot中配置如下: spr

RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费". 它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了. 因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了. 在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confir

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了