8、RabbitMQ-消息的确认机制(生产者)

RabbitMQ 之消息确认机制(事务+Confirm)

https://blog.csdn.net/u013256816/article/details/55515234

概述:

在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题

除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正

确到达 Rabbit 服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器

不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到达;

导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!

RabbitMQ 为我们提供了两种方式:

1. 通过 AMQP 事务机制实现,这也是 AMQP 协议层面提供的解决方案;

2. 通过将 channel 设置成 confirm 模式来实现

事务机制                                                                                                                                                                          RabbitMQ 中与事务机制有关的方法有三个:txSelect(), txCommit()以及 txRollback(),

txSelect 用于将当前 channel 设置成 transaction 模式,txCommit 用于提交事务,

txRollback 用于回滚事务,在通过 txSelect 开启事务之后,我们便可以发布消息

给 broker 代理服务器了,如果 txCommit 提交成功了,则消息一定到达了 broker 了,

如果在 txCommit执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候

我们便可以捕获异常通过 txRollback 回滚事务了。

txSelect   txCommit  txRollback                                                                                                                                txSelect:用户将当前channel设置成transation模式                                                                                                          txCommit :用于提交事务

txRollback :用户回滚事务

生产者:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
     private static final String QUEUE_NAMW = "test_tx_queue";

     public static void main(String[] args) throws IOException,  TimeoutException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();

           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);

           String msg = "tx";

           try {
                //开启事务模式、
                channel.txSelect();
                channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                //模拟事故
                int i = 1/0;
                //提交
                channel.txCommit();
           } catch (Exception e) {          //进行事务回滚
                channel.txRollback();
                System.out.println("TxRollback...");
           }
           channel.close();
           conn.close();
     }
}

消费者:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;
public class TxReceive {

     private static final String QUEUE_NAMW = "test_tx_queue";
public static void main(String[] args) throws IOException,  TimeoutException {

           Connection conn = ConnectionUtils.getConnection();

           Channel channel = conn.createChannel();

           //队列声明
           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);

           channel.basicQos(1);

           //绑定队列到交换机转发器

           //channel.queueBind(QUEUE_NAMW, "", "");

                     //定义一个消费者
                     Consumer consumer = new  DefaultConsumer(channel){
                           //收到消息就会触发这个方法
                           @Override
                           public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                     throws IOException {
                                String msg = new  String(body,"utf-8");
                                System.out.println("消费者1接收到的消息" + msg);

                                try {
                                     Thread.sleep(1500);
                                } catch (InterruptedException e)  {
                                     e.printStackTrace();
                                }finally{
                                     System.out.println("消费者1处理完成!");
                                     //手动回执
                                     channel.basicAck(envelope.getDeliveryTag(), false);
                                }
                           }
                     };
                     //监听队列
                     //自动应答false
                     boolean autoAck = false;
                     channel.basicConsume(QUEUE_NAMW, autoAck,  consumer);
     }
}

此时消费者不会接收到消息

此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量

Confirm模式

概述

上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随

后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低

RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。

producer 端 confirm 模式的实现原理

   该模式最大的好处就是异步的!!!    

开启 confirm 模式的方法                                                                                                                     已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。                生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式                                                            核心代码:

//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式

channel.confirmSelect();     

编程模式

1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms()方法,等待服务器端

confirm。实际上是一种串行 confirm 了。

2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms()方法,等待服务器端

confirm。

3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回

调这个方法。

普通模式:

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class confirm{
     private static final String QUEUE_NAMW =  "test_tx_confirm1";

     public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();

           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);

           //生产者调用confirmSelect,将channel设置为confirm模式
           channel.confirmSelect();
           String msg = "confirm";
           channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
           if(!channel.waitForConfirms()){
                System.out.println("send failed");
           }else{
                System.out.println("send ok");
           }
           channel.close();
           conn.close();
     }
}

批量模式

批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)

或者定量(达到多少条)或者两则结合起来publish 消息,然后等待

服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm

效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情

况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数

量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
private static final String QUEUE_NAMW = "test_tx_confirm1";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
      Connection conn = ConnectionUtils.getConnection();
      Channel channel = conn.createChannel();

      channel.queueDeclare(QUEUE_NAMW, false, false, false, null);

      //1
      //生产者调用confirmSelect,将channel设置为confirm模式
      channel.confirmSelect();

      //2
      String msg = "confirm";
      //批量发送
      for(int i=1;i<=10;i++){
        channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes());
      }

      //3
      //确认
      if(!channel.waitForConfirms()){
        System.out.println("send failed");
      }else{
        System.out.println("send ok");
      }

      channel.close();
      conn.close();
}
}

异步模式

Channel 对象提供的 ConfirmListener()回调方法只包含 deliveryTag

(当前 Chanel 发出的消息序号),我们需要自己为每一个 Channel

