RabbitMQ事物模式

Rabbit的消息确认机制(事务+confirm)
在rabbmitmq中我们可以通过持久化数据解决rabbitmq服务器异常的数据丢失问题
问题:生产者将消息发送出去之后消息到底有没有到达rabbitmq服务器默认的情况是不知道的;

事物两种方式:
AMQP实现了事务机制
Confirm模式

事务机制
txSelect、txCommit、txRollback
txSelect:用户将当前 channel设置成transation横式
txCommit:用于搜交事务
txRollback:回滚事务

一、AMQ模式

AMQ模式耗时,降低Rabbitmq消息吞吐量性能不好
生产者

package tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;

public class TxSend {
    private static String QUE_NAME = "test_que_tx";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        String msg = "hellow tx";
        try {
            channel.txSelect();
            channel.basicPublish("", QUE_NAME, null, msg.getBytes());
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            e.printStackTrace();
        }
        System.out.println("txsend=====>" + msg);
        channel.close();
        connection.close();
    }
}

消费者

package tx;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class TxRecv {
    private static String QUE_NAME = "test_que_tx";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            //当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receivetx]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}

二、Confirm模式

生产者将信道设置成confirm模式,一旦信道进入 confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出, broker回传给生产者的确认消息中 deliver-tag域包含了确认消息的序列号,此外broker也可以设置 basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

Confirm的最大好处是异步

开启方法:channel.confirmSelect()

编程模式有三种方法:

(1)普通发一条 waitForConfirms()

(2)发一批 waitForConfirms()

(3)异步confirm模式:提供回调方法

1)普通模式代码

生产者

package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    static String QUEUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();
        String message = "This is confirm queue";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        } else {
            System.out.println("message send success");
        }
        channel.close();
        connection.close();
    }
}

消息者

package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv1 {
    private static String QUE_NAME = "test_queue_confirm1";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm1]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}

2)批量模式

生产者

package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    static String QUEUE_NAME = "test_queue_confirm2";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();
        String message = "This is confirm queue batch";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("utf-8"));
        }
        //发送完一起确认
        if(!channel.waitForConfirms()) {
            System.out.println("message send failed");
        } else {
            System.out.println("message send success");
        }
        channel.close();
        connection.close();
    }
}

消费者

package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv1 {
    private static String QUE_NAME = "test_queue_confirm2";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm1]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}

三、异步模式

Channel对象提供的ConfirmListener()回调方法只包含 deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个 Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次 handbAck方法,unconfim集合删掉相应的一条(multiple= false)或多条( multiple=tmue)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

生产者

package confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class Send3 {
    static String QUEUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.confirmSelect();

        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功调用
                if(multiple) {
                    System.out.println("----------handleAck---------multiple----------");
                    confirmSet.headSet(deliveryTag+1).clear();
                } else {
                    System.out.println("----------handleAck---------multiple----------false");
                    confirmSet.remove(deliveryTag);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失败调用
                if(multiple) {
                    System.out.println("----------handleNack---------multiple----------");
                    confirmSet.headSet(deliveryTag+1).clear();
                } else {
                    System.out.println("----------handleNack---------multiple----------false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });
        String message = "This is confirm queue synchronized";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
            confirmSet.add(seqNo);
        }
    }
}

消费者

package confirm;

import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;

public class Recv3 {
    private static String QUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("[receiveconfirm3]:" + message);
            }
        };
        channel.basicConsume(QUE_NAME,true,consumer);
    }
}

原文地址:https://www.cnblogs.com/xiaofengfree/p/10425313.html

时间: 2024-08-01 14:45:57

RabbitMQ事物模式的相关文章

使用rabbitmq rpc 模式

服务器端 安装 ubuntu 16.04 server 安装 rabbitmq-server 设置 apt 源 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.python.sh | bash 使用 apt-get install rabbitmq-server 安装 rabbitmq 服务器 按键Y或者 y 确认安装 rabbitmq-server 简单管理 rabbitm

Rabbitmq -Publish_Subscribe模式- python编码实现

what is Exchanges ?? Let's quickly go over what we covered in the previous tutorials: A producer is a user application that sends messages. A queue is a buffer that stores messages. A consumer is a user application that receives messages. The core id

RabbitMQ广播模式

广播模式:1对多,produce发送一则消息多个consumer同时收到.注意:广播是实时的,produce只负责发出去,不会管对端是否收到,若发送的时刻没有对端接收,那消息就没了,因此在广播模式下设置消息持久化是无效的. 三种广播模式: fanout: 所有bind到此exchange的queue都可以接收消息(纯广播,绑定到RabbitMQ的接受者都能收到消息):direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息:topic:所有符合routin

RabbitMQ消息模式02

消费端限流 什么是消费端的限流? 假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! 消费端限流RabbitMQ提供的解决方案 RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于Consumer或者Channel设置Qos的值)未被确认前,不进行消费新的消息 Void BasicQos(uint pre

Rabbitmq -Routeing模式- python编码实现

(using the pika 0.10.0 Python client) In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers. In this tutorial we're going to add a feature to it - we're going to make it possible to subscr

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者.本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式. 为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志消息.在这个日志系统里,每一个运行的消费者都可以获取到消息,在这种情况下,我们可以实现这种需求:一个消费者接收消息并写入磁盘,另一个消费者接收消息并打印在电脑屏幕上.简单来说,生产者发布的消息将会以广播的形式

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

springboot学习笔记-6 springboot整合RabbitMQ

一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息