Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑

  应用场景

  有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

  @StreamListener(value = TestTopic.INPUT)

  public void receiveV1(String payload, @Header("version") String version) {

  if("1.0".equals(version)) {

  // Version 1.0

  }

  if("2.0".equals(version)) {

  // Version 2.0

  }

  }

  那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

  动手试试

  下面通过编写一个简单的例子来具体体会一下这个属性的用法:

  @EnableBinding(TestApplication.TestTopic.class)

  @SpringBootApplication

  public class TestApplication {

  public static void main(String[] args) {

  SpringApplication.run(TestApplication.class, args);

  }

  @RestController

  static class TestController {

  @Autowired

  private TestTopic testTopic;

  /**

  * 消息生产接口

  *

  * @param message

  * @return

  */

  @GetMapping("/sendMessage")

  public String messageWithMQ(@RequestParam String message) {

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());

  return "ok";

  }

  }

  /**

  * 消息消费逻辑

  */

  @Slf4j无锡妇科医院哪家好 http://wapyyk.39.net/wx/zonghe/fc96e.html/

  @Component

  static class TestListener {

  @StreamListener(value = TestTopic.INPUT, condition = "headers[‘version‘]==‘1.0‘")

  public void receiveV1(String payload, @Header("version") String version) {

  log.info("Received v1 : " + payload + ", " + version);

  }

  @StreamListener(value = TestTopic.INPUT, condition = "headers[‘version‘]==‘2.0‘")

  public void receiveV2(String payload, @Header("version") String version) {

  log.info("Received v2 : " + payload + ", " + version);

  }

  }

  interface TestTopic {

  String OUTPUT = "example-topic-output";

  String INPUT = "example-topic-input";

  @Output(OUTPUT)

  MessageChannel output();

  @Input(INPUT)

  SubscribableChannel input();

  }

  }

  内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

  在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

  spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  spring.cloud.stream.bindings.example-topic-input.group=stream-content-route

  spring.cloud.stream.bindings.example-topic-output.destination=test-topic

  完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

  2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0

  2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0

  从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

原文地址:https://www.cnblogs.com/djw12333/p/11026839.html

时间: 2024-12-11 00:15:01

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑的相关文章

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(ve

Spring cloud stream【入门介绍】

案例代码:https://github.com/q279583842q/springcloud-e-book ??在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性. 一.什么是SpringCloudStream ??官方定义 Spring Cloud Strea

Spring Cloud - 8 (Spring Cloud Stream) 򃲖

原文: http://blog.gqylpy.com/gqy/497 置顶:来自一名75后老程序员的武林秘籍--必读(博主推荐) 来,先呈上武林秘籍链接:http://blog.gqylpy.com/gqy/401/ 你好,我是一名极客!一个 75 后的老工程师! 我将花两分钟,表述清楚我让你读这段文字的目的! 如果你看过武侠小说,你可以把这个经历理解为,你失足落入一个山洞遇到了一位垂暮的老者!而这位老者打算传你一套武功秘籍! 没错,我就是这个老者! 干研发 20 多年了!我也年轻过,奋斗过!我

Spring Cloud入门 (6) - 微服务与消息驱动

1.Spring Cloud Stream 介绍 Spring Cloud Stream 是一个用于构建消息驱动微服务的框架.使用Stream 框架,我们不必关心如何连接各个消息代理中间件,也不必关系如何进行消息的发送与接收,只需要简单的进行配置就可以实现这些功能. 2.消息代理中间件 Spring Cloud Stream 的绑定器提供了 RabbitMQ 与 Kafka 两个消息代理中间件的实现 3.安装 RabbitMQ 我这里直接使用 docker 安装,记得要安装 management

Spring Cloud Stream如何消费自己生产的消息

在上一篇<Spring Cloud Stream如何处理消息重复消费>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢? 常见错误 在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^).以下错误基于Spring Boot 2.0.5.Spring Cloud Finchley SR1. 首先,根据入门示例,为了生产和消费消息,需

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

Spring cloud stream【消息分组】

??上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况.这时我们就可以使用Stream中的消息分组来解决了! Stream消息分组 ??消息分组的作用我们已经介绍了.注意在Stream中处于同一个gr

Spring cloud stream【消息分区】

??在上篇文章中我们给大家介绍了Stream的消息分组,可以实现消息的重复消费的问题,但在某些场景下分组还不能满足我们的需求,比如,同时有多条同一个用户的数据,发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑消息分区了. ??当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理. Stream 消息分区 创建项目 ??将我们上篇文章中的分组的三个项目,拷贝一份修改名称及服务名称 没有分区的情况下演示 发送多条消息查看