Spring AMQP 错误处理策略详解

1.介绍

异步消息传递是一种松耦合的分布式通信,在事件驱动体系结构实现中越来越受欢迎。幸运的是,Spring框架提供了Spring AMQP项目,可以帮助我们构建基于AMQP的消息传递解决方案。

另一方面,在这种环境中处理错误并不简单。本文将讨论错误处理策略。

2.配置环境

这里使用RabbitMQ实现AMQP标准。此外,Spring AMQP还提供了spring-rabbit模块,让集成更容易。

RabbitMQ作为独立服务器运行。执行下面命令,在Docker容器中运行:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

了解更多配置信息和项目依赖项设置,请参阅Spring AMQP教程。

3.失败场景

由于分布式特性,相比一个独立整体的应用程序,通常基于消息传递的系统发生的错误类型会更多。

下面列举了一些异常类型:

网络或I/O相关故障:网络连接和I/O操作故障;

协议或基础架构相关错误:消息传递基础架构配置错误;

代理(Broker)相关故障:客户端与AMQP代理之间配置不正确。例如,达到定义的限制或阈值、身份验证或策略配置无效。

应用程序和消息相关的异常:表现为违反某些业务或应用程序规则的异常;

当然,上述故障列表并不全面,但包含了最常见的错误类型。

应该注意到,Spring AMQP默认处理了与连接有关的问题和底层问题,开箱即用。例如,重试机制、重排队策略等。此外,大多数故障和错误都会转换为AmqpException或它的子类。

接下来,我们会主要关注应用错误和高级别错误,然后介绍全局错误处理策略。

4.配置项目

首先,定义一个简单队列并进行exchange配置:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.build();
}

@Bean
DirectExchange messagesExchange() {
return new DirectExchange(EXCHANGE_MESSAGES);
}

@Bean
Binding bindingMessages() {
return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

接下来,创建一个简单的producer:

public void sendMessage() {
rabbitTemplate
.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:"+ messageNumber++);
}

最后,consumer会抛出一个异常:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
throw new BusinessException();
}

默认情况下,所有失败的消息会马上不断添加到目标队列。

执行Maven命令运行示例程序:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

现在应该看到类似下面的输出:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

因此,默认情况下会在输出中看到无数这样的消息。

处理这种情况有两种选择:

为listener设置default-requeue-rejected参数,false-spring.rabbitmq.listener.simple.default-requeue-rejected = false。

抛出AmqpRejectAndDontRequeueException,对于将来没有意义的消息可以直接丢掉。

下面讨论如何更智能地处理失败消息。

5.死信队列

死信队列(Dead Letter Queue,DLQ)用来保存未送达或失败的消息。DLQ能帮助我们处理错误消息或不良消息,监视故障模式并从系统异常中恢复。

更重要的是,这样能够防止队列陷入处理不良消息的死循环,造成系统性能下降。

这里有两个主要概念:死信交换(DLX)和死信队列(DLQ)。实际上,DLX是一种普通exchange,可以把它定义为一种常见类型:direct, topic 或者 fanout。

producer对队列一无所知,了解这一点很重要。它只知道exchange,并且所有产生的消息都根据exchange配置和message routing key进行路由。

接下来,让我们看看如何通过应用“死信队列”处理异常。

5.1.基础配置

为了配置DLQ,需要在定义队列时指定其他参数:

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
.build();
}

@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

上面的示例中,增加了两个参数:x-dead-letter-exchange 和 x-dead-letter-routing-key。x-dead-letter-exchange使用空字符串,通知代理使用默认exchange。

第二个参数与简单消息设置routing key一样重要。该选项修改了消息初始routing key,为后面通过DLX路由做准备。

5.2.失败消息路由

当消息传递失败时,将被路由到Dead Letter Exchange(DLE)。但是正如我们已经指出的那样,DLX是一种正常的交换方式。因此,如果失败消息的routing key与exchange不匹配,那么不会传给DLQ。

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

如果上面的示例忽略x-dead-letter-routing-key,那么失败消息将会无限循环重试。

此外,消息元信息可以在x-death header中找到:

x-death:
count: 1
exchange: baeldung-messages-exchange
queue: baeldung-messages-queue
reason: rejected
routing-keys: baeldung-messages-queue
time: 1571232954

上面的信息可以在RabbitMQ管理控制台中看到,通常在本地端口15672上运行。

除了上面这样配置,如果使用Spring Cloud Stream,还可以利用republishToDlq和。autoBindDlq属性简化配置过程。

5.3.Dead Letter Exchange

上一节中已经看到,消息路由到DLE时更改了routing key。但是这种处理并不适用于所有情况。可以自己配置DLX并使用fanout类型定义:

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";

@Bean
Queue messagesQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
.build();
}

@Bean
FanoutExchange deadLetterExchange() {
return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}

@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

@Bean
Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

这次我们使用fanout类型自定义exchange,因此消息将被发送到所有bind队列。此外,我们把x-dead-letter-exchange参数设为DLX的名字。同时,删除了x-dead-letter-routing-key参数。

现在运行示例时,无需更改初始routing key就能把失败消息发送到DLQ:

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

5.4.处理死信队列消息

当然,把消息移到“死信队列”中的原因是为了在其他时间可以重新处理。

为“死信队列”定义一个listener:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
log.info("Received failed message: {}", message.toString());
}

现在运行示例代码,应该能够看到log输出:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer :
Received failed message:

收到了一条失败消息,接下来该怎么办?根据不同的系统要求、异常类型或者消息类型,答案也各有不同。

例如,可以让消息重新排队发送到原目的地址:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
log.info("Received failed message, requeueing: {}", failedMessage.toString());
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

但是这种异常处理逻辑与默认的重试策略没有区别:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer :
Received message:
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer :
Received failed message, requeueing:

通常采用的策略可能需要重试n次然后拒绝该消息。让我们利用消息头实现这种策略:

