译:基于Spring Cloud Stream构建和测试 message-driven 微服务

原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/

作者: Piotr Mińkowski

译者: helloworldtang

img

Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案。您可以基于Spring Cloud Netflix库创建同步REST微服务,正如我在之前的一篇文章中所展示的那样 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服务指南。您可以使用Spring WebFlux项目在Netty上创建异步的、响应式的微服务,并将其与一些Spring Cloud库相结合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建响应式微服务。最后,您可以使用Spring Cloud Stream和类似Apache Kafka或RabbitMQ这样的broker来实现基于发布/订阅模型的message-driven微服务。构建微服务的最后一种方法是本文的主要主题。我将向您展示如何在RabbitMQ broker的基础上有效地构建、扩展、运行和测试消息传递微服务。

体系结构
为了演示Spring Cloud Stream的特性,我们将设计一个示例系统,该系统使用发布/订阅模型进行跨服务通信。我们有三个微服务:order-service、product-service和account-service。应用程序order-service暴露了负责处理发送到我们系统的订单的HTTP endpoint。所有传入的订单都是异步处理的——order-service准备并发送消息到RabbitMQ exchange,然后就对调用的客户端进行响应,不需要等到消息被消费后再响应。应用程序的account-service和product-service正在侦听进入该RabbitMQ exchange的订单消息。微服务account-service负责检查客户账户是否有足够的资金来支付该订单需要的金额,如果有就从该账户扣款。微服务product-service检查是否有足够的库存,并在处理订单后改变可用产品的数量。account-service 和 product-service 都通过RabbitMQ exchange(这一次是使用direct exchange的一对一通信)发送带有操作状态的异步响应。微服务 order-service根据接收到的响应消息来更新订单状态,并通过REST endpoint GET /order/{id}提供给外部客户端。

如果您觉得我们的示例描述有点难以理解,这里有一个用于澄清的架构图。

stream-1

启用 Spring Cloud Stream
在项目中使用Spring Cloud Stream的推荐方法是使用依赖管理系统。Spring Cloud Stream有一个与整个Spring Cloud framework相关,并且独立发布的依赖管理。然而,如果我们已经在Elmhurst.RELEASE版本的dependencyManagement部分声明了spring-cloud-dependencies,就不需要在pom.xml中声明任何其他内容。如果您喜欢只使用Spring Cloud Stream项目,那么您应该定义以下部分。

下一步是将spring-cloud-streamartifact添加到项目依赖项中。我还建议您至少包括spring-cloud-sleuth 库,以提供作为源请求进入order-service 的发送消息用的traceId。

Spring Cloud Stream 编程模型
为了使您的应用程序能够连接到一个message broker,请在主类上使用@EnableBinding注解。 @EnableBinding注解将一个或多个接口作为参数。您可以在Spring Cloud Stream提供的三个接口之间进行选择:

Sink:这是用来标记从入站通道接收消息的服务。
Source: 这是用来向出站通道发送消息的。
Processor:当你需要一个入站通道和一个出站通道时,它可以被使用,因为它继承了Source and Sink接口。因为order-service发送消息,并接收它们,它的主类已经使用了@EnableBinding(Processor.class)注解。
下面是order-service项目中启用了Spring Cloud Stream binding的主类。
@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
...
public static void main(String[] args) {
new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
}
...
}
增加 message broker
在Spring Cloud Stream术语中,负责与特定message broker集成的实现称为binder。默认情况下,Spring Cloud Stream为 Kafka and RabbitMQ提供了binder实现。它能够自动检测和在类路径上查找binder。任何特定于中间件的设置都可以通过Spring Boot支持的外部配置属性来覆盖,譬如应用程序参数、环境变量,或者仅仅是application.yml文件。为了包含对RabbitMQ的支持,RabbitMQ将这篇文章用作message broker,您应该向项目添加以下依赖项。

现在,我们的应用程序需要连接RabbitMQ broker的一个共享实例。这就是为什么我使用RabbitMQ在默认的5672端口上运行Docker镜像。它还可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下启动web仪表板。

我们需要通过设置属性 spring.rabbitmq.host为Docker机器IP 192.168.99.100 ,来覆盖Spring Boot application的中的默认设置。

实现消息驱动的微服务
Spring Cloud Stream是在Spring Integration项目之上构建的。Spring Integration扩展了Spring编程模型,以支持众所周知的企业集成模式(EIP)。EIP定义了许多在分布式系统中经常使用的经典组件。您可能已经听说过诸如消息通道、路由器、聚合器或endpoints之类的模式。让我们回到上面的例子。让我们从order-service开始,它负责接收订单,将它们发布在shared topic上,然后从下游服务收集异步响应。下面是@service,它使用Sourcebean来构建消息并将其发布到远程topic。

