Spring Cloud Stream教程(四)消费群体

虽然发布订阅模型可以轻松地通过共享主题连接应用程序,但通过创建给定应用程序的多个实例来扩展的能力同样重要。当这样做时,应用程序的不同实例被放置在竞争的消费者关系中,其中只有一个实例预期处理给定消息。

Spring Cloud Stream通过消费者组的概念来模拟此行为。(Spring Cloud Stream消费者组与Kafka消费者组相似并受到启发。)每个消费者绑定可以使用spring.cloud.stream.bindings..group属性来指定组名称。对于下图所示的消费者,此属性将设置为spring.cloud.stream.bindings..group=hdfsWrite或spring.cloud.stream.bindings..group=average。

SCSt组
图7. Spring Cloud Stream消费者组
订阅给定目标的所有组都会收到已发布数据的副本,但每个组中只有一个成员从该目的地接收给定的消息。默认情况下,当未指定组时,Spring Cloud Stream将应用程序分配给与所有其他消费者组发布 - 订阅关系的匿名独立单个成员消费者组。

耐久力

符合Spring Cloud Stream的有意义的应用模式,消费者群体订阅是持久的。也就是说,绑定实现确保组预订是持久的,一旦已经创建了一个组的至少一个订阅,即使组中的所有应用程序都被停止,组也将接收消息。

注意
匿名订阅本质上是不耐用的。对于某些binder实现(例如RabbitMQ),可以具有非持久组的订阅。

通常,当将应用绑定到给定目的地时,最好始终指定消费者组。在扩展Spring Cloud Stream应用程序时,必须为每个输入绑定指定一个使用者组。这样可以防止应用程序的实例收到重复的消息(除非需要这种行为,这是不寻常的)。

原文地址:http://blog.51cto.com/13590198/2069027

时间: 2024-10-01 23:46:12

Spring Cloud Stream教程(四)消费群体的相关文章

Spring Cloud 入门教程(四): 分布式环境下自动发现配置服务

前一章, 我们的Hello world应用服务,通过配置服务器Config Server获取到了我们配置的hello信息"hello world". 但自己的配置文件中必须配置config server的URL(http://localhost:8888), 如果把config server搬到另外一个独立IP上, 那么作为一个client的hello world应用必须修改自己的bootstrap.yml中的config server的URL地址.这明显是不够方便的. 既然confi

Spring Cloud Stream教程(五)编程模型

本节介绍Spring Cloud Stream的编程模型.Spring Cloud Stream提供了许多预定义的注释,用于声明绑定的输入和输出通道,以及如何收听频道. 声明和绑定频道 触发绑定@EnableBinding 您可以将Spring应用程序转换为Spring Cloud Stream应用程序,将@EnableBinding注释应用于应用程序的配置类之一.@EnableBinding注释本身使用@Configuration进行元注释,并触发Spring Cloud Stream基础架构

Spring Cloud Stream教程(三)持续发布 - 订阅支持

应用之间的通信遵循发布订阅模式,其中通过共享主题广播数据.这可以在下图中看到,它显示了一组交互式的Spring Cloud Stream应用程序的典型部署. SCSt传感器图6. Spring Cloud Stream Publish-Subscribe传感器向HTTP端点报告的数据将发送到名为raw-sensor-data的公共目标.从目的地,它由微服务应用程序独立处理,该应用程序计算时间窗口平均值,以及另一个将原始数据导入HDFS的微服务应用程序. 为了处理数据,两个应用程序在运行时将主题声

Spring Cloud 入门教程(五): Ribbon实现客户端的负载均衡

接上节,假如我们的Hello world服务的访问量剧增,用一个服务已经无法承载, 我们可以把Hello World服务做成一个集群. 很简单,我们只需要复制Hello world服务,同时将原来的端口8762修改为8763.然后启动这两个Spring Boot应用, 就可以得到两个Hello World服务.这两个Hello world都注册到了eureka服务中心.这时候再访问http://localhost:8761, 可以看到两个hello world服务已经注册.(服务与注册参见Spr

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队. 动手试试 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication pub

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率. 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复. 那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

Spring Cloud Stream如何消费自己生产的消息

在上一篇<Spring Cloud Stream如何处理消息重复消费>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢? 常见错误 在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^).以下错误基于Spring Boot 2.0.5.Spring Cloud Finchley SR1. 首先,根据入门示例,为了生产和消费消息,需