public void processFailedMessagesRetryHeaders(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
.getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Discarding message");
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties()
.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

首先获取x-retries-count header值,然后与重试最大次数进行比较。如果计数器达到重试最大次数,则丢弃该消息:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer :
Discarding message

补充一点,还可以利用x-message-ttl header设置消息丢弃时间,超过该时间消息将被丢弃。这样可以防止队列无限增长。

5.5.停车场队列

还有一种情况需要考虑,即不能简单把消息丢弃,因为它可能是一项银行交易。有时可能需要手动处理某条消息,或者仅仅需要记录失败n次以上的消息。

对于这种情况,有一概念称作停车场队列。可以将DLQ中失败次数超过允许值的所有消息转发到停车场队列,以便进一步处理。

下面展示了对应的实现:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";

@Bean
FanoutExchange parkingLotExchange() {
return new FanoutExchange(EXCHANGE_PARKING_LOT);
}

@Bean
Queue parkingLotQueue() {
return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}

@Bean
Binding parkingLotBinding() {
return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

接下来重构listener逻辑,把消息发送到停车场队列:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
.getHeaders().get(HEADER_X_RETRIES_COUNT);
if (retriesCnt == null) retriesCnt = 1;
if (retriesCnt > MAX_RETRIES_COUNT) {
log.info("Sending message to the parking lot queue");
rabbitTemplate.send(EXCHANGE_PARKING_LOT,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
return;
}
log.info("Retrying message for the {} time", retriesCnt);
failedMessage.getMessageProperties()
.getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
rabbitTemplate.send(EXCHANGE_MESSAGES,
failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

最后,处理转发到停车场队列的消息:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
log.info("Received message in parking lot queue");
// Save to DB or send a notification.
}

现在可以把失败的消息保存到数据库,或者发送email通知。

运行程序进行测试:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer :
Received message in parking lot queue

从输出中可以看到,几次尝试失败后,该消息已发送到停车场队列。

6.自定义错误处理

在上一节中,我们已经了解了如何使用专用队列和exchange处理故障。但有时可能需要捕获所有错误,记录到日志或持久化到数据库中。

6.1.全局ErrorHandler

目前为止,我们使用的是默认SimpleRabbitListenerContainerFactory,并且该工厂类默认调用ConditionalRejectingErrorHandler。handler会捕获各种不同的异常并将其转换为AmqpException继承树中的异常。

值得一提的是,如果想要处理连接错误,需要实现ApplicationListener接口。

简而言之,ConditionalRejectingErrorHandler负责指定是否拒绝特定消息。由异常触发的消息被拒绝后不会重新排队。

让我们自定义一个ErrorHandler,仅支持BusinessException重新排队:

public class CustomErrorHandler implements ErrorHandler {br/>@Override
public void handleError(Throwable t) {
if (!(t.getCause() instanceof BusinessException)) {
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
}
}
}

此外,由于是在listener方法中抛出异常,因此将可以把异常包装为ListenerExecutionFailedException。这样就需要调用getCause方法来获取异常。

6.2.FatalExceptionStrategy

handler在后台使用FatalExceptionStrategy检查异常等级是否为fatal。如果为fatal,失败消息将被reject。

默认情况下,下列为fatal异常:

MessageConversionException

MessageConversionException

MethodArgumentNotValidException

MethodArgumentTypeMismatchException

NoSuchMethodException

ClassCastException

除了实现ErrorHandler接口,还可以自定义FatalExceptionStrategy:

public class CustomFatalExceptionStrategy
extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {br/>@Override
public boolean isFatal(Throwable t) {
return !(t.getCause() instanceof BusinessException);
}
}

最后,需要把自定义策略传给ConditionalRejectingErrorHandler构造函数:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(errorHandler());
return factory;
}

@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}

@Bean
FatalExceptionStrategy customExceptionStrategy() {
return new CustomFatalExceptionStrategy();
}

7.结论

在文讨论了使用Spring AMQP(尤其是RabbitMQ)不同的错误处理方法。

每个系统都需要指定错误处理策略。我们已经介绍了事件驱动架构中最常见的错误处理方法。此外,文章中展示了可以组合多种策略来构建更全面、更强大的解决方案。

希望上述内容能解决您的疑惑,你也可以在下方给我留言,你的支持是我努力的肯定。

Spring AMQP 错误处理策略详解

原文地址:https://blog.51cto.com/14659362/2464129

时间: 2024-11-05 17:43:55

Spring AMQP 错误处理策略详解的相关文章

七牛云存储Python SDK使用教程 - 上传策略详解

文 七牛云存储Python SDK使用教程 - 上传策略详解 七牛云存储 python-sdk 七牛云存储教程 jemygraw 2015年01月04日发布 推荐 1 推荐 收藏 2 收藏,2.7k 浏览 本教程旨在介绍如何使用七牛的Python SDK来快速地进行文件上传,下载,处理,管理等工作. 前言 我们在上面的两节中了解到,客户端上传文件时,需要从业务服务器申请一个上传凭证(Upload Token),而这个上传凭证是业务服务器根据上传策略(PutPolicy)来生成的,而这个生成过程中

Spring Boot的启动器Starter详解

Spring Boot的启动器Starter详解 作者:chszs,未经博主允许不得转载.经许可的转载需注明作者和博客主页:http://blog.csdn.net/chszs Spring Boot应用启动器基本的一共有44种,具体如下: 1)spring-boot-starter 这是Spring Boot的核心启动器,包含了自动配置.日志和YAML. 2)spring-boot-starter-actuator 帮助监控和管理应用. 3)spring-boot-starter-amqp 通

Spring MVC 框架搭建及详解

一.Spring MVC环境搭建:(Spring 2.5.6 + Hibernate 3.2.0) 1. jar包引入 Spring 2.5.6:spring.jar.spring-webmvc.jar.commons-logging.jar.cglib-nodep-2.1_3.jar Hibernate 3.6.8:hibernate3.jar.hibernate-jpa-2.0-api-1.0.1.Final.jar.antlr-2.7.6.jar.commons-collections-3

spring声明式事务配置详解

spring声明式事务配置详解 君子不器 2013年06月16日 编程世界 5273次阅读 查看评论 理解Spring的声明式事务管理实现 本节的目的是消除与使用声明式事务管理有关的神秘性.简单点儿总是好的,这份参考文档只是告诉你给你的类加上@Transactional注解,在配置文件中添加('<tx:annotation-driven/>')行,然后期望你理解整个过程是怎么工作的.此节讲述Spring的声明式事务管理内部的工作机制,以帮助你在面对事务相关的问题时不至于误入迷途,回朔到上游平静

Android 开源框架Universal-Image-Loader完全解析(二)--- 图片缓存策略详解

本篇文章继续为大家介绍Universal-Image-Loader这个开源的图片加载框架,介绍的是图片缓存策略方面的,如果大家对这个开源框架的使用还不了解,大家可以看看我之前写的一篇文章Android 开源框架Universal-Image-Loader完全解析(一)--- 基本介绍及使用,我们一般去加载大量的图片的时候,都会做缓存策略,缓存又分为内存缓存和硬盘缓存,我之前也写了几篇异步加载大量图片的文章,使用的内存缓存是LruCache这个类,LRU是Least Recently Used 近

Spring中的jar包详解

下面给大家说说spring众多jar包的特点吧,无论对于初学spring的新手,还是spring高手,这篇文章都会给大家带来知识上的收获,如果你已经十分熟悉本文内容就当做一次温故知新吧.spring.jar 是包含有完整发布的单个jar包,spring.jar中除了spring-mock.jar里所包含的内容外其他所有jar包的内容,因为只有在研发环境下才会用到spring-mock.jar来进行辅助测试,正式应用系统中是用不得这些类的. 除了spring.jar文件,Spring还包括有其他1

深入浅出Spring(二) IoC详解

上次的博客深入浅出Spring(一)Spring概述中,我给大家简单介绍了一下Spring相关概念.重点是这么一句:Spring是为了解决企业应用开发的复杂性而创建的一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架.在这句话中重点有两个,一个是IoC,另一个是AOP.今天我们讲第一个IoC. IoC概念 控制反转(Inversion of Control)是一个重要的面向对象编程的法则来削减计算机程序的耦合问题. 它还有一个名字叫做依赖注入(Dependency Injection)

超轻量级DI容器框架Google Guice与Spring框架的区别教程详解及其demo代码片段分享

原创不易,转载请注明出处:超轻量级DI容器框架Google Guice与Spring框架的区别教程详解及其demo代码片段分享 代码下载地址:http://www.zuidaima.com/share/1759689106541568.htm 依赖注入,DI(Dependency Injection),它的作用自然不必多说,提及DI容器,例如spring,picoContainer,EJB容器等等,近日,google诞生了更轻巧的DI容器--Guice! 废话不多讲了,先看看Guice是如何实现

Spring Bean的生命周期详解

Spring Bean的生命周期详解 Spring IoC容器的本质目的就是为了管理Bean,对于Bean而言,在容器中存在其生命周期,它的初始化和销毁也需要一个过程,下面主要对其生命周期进行一个详解的解释.生命周期主要是为了了解Spring IoC容器初始化和销毁Bean的过程,通过下图即可以掌握Spring IoC容器初始化与销毁Bean的过程. 通过上图,我们首先可以看到生命周期的步骤. 1)如果Bean实现了接口 BeanNameAware 的 setBeanName 方法,那么它就会调