这个 @Service 是由controller调用,controller暴露提交新订单和通过 id获得订单状态的HTTP endpoints。

现在,让我们更仔细地看看消费端。来自order-service的OrderSender bean所发送的消息是由 account-service和product-service接收。为了从 topic exchange中接收消息,我们只需要在入参为Order的方法上添加 @StreamListener注解。我们还必须为监听器定义目标通道——在这种情况下,它是Processor.INPUT。譬如:

@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
service.process(order);
}
接收订单由AccountServicebean处理。account-service会根据客户账户上是否有足够的资金来实现订单接受或拒绝订单。验收状态的响应通过OrderSenderbean调用的输出通道发回order-service 。

最后一步是配置。它是在 application.yml中提供的。我们必须正确地定义通道的destination。而order-service则将orders-outdestination分配给输出通道,而orders-indestination则是输入通道,account-service和 product-service则恰恰相反。这是合乎逻辑的,因为通过其输出destination通过 order-service发送的消息是通过其输入destination接收的服务接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置设置。

spring:
application:
name: order-service
rabbitmq:
host: 192.168.99.100
port: 5672
cloud:
stream:
bindings:
output:
destination: orders-out
producer:
partitionKeyExpression: payload.customerId
partitionCount: 2
input:
destination: orders-in
rabbit:
bindings:
input:
consumer:
exchangeType: direct
这是为 account-service和product-service提供的配置。

最后,您可以运行上面示例中的微服务。现在,我们只需要运行每个微服务的单个实例。您可以通过运行JUnit测试类OrderControllerTest来轻松地生成一些测试请求,这是在我的源代码库中提供的 order-service中提供的。这种情况下很简单。在下一篇文章中,我们将学习更高级的示例,其中包含多个正在运行的消费服务实例。

扩展
为了扩展我们的Spring Cloud Stream应用程序,我们只需要启动每个微服务的附加实例。他们仍然会侦听与当前正在运行的实例相同的 topic exchange 中的传入消息。在添加了一个 account-service和 product-service的实例之后,我们可以发送一个测试订单。这个测试的结果对我们来说是不令人满意的… 为什么?每个微服务运行的所有实例都接收到了这个订单。这正是 topic exchanges 的工作方式——发送到topic的消息被所有的消费者接收,他们正在侦听这个topic。幸运的是,Spring Cloud Stream能够通过提供称为 consumer group的解决方案来解决这个问题。它负责保证一个消息只被一个实例处理,如果它们被放置在一个相互竞争的消费者关系中。在运行多项服务实例时,对consumer group机制的转换已经在下图中可视化了。

stream-2

一个 consumer group 机制的配置不是很困难。我们只需要设定group参数,并给出给定destination的组名。下面是account-service的当前binding配置。orders-indestination地是一个为直接与order-service通信而创建的队列,因此只有orders-out被分组使用spring.cloud.stream.bindings..group属性。

Consumer group机制是Apache Kafka的一个概念,它也在Spring Cloud Stream中实现,也适用于RabbitMQ broker,它本身并不支持它。因此,我认为它在RabbitMQ上的配置非常有趣。如果您在destination运行两个服务实例,而没有在destination设置组名,那么就会有两个为单个交易所创建的bindings(每个实例一个bindings),如下图所示。因为有两个应用程序在这个exchange中监听,总共有四个binding分配给那个exchange。

stream-3

如果您为选定的destination Spring Cloud Stream设置组名,则将为给定服务的所有运行实例创建单一binding。binding的名称将以组名为后缀。

B08597_11_06

因为,我们已经在项目依赖项中包含了 spring-cloud-starter-sleuth ,在实现 order-service POST endpoint的单个请求时,在交换的所有异步请求之间发送相同的 traceId 头部。由于这个原因,我们可以使用Elastic Stack (Kibana)轻松地将所有日志关联起来。

B08597_11_05

自动化测试
您可以轻松地测试您的微服务,而不需要连接到message broker。要实现它,您需要将 spring-cloud-stream-test-support包含到您的项目依赖项中。它包含 TestSupportBinderbean,它允许您与绑定通道进行交互,并检查应用程序发送和接收的任何消息。

在测试类中,我们需要声明 MessageCollectorbean,它负责接收由TestSupportBinder保留的消息。这是我的account-service测试类。使用Processorbean,我将测试订单发送到输入通道。然后,MessageCollector接收到通过输出通道发送回 order-service 的消息。测试方法的 testAccepted创建了应该被帐户服务接受的顺序,而testRejected方法则设置了过高的订单价格,从而导致拒绝订单。

总结
当您不需要来自API的同步响应时,Message-driven的微服务是一个不错的选择。在本文中,我展示了在您的微服务之间的跨服务通信中发布/订阅模型的示例用例。源代码在GitHub上是常见的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源码maven不能运行,这个项目fork原代码并修复了错误】)。对于使用Spring Cloud Stream库、Apache Kafka的更有趣的例子,您可以参考我的书中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。

