RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/**
* RabbitMQ消息确认机制
* 关于rabbit的生产和消费方的一些实用的操作;
* producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失

*/

/**
     * producer的confirm模式
     * 业务场景描述:
     * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加,
     * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信;
     * 此时插入mq消息的服务为了保证给所有用户发消息,并且要在短时间内插入完成(因此用到了异步插入方式(快速)),
     * 我们就需要知道每次插入mq是否成功,如果不成功那我们可以收集失败的信息后补发(因此confirm模式排上了用场);
     * 开启confirm模式后,返回send结果(成功或失败)
     */

    public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback){
        CachingConnectionFactory connectionFactory = null;
        return getRabbitTemplate(connectionFactory, confirmCallback);

    }

    //producer生产 - confirm模式
    public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory,RabbitTemplate.ConfirmCallback confirmCallback){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //product开启confirm模式
        connectionFactory.setPublisherConfirms(true);
        //设置confirm回调处理
        template.setConfirmCallback(confirmCallback);
        return template;

    }
/**
     * consumer的ack模式
     * 场景描述:短信服务去消费mq队列信息时,倘若服务调用的运营商发送短信接口异常了(短信运营商接口欠费),
     * 我们此时的短信是发送失败的,用户也收不到短信,但是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外);
     * 想要mq消息在业务逻辑异常时还存在,那么可以使用ack方式;
     * 业务无异常,发送ack标识,mq消息释放
     *
     * 在springboot中可以使用基于amqp封装的工厂类关闭自动ack模式,改为手动ack方式;
     * 只有当业务代码流程走完后,最后通过代码设置ack标识,来通知rabbit消息可以丢弃了;
     * 如果设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在无法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中):
     */

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(){
        ConnectionFactory connectionFactory = null;
        return listenerContainerFactory(connectionFactory);
    }

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //代码手动ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //开启消费者数量
        factory.setConcurrentConsumers(2);
        //手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量
        //每次接受数据量,默认250
        factory.setPrefetchCount(300);
        return factory;
    }

    /**
     * 消息确认–ACK
     * 通过连接工厂设置手动ack方式,然后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,如下:
     */
    private void firstNodeListener(String msg,Channel channel,Message message){
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("firstNodeListener - 消费消息 [" + deliveryTag + "] - " + msg);
            //这里ack主要根据mq消息的唯一编号(deliverTag)来通知;
            //如果我们不设置ack确认,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中
            //忘记通过basicAck返回确认信息,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息
            channel.basicAck(deliveryTag, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

本文源自:https://www.cnblogs.com/wangrudong003/p/11436990.html

原文地址:https://www.cnblogs.com/wueryuan/p/12299951.html

时间: 2024-10-13 11:24:55

RabbitMQ消息确认机制—消息发送确认和 消息接收确认的相关文章

storm 消息确认机制及可靠性

worker进程死掉 在一个节点 kill work进程 比方 kill 2509  对work没有影响 由于会在其它节点又一次启动进程运行topology任务 supervisor进程死掉 supervisor进程kill掉 对work进程没有影响  由于他们是互相独立的! . nimbus进程死掉(存在HA的问题) nimbus假设死掉 整个任务挂掉 存在单点故障问题!(hadoop2有ha!.!!.! storm没有ha高可用) 节点宕机(和supervisor是一样的) ack/fail

activemq的消息确认机制ACK

一.简介 消息消费者有没有接收到消息,需要有一种机制让消息提供者知道,这个机制就是消息确认机制. ACK(Acknowledgement)即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. 二.ACK_MODE有几类 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认

activemq 消息阻塞优化和消息确认机制优化

一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值来调整预读取条数,java代码如下 //设置预读取为1ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();p.setQueuePrefetch(1);//创建一个链接工厂connectionFactory = new ActiveMQCon

Win32消息循环机制等【转载】http://blog.csdn.net/u013777351/article/details/49522219

Dos的过程驱动与Windows的事件驱动 在讲本程序的消息循环之前,我想先谈一下Dos与Windows驱动机制的区别: DOS程序主要使用顺序的,过程驱动的程序设计方法.顺序的,过程驱动的程序有一个明显的开始,明显的过程及一个明显的结束,因此程序能直接控制程序事件或过程的顺序.虽然在顺序的过程驱动的程序中也有很多处理异常的方法,但这样的异常处理也仍然是顺序的,过程驱动的结构. 而Windows的驱动方式是事件驱动,就是不由事件的顺序来控制,而是由事件的发生来控制,所有的事件是无序的,所为一个程

消息转发机制入门篇

一.何时处发消息转发机制? 解:当对象接收到无法解读的消息后,就会启动“消息转发”(message forwarding)机制,程序员可经由此过程告诉对象应该如何处理未知消息. 如:-[__NSCFNumber lowercaseString] :unrecognized selector sent to instance 0x87 上面这段异常信息是由NSObjc 的”doesNotRecognizeSelector”方法所抛出的,此异常表明:消息接收者的类型是_ _NSCFNumber,而该

VS2010/MFC编程入门之五(MFC消息映射机制概述)

VS2010/MFC编程入门之五(MFC消息映射机制概述)-软件开发-鸡啄米 http://www.jizhuomi.com/software/147.html     上一讲鸡啄米为大家简单分析了MFC应用程序框架,这一讲是关于MFC消息映射机制的内容.        前面已经说过,Windows应用程序是消息驱动的.在MFC软件开发中,界面操作或者线程之间通信都会经常用到消息,通过对消息的处理实现相应的操作.比较典型的过程是,用户操作窗口,然后有消息产生,送给窗口的消息处理函数处理,对用户的

安卓中的消息循环机制Handler及Looper详解

我们知道安卓中的UI线程不是线程安全的,我们不能在UI线程中进行耗时操作,通常我们的做法是开启一个子线程在子线程中处理耗时操作,但是安卓规定不允许在子线程中进行UI的更新操作,通常我们会通过Handler机制来完成该功能,即当子线程中耗时操作完成后,在子线程中通过Handler向主线程发送消息,在主线程中的Handler的handleMessage方法中处理接受到的消息.这就是安卓中的消息机制,安卓中的消息机制主要是指Handler的运行机制,但是Handler的运行需要底层的MessageQu

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message