原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/
作者: Piotr Mińkowski
译者: helloworldtang
img
Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案。您可以基于Spring Cloud Netflix库创建同步REST微服务,正如我在之前的一篇文章中所展示的那样 使用Spring Boot 2.0, Eureka and Spring Cloud快速搭建微服务指南。您可以使用Spring WebFlux项目在Netty上创建异步的、响应式的微服务,并将其与一些Spring Cloud库相结合,如我的文章所示 使用Spring WebFlux and Spring Cloud搭建响应式微服务。最后,您可以使用Spring Cloud Stream和类似Apache Kafka或RabbitMQ这样的broker来实现基于发布/订阅模型的message-driven微服务。构建微服务的最后一种方法是本文的主要主题。我将向您展示如何在RabbitMQ broker的基础上有效地构建、扩展、运行和测试消息传递微服务。
体系结构
为了演示Spring Cloud Stream的特性,我们将设计一个示例系统,该系统使用发布/订阅模型进行跨服务通信。我们有三个微服务:order-service、product-service和account-service。应用程序order-service暴露了负责处理发送到我们系统的订单的HTTP endpoint。所有传入的订单都是异步处理的——order-service准备并发送消息到RabbitMQ exchange,然后就对调用的客户端进行响应,不需要等到消息被消费后再响应。应用程序的account-service和product-service正在侦听进入该RabbitMQ exchange的订单消息。微服务account-service负责检查客户账户是否有足够的资金来支付该订单需要的金额,如果有就从该账户扣款。微服务product-service检查是否有足够的库存,并在处理订单后改变可用产品的数量。account-service 和 product-service 都通过RabbitMQ exchange(这一次是使用direct exchange的一对一通信)发送带有操作状态的异步响应。微服务 order-service根据接收到的响应消息来更新订单状态,并通过REST endpoint GET /order/{id}提供给外部客户端。
如果您觉得我们的示例描述有点难以理解,这里有一个用于澄清的架构图。
stream-1
启用 Spring Cloud Stream
在项目中使用Spring Cloud Stream的推荐方法是使用依赖管理系统。Spring Cloud Stream有一个与整个Spring Cloud framework相关,并且独立发布的依赖管理。然而,如果我们已经在Elmhurst.RELEASE版本的dependencyManagement部分声明了spring-cloud-dependencies,就不需要在pom.xml中声明任何其他内容。如果您喜欢只使用Spring Cloud Stream项目,那么您应该定义以下部分。
下一步是将spring-cloud-streamartifact添加到项目依赖项中。我还建议您至少包括spring-cloud-sleuth 库,以提供作为源请求进入order-service 的发送消息用的traceId。
Spring Cloud Stream 编程模型
为了使您的应用程序能够连接到一个message broker,请在主类上使用@EnableBinding注解。 @EnableBinding注解将一个或多个接口作为参数。您可以在Spring Cloud Stream提供的三个接口之间进行选择:
Sink:这是用来标记从入站通道接收消息的服务。
Source: 这是用来向出站通道发送消息的。
Processor:当你需要一个入站通道和一个出站通道时,它可以被使用,因为它继承了Source and Sink接口。因为order-service发送消息,并接收它们,它的主类已经使用了@EnableBinding(Processor.class)注解。
下面是order-service项目中启用了Spring Cloud Stream binding的主类。
@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
...
public static void main(String[] args) {
new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
}
...
}
增加 message broker
在Spring Cloud Stream术语中,负责与特定message broker集成的实现称为binder。默认情况下,Spring Cloud Stream为 Kafka and RabbitMQ提供了binder实现。它能够自动检测和在类路径上查找binder。任何特定于中间件的设置都可以通过Spring Boot支持的外部配置属性来覆盖,譬如应用程序参数、环境变量,或者仅仅是application.yml文件。为了包含对RabbitMQ的支持,RabbitMQ将这篇文章用作message broker,您应该向项目添加以下依赖项。
现在,我们的应用程序需要连接RabbitMQ broker的一个共享实例。这就是为什么我使用RabbitMQ在默认的5672端口上运行Docker镜像。它还可以在地址http://192.168.99.100:15672(http://192.168.99.100:15672/)下启动web仪表板。
我们需要通过设置属性 spring.rabbitmq.host为Docker机器IP 192.168.99.100 ,来覆盖Spring Boot application的中的默认设置。
实现消息驱动的微服务
Spring Cloud Stream是在Spring Integration项目之上构建的。Spring Integration扩展了Spring编程模型,以支持众所周知的企业集成模式(EIP)。EIP定义了许多在分布式系统中经常使用的经典组件。您可能已经听说过诸如消息通道、路由器、聚合器或endpoints之类的模式。让我们回到上面的例子。让我们从order-service开始,它负责接收订单,将它们发布在shared topic上,然后从下游服务收集异步响应。下面是@service,它使用Sourcebean来构建消息并将其发布到远程topic。
这个 @Service 是由controller调用,controller暴露提交新订单和通过 id获得订单状态的HTTP endpoints。
现在,让我们更仔细地看看消费端。来自order-service的OrderSender bean所发送的消息是由 account-service和product-service接收。为了从 topic exchange中接收消息,我们只需要在入参为Order的方法上添加 @StreamListener注解。我们还必须为监听器定义目标通道——在这种情况下,它是Processor.INPUT。譬如:
@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
service.process(order);
}
接收订单由AccountServicebean处理。account-service会根据客户账户上是否有足够的资金来实现订单接受或拒绝订单。验收状态的响应通过OrderSenderbean调用的输出通道发回order-service 。
最后一步是配置。它是在 application.yml中提供的。我们必须正确地定义通道的destination。而order-service则将orders-outdestination分配给输出通道,而orders-indestination则是输入通道,account-service和 product-service则恰恰相反。这是合乎逻辑的,因为通过其输出destination通过 order-service发送的消息是通过其输入destination接收的服务接收的。但在shared broker’s exchange中,它仍然是相同的destination。下面是 order-service的配置设置。
spring:
application:
name: order-service
rabbitmq:
host: 192.168.99.100
port: 5672
cloud:
stream:
bindings:
output:
destination: orders-out
producer:
partitionKeyExpression: payload.customerId
partitionCount: 2
input:
destination: orders-in
rabbit:
bindings:
input:
consumer:
exchangeType: direct
这是为 account-service和product-service提供的配置。
最后,您可以运行上面示例中的微服务。现在,我们只需要运行每个微服务的单个实例。您可以通过运行JUnit测试类OrderControllerTest来轻松地生成一些测试请求,这是在我的源代码库中提供的 order-service中提供的。这种情况下很简单。在下一篇文章中,我们将学习更高级的示例,其中包含多个正在运行的消费服务实例。
扩展
为了扩展我们的Spring Cloud Stream应用程序,我们只需要启动每个微服务的附加实例。他们仍然会侦听与当前正在运行的实例相同的 topic exchange 中的传入消息。在添加了一个 account-service和 product-service的实例之后,我们可以发送一个测试订单。这个测试的结果对我们来说是不令人满意的… 为什么?每个微服务运行的所有实例都接收到了这个订单。这正是 topic exchanges 的工作方式——发送到topic的消息被所有的消费者接收,他们正在侦听这个topic。幸运的是,Spring Cloud Stream能够通过提供称为 consumer group的解决方案来解决这个问题。它负责保证一个消息只被一个实例处理,如果它们被放置在一个相互竞争的消费者关系中。在运行多项服务实例时,对consumer group机制的转换已经在下图中可视化了。
stream-2
一个 consumer group 机制的配置不是很困难。我们只需要设定group参数,并给出给定destination的组名。下面是account-service的当前binding配置。orders-indestination地是一个为直接与order-service通信而创建的队列,因此只有orders-out被分组使用spring.cloud.stream.bindings..group属性。
Consumer group机制是Apache Kafka的一个概念,它也在Spring Cloud Stream中实现,也适用于RabbitMQ broker,它本身并不支持它。因此,我认为它在RabbitMQ上的配置非常有趣。如果您在destination运行两个服务实例,而没有在destination设置组名,那么就会有两个为单个交易所创建的bindings(每个实例一个bindings),如下图所示。因为有两个应用程序在这个exchange中监听,总共有四个binding分配给那个exchange。
stream-3
如果您为选定的destination Spring Cloud Stream设置组名,则将为给定服务的所有运行实例创建单一binding。binding的名称将以组名为后缀。
B08597_11_06
因为,我们已经在项目依赖项中包含了 spring-cloud-starter-sleuth ,在实现 order-service POST endpoint的单个请求时,在交换的所有异步请求之间发送相同的 traceId 头部。由于这个原因,我们可以使用Elastic Stack (Kibana)轻松地将所有日志关联起来。
B08597_11_05
自动化测试
您可以轻松地测试您的微服务,而不需要连接到message broker。要实现它,您需要将 spring-cloud-stream-test-support包含到您的项目依赖项中。它包含 TestSupportBinderbean,它允许您与绑定通道进行交互,并检查应用程序发送和接收的任何消息。
在测试类中,我们需要声明 MessageCollectorbean,它负责接收由TestSupportBinder保留的消息。这是我的account-service测试类。使用Processorbean,我将测试订单发送到输入通道。然后,MessageCollector接收到通过输出通道发送回 order-service 的消息。测试方法的 testAccepted创建了应该被帐户服务接受的顺序,而testRejected方法则设置了过高的订单价格,从而导致拒绝订单。
总结
当您不需要来自API的同步响应时,Message-driven的微服务是一个不错的选择。在本文中,我展示了在您的微服务之间的跨服务通信中发布/订阅模型的示例用例。源代码在GitHub上是常见的(https://github.com/helloworldtang/sample-message-driven-microservices.git【原文源码maven不能运行,这个项目fork原代码并修复了错误】)。对于使用Spring Cloud Stream库、Apache Kafka的更有趣的例子,您可以参考我的书中第11章, Mastering Spring Cloud(https://www.packtpub.com/application-development/mastering-spring-cloud)。
关注社区公号,加入社区纯技术微信群
原文地址:https://www.cnblogs.com/softidea/p/9310889.html