维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中

元素加 1,每回调一次 handleAck方法,unconfirm 集合删掉相应的

一条(multiple=false)或多条(multiple=true)记录。从程序运行

效率上看,这个unconfirm 集合最好采用有序集合 SortedSet 存储结构。

实际上,SDK 中的 waitForConfirms()方法也是通过 SortedSet维护消息序号的。

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class TXsend {
     private static final String QUEUE_NAMW =  "test_tx_confirm3";

     public static void main(String[] args) throws IOException,  TimeoutException, InterruptedException {
         Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();

           channel.queueDeclare(QUEUE_NAMW, false, false, false,  null);

           //生产者调用confirmSelect,将channel设置为confirm模式
           channel.confirmSelect();

           //未确认的消息标识
           final SortedSet<Long> confirmSet =  Collections.synchronizedSortedSet(new TreeSet<Long>());

           //频道加一个监听
           channel.addConfirmListener(new ConfirmListener() {

                //回调/重发重试  可以1s之后再发 10s之后再发
                @Override
                public void handleNack(long deliveryTag, boolean  multiple) throws IOException {
                     if(multiple){
                           System.out.println("handleNack-----multiple =1");
                           confirmSet.headSet(deliveryTag+1).clear();;
                     }else{
                           System.out.println("handleNack-----multiple =0");
                           confirmSet.remove(deliveryTag);
                     }
                }

                //没问题的handleAck
                @Override
                public void handleAck(long deliveryTag, boolean  multiple) throws IOException {
                     if(multiple){
                           System.out.println("handleAck-----multiple =1");
                           confirmSet.headSet(deliveryTag+1).clear();;
                     }else{
                           System.out.println("handleAck-----multiple =0");
                           confirmSet.remove(deliveryTag);
                     }

                }
           });

           String msg = "confirm";
           //模拟插入数据
            while(true){
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", QUEUE_NAMW, null,  msg.getBytes());
                confirmSet.add(seqNo);
           }
     }
}

原文地址:https://www.cnblogs.com/Mrchengs/p/10531721.html

时间: 2024-08-30 17:22:08

8、RabbitMQ-消息的确认机制(生产者)的相关文章

RabbitMQ 消息可靠性的机制

RabbitMQ 消息可靠性 一.发布确认机制. 生成者发送消息,Exchange路由消息到队列,RabbitMQ就会给生产者发送确认Ack.(注意:发布确认机制不能和事务机制一起使用) 注意:多消息发布确认机制情况下,倘若要发送 100 条消息,发送 90 条后,突然网络故障,后面的消息发送失败了,那么 isAllPublished 返回的是 false,而前面 90 条消息已经发送到消息队列了.我们还不知道哪些消息是发送失败的,所以很多条消息发布确认,建议分几次发送或多通道发送. 二.持久化

ActiveMQ的学习(三)(ActiveMQ的消息事务和消息的确认机制)

ActiveMQ的消息事务 消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似. 一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器.生产者,消费者与消息服务器都支持事务性.ActiveMQ得事务主要偏向在生产者得应用. ActiveMQ消息事务流程图: 原生jms事务发送(生产者的事务发送) 不加事务得情况:(程序没有错误,10条消息会到达mq中) 不加事务得情况:(程序有错误,结果是发送成功3条,其余不成功---因为没有加事务) 加事务得情况:(程

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

一.分发策略(Dispatch Policies) 1.1 严格顺序分发策略(Strict Order Dispatch Policy) 通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以 相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息. Strict order dispatch policy会保证每

RabbitMQ消息发布和消费的确认机制

前言 新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门.趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端.园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案. RabbitMQ 消息发布确认机制 默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的.BasicPublish方法的

学习ActiveMQ(六):JMS消息的确认与重发机制

当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送.那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认.本节将确认机制和重发机制一起在原有的代码中学习. 消息确认机制有四种:定义于在session对象中 AUTO_ACKNOWLEDGE= 1 :自动确认 CLIENT_ACKNOWLEDGE= 2:客户端手动确认 UPS_OK_ACKNOWLEDGE= 3: 自动批量确

【转】RabbitMQ基础——和——持久化机制

这里原来有一句话,触犯啦天条,被阉割!!!! 首先不去讨论我的日志组件怎么样.因为有些日志需要走网络,有的又不需要走网路,也是有性能与业务场景的多般变化在其中,就把他抛开,我们只谈消息RabbitMQ. 那么什么是RabbitMQ,它是用来解决什么问题的,性能如何,又怎么用?我会在下面一一阐述,如有错误,不到之处,还望大家不吝赐教. RabbitMQ简介 必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(

rabbitMQ基础知识--消息确认机制之生产者端的确认机制

一:消息确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下生产者确认的情况. 生产者确认模式实现原理: 生产者将信道设置成conf

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