springboot整合rabbit,支持消息确认机制

安装

推荐一篇博客https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

项目结构

POM.XML

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5
 6     <groupId>com.example</groupId>
 7     <artifactId>rabbitmq</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9     <packaging>jar</packaging>
10
11     <name>rabbitmq</name>
12     <description>Spring Boot 整合RabbitMQ</description>
13
14     <parent>
15         <groupId>org.springframework.boot</groupId>
16         <artifactId>spring-boot-starter-parent</artifactId>
17         <version>2.0.5.RELEASE</version>
18         <relativePath/> <!-- lookup parent from repository -->
19     </parent>
20
21     <properties>
22         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
23         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
24         <java.version>1.8</java.version>
25     </properties>
26
27     <dependencies>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter</artifactId>
31         </dependency>
32
33         <!-- rabbitmq -->
34         <dependency>
35             <groupId>org.springframework.boot</groupId>
36             <artifactId>spring-boot-starter-amqp</artifactId>
37         </dependency>
38
39         <dependency>
40             <groupId>org.springframework.boot</groupId>
41             <artifactId>spring-boot-starter-test</artifactId>
42             <scope>test</scope>
43         </dependency>
44     </dependencies>
45
46     <build>
47         <plugins>
48             <plugin>
49                 <groupId>org.springframework.boot</groupId>
50                 <artifactId>spring-boot-maven-plugin</artifactId>
51             </plugin>
52         </plugins>
53     </build>
54
55
56 </project>

POM.XML

application.yml

需要将publisher-confrems设为true,启动确认回调, 将 publisher-returns设为true 确认返回回调

rabbitmq配置类--RabbitConfig

第一部分, 定义队列

第二部分,设置一些消息处理策略

 1 package com.example.rabbitmq;
 2
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.core.Queue;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9
10 import javax.annotation.Resource;
11
12 /**
13  * rabbitMq 配置类
14  * @author milicool
15  * Created on 2018/9/14
16  */
17 @Configuration
18 public class RabbitConfig {
19     @Resource
20     private RabbitTemplate rabbitTemplate;
21
22     /**
23      * 定义一个hello的队列
24      * Queue 可以有4个参数
25      *      1.队列名
26      *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
27      *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
28      *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
29      */
30     @Bean
31     public Queue helloQueue() {
32         return new Queue("queue-test");
33     }
34
35     /** ======================== 定制一些处理策略 =============================*/
36
37     /**
38      * 定制化amqp模版
39      *
40      * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
41      * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
42      */
43     @Bean
44     public RabbitTemplate rabbitTemplate() {
45         Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
46
47         // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
48         rabbitTemplate.setMandatory(true);
49
50         // 消息返回, yml需要配置 publisher-returns: true
51         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
52             String correlationId = message.getMessageProperties().getCorrelationIdString();
53             log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
54         });
55
56         // 消息确认, yml需要配置 publisher-confirms: true
57         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
58             if (ack) {
59                 // log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
60             } else {
61                 log.debug("消息发送到exchange失败,原因: {}", cause);
62             }
63         });
64
65         return rabbitTemplate;
66     }
67 }

配置类

生产者

 1 /**
 2  * 生产者
 3  * @author milicool
 4  * Created on 2018/9/14
 5  */
 6 @Component
 7 public class Producer {
 8
 9     @Autowired
10     private RabbitTemplate rabbitTemplate;
11
12     /**
13      * 给hello队列发送消息
14      */
15     public void send() {
16         for (int i =0; i< 100; i++) {
17             String msg = "hello, 序号: " + i;
18             System.out.println("Producer, " + msg);
19             rabbitTemplate.convertAndSend("queue-test", msg);
20         }
21     }
22
23 }

消费者

 1 /**
 2  * 消费者
 3  * @author milicool
 4  * Created on 2018/9/14
 5  */
 6 @Component
 7 public class Comsumer {
 8     private Logger log = LoggerFactory.getLogger(Comsumer.class);
 9
10     @RabbitListener(queues = "queue-test")
11     public void process(Message message, Channel channel) throws IOException {
12         // 采用手动应答模式, 手动确认应答更为安全稳定
13         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
14         log.info("receive: " + new String(message.getBody()));
15     }
16 }

测试类

 1 @RunWith(SpringRunner.class)
 2 @SpringBootTest
 3 public class RabbitmqApplicationTests {
 4
 5     @Autowired
 6     private Producer producer;
 7
 8     @Test
 9     public void contextLoads() {
10         producer.send();
11     }
12
13 }

测试结果

测试结果太长,没有截取全部,可以查看到消费者接收到了全部消息,如果有的消息在没有接收完,消息将被持久化,下次启动时消费

web端查看

感谢阅读  o(∩_∩)o

原文地址:https://www.cnblogs.com/milicool/p/9662447.html

时间: 2024-10-09 01:53:25

springboot整合rabbit,支持消息确认机制的相关文章

RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/** * RabbitMQ消息确认机制 * 关于rabbit的生产和消费方的一些实用的操作: * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失 */ /** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信: * 此时插入mq消息的服务为了保证给所有用户发

activemq的消息确认机制ACK

一.简介 消息消费者有没有接收到消息,需要有一种机制让消息提供者知道,这个机制就是消息确认机制. ACK(Acknowledgement)即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. 二.ACK_MODE有几类 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认

RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费". 它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了. 因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了. 在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confir

activemq 消息阻塞优化和消息确认机制优化

一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值来调整预读取条数,java代码如下 //设置预读取为1ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();p.setQueuePrefetch(1);//创建一个链接工厂connectionFactory = new ActiveMQCon

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方

storm 消息确认机制及可靠性

worker进程死掉 在一个节点 kill work进程 比方 kill 2509  对work没有影响 由于会在其它节点又一次启动进程运行topology任务 supervisor进程死掉 supervisor进程kill掉 对work进程没有影响  由于他们是互相独立的! . nimbus进程死掉(存在HA的问题) nimbus假设死掉 整个任务挂掉 存在单点故障问题!(hadoop2有ha!.!!.! storm没有ha高可用) 节点宕机(和supervisor是一样的) ack/fail