RabbitMQ发送端事务管理 —— 事务机制 和 确认机制

一、AMQP提供

事务机制,比较消耗性能

try {
    channel.txSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes());
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4,
                byte[] arg5) throws IOException {
            System.out.println("返回的消息是:" + new String(arg5));
        }
    });
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}

二、RabbitMQ提供

消息确认机制(效率比事务机制高)

try {
    channel.confirmSelect();
    channel.basicPublish(EXCHANGE_NAME, "queue2", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    if(!channel.waitForConfirms()) {
        System.out.println("send message failed");
    }
} catch (Exception e) {
    e.printStackTrace();
}

但是确认模式,是每发送一条消息后就调用channel.waitForConfirms()方法,之后等待服务器的确认,这实际上是一种串行、同步等待的方式,事务机制和它一样(但在实际大数量量测试时,会发现确认机制只比事务机制效率高一点点。是因为同步等待的方式下,confirm机制发送一条消息需要通信交互的命令是2条:Basic.Publish和Basic.Ack;事务机制是3条:Basic.Publish、Tx.Commit/Tx.Commit-Ok或者Tx.Rollback/Tx.Rollback-Ok)

优化方案一:

批量confirm方法,比较简单,不写代码了,就是将channel.basicPublish用循环框起来,发送次数到达一定数量后,进行waitForConfirms()即可。缺点是,如果这批出现返回Basic.Nack或者超时情况,客户端需要将这一批重新发送,当消息经常丢失是,性能不升反降。

注意:要将发送出去的消息存入缓存 中,可以是ArrayList或者BlockingQueue之类的,然后适时的清空,或者重发里面的信息。

优化方案二:

异步confirm方法,提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Nack,SeqNo:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("deliveryTag:"+deliveryTag+", multiple:"+multiple);
        if(multiple) {
            confirmSet.headSet(deliveryTag-1).clear();
        }else {
            confirmSet.remove(deliveryTag);
        }
        //注意这里需要添加处理消息重发的场景
    }
});
channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

三、总结

1、事务机制和确认机制是互斥的,不能共存。

2、事务机制和确认机制确保的是消息能够正确地发送到RabbitMQ的交换器,如果些交换器没有匹配队列,那么消息也会丢失。

3、普通事务机制和普通confirm的方式吞吐量很低,但方式简单,不需要在客户端维护状态(这里指的是维护deliveryTag及缓存未确认的消息)。

  批量confirm方式的问题在于遇到RabbitMQ服务端返回Basicnack需要重发批量导致的性能降低。

  异步confirm方式和批量confirm一样需要在客户端维护状态。

原文地址:https://www.cnblogs.com/yifanSJ/p/9070039.html

时间: 2024-11-09 21:06:32

RabbitMQ发送端事务管理 —— 事务机制 和 确认机制的相关文章

JavaEE中的事务管理——事务概述(1)

今天打算说一说事务管理,读者可能了解也有可能不了解,其实很简单(大牛请自行绕过).本来想引用个成语的啥的来描述事务的特点,但是搜肠刮肚也没有发现合适的,于是就找了下面几组成语来描述事务性.其实在官方文档中对于事务的描述也是分四个方面来说的.这里算是用自己的理解解释一下罢了. 第一对词语是:"开弓没有回头箭"和"前功尽弃"(功亏一篑?功败垂成?) 这一对词语结合起来看就是事务的原子性,就是我们平时说的要做就做完,要不做就一点儿也不要做.其中前面的做完就是说明这个事务已

JavaEE中的事务管理——事务划界

前面博文中大致介绍了一下事务,其实在企业应用服务器中事务是在不同的级别上存在的.比较简单的事务是最底层的事务,就是位于资源级别的事务管理.假设数据最终要存储在一个关系型数据库中,那么最底层的事务就是位于这里.我们把这种事务称之为资源本地事务(resource-localtransaction)在不用容器的大部分情况下开发人员要面对的事务都属于这里(其他的事务面对不到是因为水平不够!).理解了数据的事务之后来解决资源本地事务就易如反掌了.如果使用了企业应用服务器那么要处理的大多数事务就是JavaT

ActiveMQ 重发机制与确认机制 实践

一.配置spring-activemq.xml 1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation=&qu

spring笔记--事务管理之声明式事务

事务简介: 事务管理是企业级应用开发中必不可少的技术,主要用来确保数据的完整性和一致性, 事务:就是一系列动作,它们被当作一个独立的工作单元,这些动作要么全部完成,要么全部不起作用. Spring中使用事务: 作为一个受欢迎的企业应用框架,Spring在不同的事务管理API上定义了一个抽象层,而开发时不必了解底层的事务管理API,就可以使用Spring的事务管理机制. Spring既支持编程式的事务管理,也支持声明式的事务管理,大多数情况我们选择后者. 编程式事务管理:将事务管理代码嵌入到业务代

Java数据库连接——JDBC调用存储过程,事务管理和高级应用

阅读目录 一.JDBC常用的API深入详解及存储过程的调用1.存储过程(Stored Procedure)的介绍2.JDBC调用无参存储过程3.JDBC调用含输入参数存储过程4.JDBC调用含输出参数存储过程二.JDBC的事务管理1.JDBC实现事务管理2.通过代码实现事物的管理三.数据库连接池(dbcp.c3p0)1.dbcp使用步骤2.c3p0使用步骤3.连接池总结四.JDBC的替代产品(Hibernate.Mybatis)1.Commons-dbutils 2.Hibernate简介3.M

[Spring框架]Spring 事务管理基础入门总结.

前言:在之前的博客中已经说过了数据库的事务, 不过那里面更多的是说明事务的一些锁机制, 今天来说一下Spring管理事务的一些基础知识. 之前的文章: [数据库事务与锁]详解一: 彻底理解数据库事务一, 什么是事务 事务是逻辑上一组操作,这组操作要么全都成功,要么全都失败. 事务的属性: ACID原子性(Atomicity): 事务作为一个整体被执行,包含在其中的对数据的操作要么全部被执行,要么都不执行.一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态.

spring事务管理aop

达内12 note unit 09 01 1.spring事务管理 2.spring提供了对事务管理支持 spring采用aop机制完成事务控制 可以实现在不修改原有组件代码情况下实现事务控制功能. spring提供了两种事务管理方式: a.编程式事务管理(编写java代码) TransactionTemplate b.声明式事务管理(编写配置,大家都用这种) xml版本配置 注解版本配置 --配置DataSourceTransactionManager --开启事务注解配置<tx:annota

Spring整合事务管理

不用每次 db操作 都要 开启事务  提交事务之类 了 抽取出来 事务管理方式: 编程式的(麻烦) 声明式的事务管理: 不同的框架机制 有不同的 TransactionManager JDBC  Mybatis 建表: create DATABASE spring; use spring; CREATE table book( isbn VARCHAR(50) PRIMARY KEY, book_name VARCHAR(100), price int ); CREATE TABLE book_

spring+mybatis之声明式事务管理初识(小实例)

前几篇的文章都只是初步学习spring和mybatis框架,所写的实例也都非常简单,所进行的数据访问控制也都很简单,没有加入事务管理.这篇文章将初步接触事务管理. 1.事务管理 理解事务管理之前,先通过一个例子讲一下什么是事务管理:取钱. 比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱:然后ATM出1000元钱.这两个步骤必须是要么都执行要么都不执行.如果银行卡扣除了1000块但是ATM出钱失败的话,你将会损失1000元:如果银行卡扣钱失败但是ATM却出