rabbitmq的发布确认和事务

摘要: 介绍confirm的工作机制。使用spring-amqp介绍事务以及发布确认的使用方式。因为事务以及发布确认是针对channel来讲,所以在一个连接中两个channel,一个channel可以使用事务,另一个channel可以使用发布确认,并介绍了什么时候该使用事务,什么时候该使用发布确认

confirm的工作机制

? Confirms是增加的一个确认机制的类,继承自标准的AMQP。这个类只包含了两个方法:confirm.select和confirm.select-ok。另外,basic.ack方法被发送到客户端。

? confirm.select是在一个channel中启动发布确认。注意:一个具有事务的channel不能放入到确认模式,同样确认模式下的channel不能用事务。

当confirm.select被发送/接收。发布者/broker开始计数(首先是发布然后confirm.select被记为1)。一旦channel为确认模式,发布者应该期望接收到basic.ack方法,delivery-tag属性显示确认消息的数量。

当broker确认了一个消息,会通知发布者消息被成功处理;?

?

? basic的规则是这样的:?

一个未被路由的具有manadatory或者immediate的消息被正确确认后触发basic.return;

另外,一个瞬时态的消息被确认目前已经入队;

持久化的消息在持久化到磁盘或者每个队列的消息被消费之后被确认。

关于confirm会有一些问题:

首先,broker不能保证消息会被confirm,只知道将会进行confirm。

第二,当未被确认的消息堆积时消息处理缓慢,对于确认模式下的发布,broker会做几个操作,日志记录未被确认的消息

第三,如果发布者与broker之间的连接删除了未能得到确认,它不一定知道消息丢失,所以可能会发布重复的消息。

最后,如果在broker中发生坏事会导致消息丢失,将会basic.nack那些消息

总之,Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,哪些可能因为broker宕掉或者网络失败的情况而重新发布。

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

事务

Spring AMQP做的不仅仅是回滚事务,而且可以手动拒绝消息,如当监听容器发生异常时是否重新入队。

持久化的消息是应该在broker重启前都有效。如果在消息有机会写入到磁盘之前broker宕掉,消息仍然会丢失。在某些情况下,这是不够的,发布者需要知道消息是否处理正确。简单的解决方案是使用事务,即提交每条消息。

案例:

RabbitTemplate的使用案例(同步),由调用者提供外部事务,在模板中配置了channe-transacted=true。通常是首选,因为它是非侵入性的(低耦合)

<rabbit:template id="rabbitTemplate"  connection-factory="cachingConnectionFactory"

exchange="sslexchange" channel-transacted="true"/>

@Transactional

public void doSomething() {

ApplicationContext context =

new GenericXmlApplicationContext("spring-amqp-test.xml");

RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

String incoming = (String) rabbitTemplate.receiveAndConvert();

// do some more database processing...

String outgoing = processInDatabaseAndExtractReply(incoming);

//数据库操作中如果失败了,outgoing这条消息不会被发送,incoming消息也会返回到broker服务器中,因为这是一条事务链。

//可做XA事务,在消息传送与数据库访问中共享事务。

rabbitTemplate.convertAndSend(outgoing);

}

private String processInDatabaseAndExtractReply(String incoming){

return incoming;

}

异步使用案例(外部事务)

<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

</bean>

<rabbit:listener-container  connection-factory="cachingConnectionFactory" transaction-manager="rabbitTxManage" channel-transacted="true">

<rabbit:listener ref="foo" method="onMessage" queue-names="rabbit-ssl-test"/>

</rabbit:listener-container>

在容器中配置事务时,如果提供了transactionManager,channelTransaction必须为true;如果为false,外部的事务仍然可以提供给监听容器,造成的影响是在回滚的业务操作中也会提交消息传输的操作。

使用事务有两个问题:

?  一是会阻塞,发布者必须等待broker处理每个消息。如果发布者知道在broker死掉之前哪些消息没有被处理就足够了。

?  第二个问题是事务是重量级的,每次提交都需要fsync(),需要耗费大量的时间。

confirm模式下,broker将会确认消息并处理。这种模式下是异步的,生产者可以流水式的发布而不用等待broker,broker可以批量的往磁盘写入。

发布确认

发布确认必须配置在CachingConnectionFactory上

<bean id="cachingConnectionFactory"

class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">

<property name="host" value="192.168.111.128"></property>

<property name="port" value="5672"></property>

<property name="username" value="admin"/>

<property name="password" value="admin"/>

<property name="publisherConfirms" value="true"/>

<property name="publisherReturns" value="true"/>

</bean>

若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true

每个rabbitTemplate只能有一个confirm-callback和return-callback

//确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (ack) {

System.out.println("消息确认成功");

} else {

//处理丢失的消息(nack)

System.out.println("消息确认失败");

}

}

});

使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,可针对每次请求的消息去确定’mandatory’的boolean值,只能在提供’return -callback’时使用,与mandatory互斥。

rabbitTemplate.setMandatory(true);

//确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端ack。

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

public void returnedMessage(Message message, int replyCode, String replyText,

String exchange, String routingKey) {

//重新发布

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");

Throwable cause = new Exception(new Exception("route_fail_and_republish"));

recoverer.recover(message,cause);

System.out.println("Returned Message:"+replyText);

}

});

errorTemplate配置:

<rabbit:queue id="errorQueue" name="errorQueue" auto-delete="false" durable="true">

<rabbit:queue-arguments>

<entry key="x-ha-policy" value="all"/>

<entry key="ha-params" value="1"/>

<entry key="ha-sync-mode" value="automatic"/>

</rabbit:queue-arguments>

</rabbit:queue>

<rabbit:direct-exchange id="errorExchange" name="errorExchange" auto-delete="false" durable="true">

<rabbit:bindings>

<rabbit:binding queue="errorQueue" key="errorRoutingKey"></rabbit:binding>

</rabbit:bindings>

</rabbit:direct-exchange>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">

<property name="backOffPolicy">

<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">

<property name="initialInterval" value="200" />

<property name="maxInterval" value="30000" />

</bean>

</property>

<property name="retryPolicy">

<bean class="org.springframework.retry.policy.SimpleRetryPolicy">

<property name="maxAttempts" value="5"/>

</bean>

</property>

</bean>

<rabbit:template  id="errorTemplate" connection-factory="cachingConnectionFactory" exchange="errorExchange" queue="errorQueue" routing-key="errorRoutingKey" retry-template="retryTemplate" />

同一个连接不同channel使用事务和发布确认

private RabbitTemplate rabbitTemplate;

private TransactionTemplate transactionTemplate;

@Before

public void init() {

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

connectionFactory.setHost("192.168.111.128");

connectionFactory.setPort(5672);

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

template = new RabbitTemplate(connectionFactory);

template.setChannelTransacted(true);

RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory);

transactionTemplate = new TransactionTemplate(transactionManager);

connectionFactory.setPublisherConfirms(true);

rabbitTemplate = new RabbitTemplate(connectionFactory);

}

//发布确认测试

@Test

public void testPublishConfirm(){

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if(ack){

System.out.println("消息确认成功");

}else{

System.out.println("消息确认失败");

}

}

});

//发送到一个不存在的exchange,则会触发发布确认

rabbitTemplate.convertAndSend("asd","aaa","message");

String message = (String) rabbitTemplate.receiveAndConvert(ROUTE);

assertEquals("message",message);

}

//事务测试

@Test

public void testSendAndReceiveInTransaction() throws Exception {

//由于有spring的事务参与,而发送操作在提交事务时,是不允许除template的事务有其他事务的参与,所以这里不会提交

//队列中就没有消息,所以在channel.basicGet时命令返回的是basic.get-empty(队列中没有消息时),而有消息时,返回basic.get-ok

String result = transactionTemplate.execute(new TransactionCallback<String>() {

@Override

public String doInTransaction(TransactionStatus status) {

template.convertAndSend(ROUTE, "message");

return (String) template.receiveAndConvert(ROUTE);

}

});

//spring事务完成,对其中的操作需要提交,发送与接收操作被认为是一个事务链而提交

assertEquals(null, result);

//这里的执行不受spring事务的影响

result = (String) template.receiveAndConvert(ROUTE);

assertEquals("message", result);

}

时间: 2024-08-06 07:58:13

rabbitmq的发布确认和事务的相关文章

【转载】rabbitmq的发布确认和事务

地址:https://my.oschina.net/lzhaoqiang/blog/670749 摘要: 介绍confirm的工作机制.使用spring-amqp介绍事务以及发布确认的使用方式.因为事务以及发布确认是针对channel来讲,所以在一个连接中两个channel,一个channel可以使用事务,另一个channel可以使用发布确认,并介绍了什么时候该使用事务,什么时候该使用发布确认 confirm的工作机制 ? Confirms是增加的一个确认机制的类,继承自标准的AMQP.这个类只

RabbitMQ消息发布和消费的确认机制

前言 新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门.趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端.园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案. RabbitMQ 消息发布确认机制 默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的.BasicPublish方法的

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

Redis 发布订阅、事务、脚本、连接、HyperLogLog

欢迎大家加入 459479177QQ群进行交流 本次主要介绍Redis的发布订阅.事务.脚本.连接.HyperLogLog 一.发布订阅 1>psubscribe,订阅一个或多个指定的频道 Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "tv1" 3) (integer) 1 127.0.0.1:6379> psubscribe tv2 tv3                \

RabbitMQ的消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 二:消息发送确认 (1)ConfirmCallback 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调. 使用该功能需要开启确认,spring-boot中配置如下: spr

一个基于RabbitMQ的可复用的事务消息方案

原文:一个基于RabbitMQ的可复用的事务消息方案 前提# 分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案.参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块.本文的内容就是详细分析整个方案的设计思路和实施.环境依赖如下: JDK1.8+ spring-boot-start-web:2.x.x.spring-boot-start-jdbc:2.x.x.

RabbitMQ (八) : 消息确认机制之事务机制

实在没啥好说的. 生产者 public class Producer { private const string QueueName = "test_work_queue"; public static void Send() { //获取一个连接 IConnection connection = ConnectionHelper.GetConnection(); //从连接中获取一个通道 IModel channel = connection.CreateModel(); //声明

RabbitMQ confirm的确认监听模式

添加确认监听需要开启确认监听模式 实现 addConfirmListener方法confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息: 消费者: package com.flying.rabbitmq.api.confirm; i

SpringBoot集成RabbitMQ(注解+手动确认)

1.pom文件 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.yml配置文件 spring: #MQ配置 rabbitmq: addresses: 127.0.0.1 port: 5672 username: adminmq passwo