Spring Cloud Stream异常处理

应用处理

当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。

本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。

局部处理

Stream相关的配置内容如下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group

所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

/**
 * 消费者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive1(String messageBody) {
        log.info("消费消息,messageBody = {}", messageBody);
        throw new IllegalArgumentException("参数错误");
    }

    /**
     * 处理局部异常的方法
     *
     * @param errorMessage 异常消息对象
     */
    @ServiceActivator(
        // 通过特定的格式指定处理哪个channel的异常
        inputChannel = "stream-test-topic.binder-group.errors"
    )
    public void handleError(ErrorMessage errorMessage) {
        // 获取异常对象
        Throwable errorMessagePayload = errorMessage.getPayload();
        log.error("发生异常", errorMessagePayload);

        // 获取消息体
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        if (originalMessage != null) {
            log.error("消息体: {}", originalMessage.getPayload());
        } else {
            log.error("消息体为空");
        }
    }
}


全局处理

全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
    log.info("消费消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("参数错误");
}

/**
 * 处理全局异常的方法
 *
 * @param errorMessage 异常消息对象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
    log.error("发生异常. errorMessage = {}", errorMessage);
}

系统处理

系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:

  • 丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产
  • requeue(重新排队,从而重新处理)
  • 将失败的消息发送给DLQ(死信队列)

DLQ

目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group
      rabbit:
        bindings:
          input:
            consumer:
              # 自动将失败的消息发送给DLQ
              auto-bind-dlq: true

消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              republish-to-dlq: true


requeue

Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。

需要添加如下配置:

# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。


RetryTemplate

RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例:

spring:
  cloud:
    stream:
      bindings:
        <input channel名称>:
          consumer:
            # 最多尝试处理几次,默认3
            maxAttempts: 3
            # 重试时初始避退间隔,单位毫秒,默认1000
            backOffInitialInterval: 1000
            # 重试时最大避退间隔,单位毫秒,默认10000
            backOffMaxInterval: 10000
            # 避退乘数,默认2.0
            backOffMultiplier: 2.0
            # 当listen抛出retryableExceptions未列出的异常时,是否要重试
            defaultRetryable: true
            # 异常是否允许重试的map映射
            retryableExceptions:
              java.lang.RuntimeException: true
              java.lang.IllegalStateException: false


另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:

@Configuration
class RetryConfiguration {
    @StreamRetryTemplate
    public RetryTemplate sinkConsumerRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());

        return retryTemplate;
    }

    private ExceptionClassifierRetryPolicy retryPolicy() {
        BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
                Collections.singletonList(IllegalAccessException.class
                ));
        keepRetryingClassifier.setTraverseCauses(true);

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

        ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
        retryPolicy.setExceptionClassifier(
                classifiable -> keepRetryingClassifier.classify(classifiable) ?
                        alwaysRetryPolicy : simpleRetryPolicy);

        return retryPolicy;
    }

    private FixedBackOffPolicy backOffPolicy() {
        final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2);

        return backOffPolicy;
    }
}

最后还需要添加一段配置:

spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持设置retry-template-name

原文地址:https://blog.51cto.com/zero01/2428639

时间: 2024-07-30 20:35:34

Spring Cloud Stream异常处理的相关文章

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率. 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复. 那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队. 动手试试 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication pub

Spring Cloud Stream教程(五)编程模型

本节介绍Spring Cloud Stream的编程模型.Spring Cloud Stream提供了许多预定义的注释,用于声明绑定的输入和输出通道,以及如何收听频道. 声明和绑定频道 触发绑定@EnableBinding 您可以将Spring应用程序转换为Spring Cloud Stream应用程序,将@EnableBinding注释应用于应用程序的配置类之一.@EnableBinding注释本身使用@Configuration进行元注释,并触发Spring Cloud Stream基础架构

Spring Cloud Stream教程(四)消费群体

虽然发布订阅模型可以轻松地通过共享主题连接应用程序,但通过创建给定应用程序的多个实例来扩展的能力同样重要.当这样做时,应用程序的不同实例被放置在竞争的消费者关系中,其中只有一个实例预期处理给定消息. Spring Cloud Stream通过消费者组的概念来模拟此行为.(Spring Cloud Stream消费者组与Kafka消费者组相似并受到启发.)每个消费者绑定可以使用spring.cloud.stream.bindings..group属性来指定组名称.对于下图所示的消费者,此属性将设置

Spring Cloud Stream教程(三)持续发布 - 订阅支持

应用之间的通信遵循发布订阅模式,其中通过共享主题广播数据.这可以在下图中看到,它显示了一组交互式的Spring Cloud Stream应用程序的典型部署. SCSt传感器图6. Spring Cloud Stream Publish-Subscribe传感器向HTTP端点报告的数据将发送到名为raw-sensor-data的公共目标.从目的地,它由微服务应用程序独立处理,该应用程序计算时间窗口平均值,以及另一个将原始数据导入HDFS的微服务应用程序. 为了处理数据,两个应用程序在运行时将主题声

译:基于Spring Cloud Stream构建和测试 message-driven 微服务

原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 译者: helloworldtang img Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案.您可以基于Spring Cloud Netfli

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr