Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列

〇、什么是消息队列

  参考:新手也能看懂,消息队列其实很简单

       RabbitMQ运行模型与名词解释

一、应答模式

  1.什么是应答?

    消息投递到交换器(exchange)中,交换器给我们的反馈,是保障消息投递成功的一种机制。

  2.测试

  配置:

 1 #选择确认类型为交互
 2 spring.rabbitmq.publisher-confirm-type=correlated

  测试方法:

 1     @Test
 2     /**
 3      * the test is testing confirm-function in rabbitmq
 4      */
 5     void messageSendTestWithConfirm(){
 6
 7         /*
 8          * 设置消息确认回调方法;
 9          * @ack 为true时,表示投递成功;为false表示投递失败;
10          * @CorrelationData 为自定义反馈信息;
11          * @cause 为投递失败的原因;
12          */
13         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
14             @Override
15             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
16                 if(!ack){
17                     logger.error("correlationData:"+ correlationData);
18                     logger.error("ack:"+ack);
19                     logger.error("cause:"+cause);
20                 }
21             }
22         });
23
24         //消息内容
25         Map<String,String> map = new HashMap<>();
26         map.put("message","testing confire function");
27
28         //设置自定义反馈消息
29         String uuid = UUID.randomUUID().toString();
30         logger.info("消息唯一ID:"+uuid);
31         CorrelationData correlationData = new CorrelationData();
32         correlationData.setId(uuid);
33
34         //并不存在名为“exchange-dog”的exchange
35         rabbitTemplate.convertAndSend("exchange-dog","dog",map,correlationData);
36
37     }

  测试结果

1  c.d.amqp.SpringBootAmqpApplicationTests  : 消息唯一ID:e6601e83-fad7-4b53-9968-c74828e62b23
2  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.22.130:5672]
3  o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#f7cdf8:0/[email protected] [delegate=amqp://[email protected]:5672/, localPort= 8055]
4  o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘exchange-dog‘ in vhost ‘/‘, class-id=60, method-id=40)
5  c.d.amqp.SpringBootAmqpApplicationTests  : correlationData:CorrelationData [id=e6601e83-fad7-4b53-9968-c74828e62b23]
6  c.d.amqp.SpringBootAmqpApplicationTests  : ack:false
7  c.d.amqp.SpringBootAmqpApplicationTests  : cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘exchange-dog‘ in vhost ‘/‘, class-id=60, method-id=40)

  注意:Confirm模式只管有无投递到exchange,而不管有无发送到队列当中。

二、返回模式

  1.什么是返回模式?

    当消息未投递到queue时的反馈。

  2.测试

  配置:

1 #开启返回模式
2spring.rabbitmq.publisher-returns=true

  测试方法:

 1     @Test
 2     void messageSendTestWithReturn(){
 3         /*
 4          * 设置消息返回回调方法;
 5          * 该方法执行时则表示消息投递失败
 6          * @message 为反馈信息;
 7          * @replyCode 一个反馈代码,表示不同投递失败原因;
 8          * @replyText 反馈信息
 9          */
10         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
11             @Override
12             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
13                 logger.error("返回消息配置:"+message.getMessageProperties().toString());
14                 logger.error("反馈代码:"+replyCode);
15                 logger.error("反馈内容:"+replyText);
16                 logger.error("exchange:"+exchange);
17                 logger.error("routingKey:"+routingKey);
18             }
19         });
20
21         //消息内容
22         Map<String,String> map = new HashMap<>();
23         map.put("message","testing return function");
24
25         //并不存在名为“dog”的routingKey,即投不到现有的queue里
26         rabbitTemplate.convertAndSend("exchange-direct","dog",map);
27     }

  测试结果:

1 c.d.amqp.SpringBootAmqpApplicationTests  : 返回消息配置:MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]
2 c.d.amqp.SpringBootAmqpApplicationTests  : 反馈代码:312
3 c.d.amqp.SpringBootAmqpApplicationTests  : 反馈内容:NO_ROUTE
4 c.d.amqp.SpringBootAmqpApplicationTests  : exchange:exchange-direct
5 c.d.amqp.SpringBootAmqpApplicationTests  : routingKey:dog

三、限流策略(手动应答消息)

  1.为什么要限流?

    若队列中消息积压过多,突然开启监听,会导致消费端崩溃。

  2.如何限流?

    使用RabbitMQ提供的Qos(服务质量保证)功能,如果一定数目消息的未被应答前,不再接受新消息。

  3.测试

  配置

1 #手动消息应答
2 spring.rabbitmq.listener.simple.acknowledge-mode=manual

  

  测试

 1     /*
 2      * 消息手动应答
 3      * @RabbitListener注解监听来自指定队列的消息
 4      */
 5
 6     @RabbitListener(queues = "springboot-queue")
 7     public void revice(Message message,Channel channel) throws IOException {
 8         try{
 9             logger.info("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
10             logger.info("消息标签:"+String.valueOf(message.getMessageProperties().getDeliveryTag()));
11             /* 设置Qos机制
12              * 第一个参数:单条消息的大小(0表示即无限制)
13              * 第二个参数:每次处理消息的数量
14              * 第三个参数:是否为consumer级别(false表示仅当前channel有效)
15              */
16             channel.basicQos(0,1,false);
17             //手动应答消息  第一个参数是所确认消息的标识,第二参数是是否批量确认
18             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
19
20         }catch (Exception e){
21             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
22             logger.error("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
23             logger.error("接收消息发送错误:"+e.getMessage());
24         }
25     }

四、死信队列

  1.什么是死信?

    未被正常处理的消息。

  2.出现死信的情况

    (1)消息被拒绝(reject、nack),并且不重新放回原队列(requeue=false);

    (2)消息过期(设置了Expiration);

    (3)队列已满。

  3.配置

  以下代码配置了一个正常的exchange、queue和 一个专门接收死信的exchange、queue

  在队列中配置x-dead-letter-exchange参数,表示本队列出现死信,则转发到配置指定的交换器中去

 1     /*
 2      * 创建消息队列的对象:exchange、queue、绑定规则
 3      */
 4     @Test
 5     void createObjectOfMQ(){
 6
 7         /*
 8          * 在普通队列上配置参数。表示若本队列有死信,则转发到配置指定的转发器中去
 9          * @参数键:x-dead-letter-exchange
10          * @参数名:接收死信的交换器
11          */
12         Map<String,Object> arguments = new HashMap<>();
13         arguments.put("x-dead-letter-exchange","springboot-dlx-exchange");
14
15         amqpAdmin.declareExchange(new DirectExchange("springboot-direct"));
16         amqpAdmin.declareQueue(new Queue("springboot.queue",true,false,false,arguments));
17         amqpAdmin.declareBinding(new Binding("springboot.queue", Binding.DestinationType.QUEUE,"springboot-direct","springboot",null));
18
19         //接收死信的交换器
20         amqpAdmin.declareExchange(new TopicExchange("springboot-dlx-exchange"));
21         //交换器收到的死信都转发到该队列,#表示接收所有消息
22         amqpAdmin.declareQueue(new Queue("springboot-dlx.queue",true));
23         amqpAdmin.declareBinding(new Binding("springboot-dlx.queue", Binding.DestinationType.QUEUE,"springboot-dlx-exchange","#",null));
24     }

  运行以上方法后,可在RabbitMQ管理界面看到两个exchange、两个queue成功创建

  4.发送消息

 1    @Test
 2     void sendMessageWithTTL(){
 3         String str = "dlx test";
 4
 5         //setExpiration表示设置该消息存活时间
 6         //5秒后该消息未被消息,则转发到死信交换器
 7         Message message = MessageBuilder.withBody(str.getBytes())
 8                 .setExpiration("5000")
 9                 .setContentEncoding("UTF-8")
10                 .setMessageId("dlx-001")
11                 .build();
12
13         //将消息发送到配置了"x-dead-letter-exchange"参数的队列
14         rabbitTemplate.convertAndSend("springboot-direct","springboot",message);
15     }

   发送完消息,过5秒之后可以看到信息已被投递到死信队列中去了

setExpiration

原文地址:https://www.cnblogs.com/Drajun/p/12301304.html

时间: 2024-10-28 06:51:01

Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列的相关文章

Spring Boot中使用RabbitMQ的示例代码

很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. http://www.ljhseo.com/http://www.xyrjkf.net/http://www.xyrjkf.cn/http://www.xyrjkf.com.cn/http://www.zjdygsi.cn/http://www.zjdaiyun.cn/http://www.jsdygsi.cn/http://www.xyrjkf

Java微服务实践—Spring Boot系列

Java微服务实践-Spring Boot系列网盘地址:https://pan.baidu.com/s/1aMqPO4pXLeXDHvRuze-JWw 密码: j62z备用地址(腾讯微云):https://share.weiyun.com/c74335d7e383158ee3c4aaf193d471ed 密码:r5gqsk 原文地址:http://blog.51cto.com/12218470/2088359

spring boot系列(五)spring boot 配置spring data jpa (查询方法)

接着上面spring boot系列(四)spring boot 配置spring data jpa 保存修改方法继续做查询的测试: 1 创建UserInfo实体类,代码和https://www.cnblogs.com/kxm87/p/9273555.html中的一样. 2 创建数据库操作类相当于dao层,主要创建一个接口UserRepository,继承JpaRepository接口即可.本代码中主要都是自定义方法. 使用findXX 或者countXX(这两个不用编写sql,jpa会自动生成)

Spring Boot系列——如何集成Log4j2

上篇<Spring Boot系列--日志配置>介绍了Spring Boot如何进行日志配置,日志系统用的是Spring Boot默认的LogBack. 事实上,除了使用默认的LogBack,Spring Boot还可以使用Log4j.Log42等作为自己的日志系统.今天就那Log4j2来举例,说明Spring Boot是如何集成其他日志系统的. 添加jar包依赖 上篇提到过,Spring Boot默认使用LogBack,但是我们没有看到显示依赖的jar包,其实是因为所在的jar包spring-

Spring Boot系列——7步集成RabbitMQ

RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步.削峰的目的. 今天这篇,我们来看看Spring Boot是如何集成RabbitMQ,发送消息和消费消息的.同时我们介绍下死信队列. 集成RabbitMQ 集成RabbitMQ只需要如下几步即可 1.添加maven依赖 <!--rabbitmq--> <dependency> ? ? <groupId>org.springframework.boot</groupId>

详解Spring Boot中的RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式

Spring Boot系列—(一)入门

前言 因为项目组需要进行微服务改造,而微服务开发中需要以Spring Boot为基础.因此需要先弄懂SpringBoot. 我们先来看看SpringBoot的背景由来,SpringBoot是什么,一个简单的SpringBoot样例工程 1.SpringBoot的背景由来 springboot是为了解决什么问题而产生的呢? 我们先来看看,在spring boot之前,作为一个java开发者要从无到有的开发一个新的web项目,我们都需要做哪些? 第一,我们创建一个java web项目,使用maven

spring boot系列03--spring security (基于数据库)登录和权限控制(下)

(接上篇) 后台 先说一下AuthConfig.java Spring Security的主要配置文件之一 AuthConfig 1 @Configuration 2 @EnableWebSecurity 3 public class AuthConfig extends WebSecurityConfigurerAdapter { 4 @Override 5 protected void configure(HttpSecurity httpSecurity) throws Exception

spring boot系列--spring security (基于数据库)登录和权限控制

先说一下AuthConfig.java Spring Security的主要配置文件之一 AuthConfig 1 @Configuration 2 @EnableWebSecurity 3 public class AuthConfig extends WebSecurityConfigurerAdapter { 4 @Override 5 protected void configure(HttpSecurity httpSecurity) throws Exception { 6 http