关注社区公号,加入社区纯技术微信群

原文地址:https://www.cnblogs.com/softidea/p/9310889.html

时间: 2024-08-01 21:53:53

译:基于Spring Cloud Stream构建和测试 message-driven 微服务的相关文章

实战:基于spring cloud + docker构建微服务

本系列记录学习 spring-cloud-microservice-example的实战过程,并对利用spring cloud + docker 构建端到端的微服务架构技术进行解析. 0.安装前的准备,下列软件需要安装. Maven 3 Java 8 Docker Docker Compose 我的环境 Ubuntu 16.04 Java openjdk 1.8.0 Docker 18.03.1-ce docker-compose 1.8.0 1.克隆或复制工程 $ docker clone h

spring cloud实战与思考(二) 微服务之间通过fiegn上传多个文件1

需求场景: 微服务之间调用接口一次性上传多个文件. 上传文件的同时附带其他参数. 多个文件能有效的区分开,以便进行不同处理. Spring cloud的微服务之间接口调用使用Feign.原装的Feign不支持文件的传输.需要借助"Feign-form"库才行.但是貌似"Feign-form"库(至少是3.0.3版本)只支持单文件上传.在接口中使用多文件参数时会报异常: feign.codec.EncodeException: class [Lorg.springfr

spring cloud实战与思考(三) 微服务之间通过fiegn上传一组文件(下)

需求场景: 用户调用微服务1的接口上传一组图片和对应的描述信息.微服务1处理后,再将这组图片上传给微服务2进行处理.各个微服务能区分开不同的图片进行不同处理. 上一篇博客已经讨论了在微服务之间传递一组图片和对应参数的解决方案.现在来看看如何对组内文件进行区分.当前项目中使用了"commons-fileupload"和"feign-form"两个库进行文件传输. "commons-fileupload"库可以将http request转换成&quo

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

Spring Cloud Stream 进行服务之间的通讯

Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构.Spring Cloud Stream本身对Spring Messaging.Spring Integration.Spring Boot Actuator.Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个

基于Spring Cloud微服务架构

1. 微服务简介 1.1 什么是微服务架构 微服务架构是系统架构上的一种设计风格 将大系统拆分成N个小型服务 这些小型服务都在各自的线程中运行 小服务间通过HTTP协议进行通信 有自己的数据存储.业务开发.自动化测试和独立部署机制 可以由不同语言编写 小结:微服务架构的思想,不只是停留在开发阶段,它贯穿了设计,研发,测试,发布,运维等各个软件生命周期. 2. 架构体系 架构样例: 2.1 微服务发布--持续集成 3. 微服务架构九大特性 服务组件化-- 组件是可独立更换.升级的单元.就像PC中的

基于Spring Cloud的微服务构建学习-3 服务治理:Spring Cloud Eureka

基于Spring Cloud的微服务构建学习-3 服务治理:Spring Cloud Eureka 什么是服务治理 服务治理可以说是微服务架构中最为核心和基础的模块,它主要用来实现各个微服务实例的自动化注册与发现. 为什么需要服务治理模块 在最初构建微服务系统的时候可能服务并不多,我们可以通过做一些静态配置来完成服务调用 此时看着一切都还正常. 随着项目逐渐接近尾声,维护人员需要维护的服务越来越多,越来越复杂,最终形成大量的配置文件,维护将会变得越来越困难.此时,微服务应用实例自动化管理框架变得

基于Spring Cloud的微服务构建学习-2 Spring Boot

基于Spring Cloud的微服务构建学习-2 Spring Boot 为什么使用Spring Boot而不是Spring Spring Boot具有自动化配置,快速开发,轻松部署优点,非常适合用作微服务架构中各项具体微服务的开发框架.它不仅可以帮助我们快速的构建微服务,还可以轻松简单的整合Spring Cloud实现系统服务化,而如果使用了传统的Spring构建方式的话,在整合过程中我们还需要做更多的依赖管理工作才能让它们完好的运行起来. Spring Boot的宗旨并非是重写Spring或

基于Spring Cloud的微服务落地

请点击此处输入图片描述 微服务架构模式的核心在于如何识别服务的边界,设计出合理的微服务.但如果要将微服务架构运用到生产项目上,并且能够发挥该架构模式的重要作用,则需要微服务框架的支持. 在Java生态圈,目前使用较多的微服务框架就是集成了包括Netfilix OSS以及Spring的Spring Cloud.它包括: Spring Cloud Config:配置管理工具,支持使用Git存储配置内容,可以实现应用配置的外部化存储,支持客户端配置信息刷新.加密/解密配置内容等. Spring Clo