Spring Cloud Stream总结

概念

1、group:

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费

组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group

2、destination binder:

与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder

3、destination binding:

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建

4、partition

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上

注:严格来说partition不属于概念,而是一种Stream提高伸缩性、吞吐量的一种方式


注解

1、@Input,使用示例:

public interface MySink {
    @Input("my-input")
    SubscribableChannel input();
}

作用:

  • 用于接收消息
  • 为每个binding生成channel实例
  • 指定input channel的名称
  • 在spring容器中生成一个名为my-input,类型为SubscribableChannel的bean
  • 在spring容器中生成一个类,实现MySink接口。

2、@Output,使用示例:

public interface MySource {
    @Output("my-output")
    MessageChannel output();
}

作用:

  • @Input类似,只不过是用来生产消息

3、@StreamListener,使用示例:

@StreamListener(value = Sink.INPUT, condition = "headers[‘type‘]==‘dog‘")
public void receive(String messageBody) {
    log.info("Received: {}", messageBody);
}

作用:

  • 用于消费消息
  • condition的作用:用于过滤消息,只有符合条件表达式的消息才会被处理
  • condition起作用的两个条件:
    • 注解的方法没有返回值
    • 方法是一个独立方法,不支持Reactive API

4、@SendTo,使用示例:

// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
   return "handle...";
}

作用:

  • 用于发送消息

4、@InboundChannelAdapter,使用示例:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> producer() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}

作用:

  • 让添加该注解的方法生产消息
  • fixedDelay:多少毫秒发送1次
  • maxMessagesPerPoll:每次发送多少条消息

5、@ServiceActivator,使用示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
    return payload.toUpperCase();
}

作用:

  • 标注该注解的方法能够处理消息或消息的有效内容,通过监听input消息,用方法体的代码处理后,输出到output中

6、@Transformer,使用示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
  return message.toUpperCase();
}

作用:

  • @ServiceActivator类似,标注该注解的方法能够转换消息,消息头,或消息有效内容

PollableMessageSource

PollableMessageSource允许消费者可以控制消费速率。举个例子简单演示一下,首先定义一个接口:

public interface PolledProcessor {
    @Input("pollable-input")
    PollableMessageSource input();
}

使用示例:

@Autowired
private PolledProcessor polledProcessor;

@Scheduled(fixedDelay = 5_000)
public void poll() {
    polledProcessor.input().poll(message -> {
        byte[] bytes = (byte[]) message.getPayload();
        String payload = new String(bytes);
        System.out.println(payload);
    });
}


参考:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

原文地址:https://blog.51cto.com/zero01/2428649

时间: 2024-10-09 02:11:42

Spring Cloud Stream总结的相关文章

Spring Cloud Stream教程(五)编程模型

本节介绍Spring Cloud Stream的编程模型.Spring Cloud Stream提供了许多预定义的注释,用于声明绑定的输入和输出通道,以及如何收听频道. 声明和绑定频道 触发绑定@EnableBinding 您可以将Spring应用程序转换为Spring Cloud Stream应用程序,将@EnableBinding注释应用于应用程序的配置类之一.@EnableBinding注释本身使用@Configuration进行元注释,并触发Spring Cloud Stream基础架构

Spring Cloud Stream教程(四)消费群体

虽然发布订阅模型可以轻松地通过共享主题连接应用程序,但通过创建给定应用程序的多个实例来扩展的能力同样重要.当这样做时,应用程序的不同实例被放置在竞争的消费者关系中,其中只有一个实例预期处理给定消息. Spring Cloud Stream通过消费者组的概念来模拟此行为.(Spring Cloud Stream消费者组与Kafka消费者组相似并受到启发.)每个消费者绑定可以使用spring.cloud.stream.bindings..group属性来指定组名称.对于下图所示的消费者,此属性将设置

Spring Cloud Stream教程(三)持续发布 - 订阅支持

应用之间的通信遵循发布订阅模式,其中通过共享主题广播数据.这可以在下图中看到,它显示了一组交互式的Spring Cloud Stream应用程序的典型部署. SCSt传感器图6. Spring Cloud Stream Publish-Subscribe传感器向HTTP端点报告的数据将发送到名为raw-sensor-data的公共目标.从目的地,它由微服务应用程序独立处理,该应用程序计算时间窗口平均值,以及另一个将原始数据导入HDFS的微服务应用程序. 为了处理数据,两个应用程序在运行时将主题声

译:基于Spring Cloud Stream构建和测试 message-driven 微服务

原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 译者: helloworldtang img Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案.您可以基于Spring Cloud Netfli

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率. 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复. 那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队. 动手试试 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication pub

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve