RabbitMQ 之消息确认机制(事务+Confirm)
https://blog.csdn.net/u013256816/article/details/55515234
概述:
在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题
除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正
确到达 Rabbit 服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器
不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!
RabbitMQ 为我们提供了两种方式:
1. 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;
2. 通过将 channel 设置成 confirm 模式来实现
事务机制 RabbitMQ 中与事务机制有关的方法有三个:txSelect(), txCommit()以及 txRollback(),
txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,
txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息
给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,
如果在 txCommit执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候
我们便可以捕获异常通过 txRollback 回滚事务了。
txSelect txCommit txRollback txSelect:用户将当前channel设置成transation模式 txCommit :用于提交事务
txRollback :用户回滚事务
生产者:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class TXsend { private static final String QUEUE_NAMW = "test_tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); String msg = "tx"; try { //开启事务模式、 channel.txSelect(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); //模拟事故 int i = 1/0; //提交 channel.txCommit(); } catch (Exception e) { //进行事务回滚 channel.txRollback(); System.out.println("TxRollback..."); } channel.close(); conn.close(); } }
消费者:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class TxReceive { private static final String QUEUE_NAMW = "test_tx_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAMW, false, false, false, null); channel.basicQos(1); //绑定队列到交换机转发器 //channel.queueBind(QUEUE_NAMW, "", ""); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者1处理完成!"); //手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 //自动应答false boolean autoAck = false; channel.basicConsume(QUEUE_NAMW, autoAck, consumer); } }
此时消费者不会接收到消息
此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量
Confirm模式
概述
上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随
后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低
RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。
producer 端 confirm 模式的实现原理
该模式最大的好处就是异步的!!!
开启 confirm 模式的方法 已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。 生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式 核心代码:
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
编程模式
1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端
confirm。实际上是一种串行 confirm 了。
2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端
confirm。
3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回
调这个方法。
普通模式:
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class confirm{ private static final String QUEUE_NAMW = "test_tx_confirm1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); //生产者调用confirmSelect,将channel设置为confirm模式 channel.confirmSelect(); String msg = "confirm"; channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); if(!channel.waitForConfirms()){ System.out.println("send failed"); }else{ System.out.println("send ok"); } channel.close(); conn.close(); } }
批量模式
批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)
或者定量(达到多少条)或者两则结合起来publish 消息,然后等待
服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm
效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情
况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数
量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
private static final String QUEUE_NAMW = "test_tx_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection conn = ConnectionUtils.getConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAMW, false, false, false, null);
//1
//生产者调用confirmSelect,将channel设置为confirm模式
channel.confirmSelect();
//2
String msg = "confirm";
//批量发送
for(int i=1;i<=10;i++){
channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes());
}
//3
//确认
if(!channel.waitForConfirms()){
System.out.println("send failed");
}else{
System.out.println("send ok");
}
channel.close();
conn.close();
}
}
异步模式
Channel 对象提供的 ConfirmListener()回调方法只包含 deliveryTag
(当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel
维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中
元素加 1,每回调一次 handleAck方法,unconfirm 集合删掉相应的
一条(multiple=false)或多条(multiple=true)记录。从程序运行
效率上看,这个unconfirm 集合最好采用有序集合 SortedSet 存储结构。
实际上,SDK 中的 waitForConfirms()方法也是通过 SortedSet维护消息序号的。
import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class TXsend { private static final String QUEUE_NAMW = "test_tx_confirm3"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); //生产者调用confirmSelect,将channel设置为confirm模式 channel.confirmSelect(); //未确认的消息标识 final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //频道加一个监听 channel.addConfirmListener(new ConfirmListener() { //回调/重发重试 可以1s之后再发 10s之后再发 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleNack-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleNack-----multiple =0"); confirmSet.remove(deliveryTag); } } //没问题的handleAck @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleAck-----multiple =1"); confirmSet.headSet(deliveryTag+1).clear();; }else{ System.out.println("handleAck-----multiple =0"); confirmSet.remove(deliveryTag); } } }); String msg = "confirm"; //模拟插入数据 while(true){ long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); confirmSet.add(seqNo); } } }
原文地址:https://www.cnblogs.com/Mrchengs/p/10531721.html