Spring Cloud Stream 进行服务之间的通讯

Spring Cloud Stream

Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个服务之间的通讯来演示Spring Cloud Stream的使用方法。

整体概述


服务要想与其他服务通讯要定义通道,一般会定义输出通道和输入通道,输出通道用于发送消息,输入通道用于接收消息,每个通道都会有个名字(输入和输出只是通道类型,可以用不同的名字定义很多很多通道),不同通道的名字不能相同否则会报错(输入通道和输出通道不同类型的通道名称也不能相同),绑定器是操作RabbitMQ或Kafka的抽象层,为了屏蔽操作这些消息中间件的复杂性和不一致性,绑定器会用通道的名字在消息中间件中定义主题,一个主题内的消息生产者来自多个服务,一个主题内消息的消费者也是多个服务,也就是说消息的发布和消费是通过主题进行定义和组织的,通道的名字就是主题的名字,在RabbitMQ中主题使用Exchanges实现,在Kafka中主题使用Topic实现。

准备环境

创建两个项目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我们用Spring Cloud Stream实现通讯,spring-cloud-stream-b我们用Spring Cloud Stream的底层模块Spring Integration实现通讯。
两个项目的POM文件依赖都是:

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

spring-cloud-stream-binder-rabbit是指绑定器的实现使用RabbitMQ。

项目配置内容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

启动一个rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

编写A项目代码

在A项目中定义一个输入通道一个输出通道,定义通道在接口中使用@Input和@Output注解定义,程序启动的时候Spring Cloud Stream会根据接口定义将实现类自动注入(Spring Cloud Stream自动实现该接口不需要写代码)。
A服务输入通道,通道名称ChatExchanges.A.Input,接口定义输入通道必须返回SubscribableChannel:

public interface ChatInput {

    String INPUT = "ChatExchanges.A.Input";

    @Input(ChatInput.INPUT)
    SubscribableChannel input();
}

A服务输出通道,通道名称ChatExchanges.A.Output,输出通道必须返回MessageChannel:

public interface ChatOutput {

    String OUTPUT = "ChatExchanges.A.Output";

    @Output(ChatOutput.OUTPUT)
    MessageChannel output();
}

定义消息实体类:

public class ChatMessage implements Serializable {

    private String name;
    private String message;
    private Date chatDate;

    //没有无参数的构造函数并行化会出错
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
    }
}

在业务处理类上用@EnableBinding注解绑定输入通道和输出通道,这个绑定动作其实就是创建并注册输入和输出通道的实现类到Bean中,所以可以直接是使用@Autowired进行注入使用,另外消息的串行化默认使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解进行指定通道消息的监听:

//ChatInput.class的输入通道不在这里绑定,监听到数据会找不到AClient类的引用。
//Input和Output通道定义的名字不能一样,否则程序启动会抛异常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

    private static Logger logger = LoggerFactory.getLogger(AClient.class);

    @Autowired
    private ChatOutput chatOutput;

    //StreamListener自带了Json转对象的能力,收到B的消息打印并回复B一个新的消息。
    @StreamListener(ChatInput.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());

        ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

        chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
    }
}

到此A项目代码编写完成。

编写B项目代码

B项目使用Spring Integration实现消息的发布和消费,定义通道时我们要交换输入通道和输出通道的名称:

public interface ChatProcessor {

    String OUTPUT = "ChatExchanges.A.Input";
    String INPUT  = "ChatExchanges.A.Output";

    @Input(ChatProcessor.INPUT)
    SubscribableChannel input();

    @Output(ChatProcessor.OUTPUT)
    MessageChannel output();
}

消息实体类:

public class ChatMessage {
    private String name;
    private String message;
    private Date chatDate;

    //没有无参数的构造函数并行化会出错
    private ChatMessage(){}

    public ChatMessage(String name,String message,Date chatDate){
        this.name = name;
        this.message = message;
        this.chatDate = chatDate;
    }

    public String getName(){
        return this.name;
    }

    public String getMessage(){
        return this.message;
    }

    public Date getChatDate() { return this.chatDate; }

    public String ShowMessage(){
        return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
    }
}

业务处理类用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解发布消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

    private static Logger logger = LoggerFactory.getLogger(BClient.class);

    //@ServiceActivator没有Json转对象的能力需要借助@Transformer注解
    @ServiceActivator(inputChannel=ChatProcessor.INPUT)
    public void PrintInput(ChatMessage message) {

        logger.info(message.ShowMessage());
    }

    @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
    public ChatMessage transform(String message) throws Exception{
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(message,ChatMessage.class);
    }

    //每秒发出一个消息给A
    @Bean
    @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
    public GenericMessage<ChatMessage> SendChatMessage(){
        ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
        GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
        return gm;
    }
}

运行程序

启动A项目和B项目:

源码

Github仓库:https://github.com/sunweisheng/spring-cloud-example

原文地址:https://www.cnblogs.com/bluersw/p/11675139.html

时间: 2024-11-06 07:41:50

Spring Cloud Stream 进行服务之间的通讯的相关文章

spring cloud中微服务之间的调用以及eureka的自我保护机制

上篇讲了spring cloud注册中心及客户端的注册,所以这篇主要讲一下服务和服务之间是怎样调用的 不会搭建的小伙伴请参考我上一篇博客:idea快速搭建spring cloud-注册中心与注册 基于上一篇的搭建我又自己搭建了一个客户端微服务: 所以现在有两个微服务,我们所实现的就是微服务1和微服务2之间的调用 注册中心就不用多说了,具体看一下两个微服务 application.yml配置也不用说了,不知道怎么配置的请参考我上篇博客 在project-solr中的constroller中: @R

spring cloud各个微服务之间如何相互调用(Feign、Feign带token访问服务接口)

1.首先先看什么是Feign. 这里引用“大漠知秋”的博文https://blog.csdn.net/wo18237095579/article/details/83343915 2.若其他服务的接口未做权限处理,参照上文第1点的博文即可. 3.若其他服务的接口做了权限的处理(例如OAuth 2)时该如何访问? a.有做权限处理的服务接口直接调用会造成调用时出现http 401未授权的错误,继而导致最终服务的http 500内部服务器错误 b.解决方式:最方便的就是往请求头里加上token,一起

写给新手的Spring Cloud的微服务入门教程

1. 微服务简介 1.1 什么是微服务架构 微服务架构是系统架构上的一种设计风格 将大系统拆分成N个小型服务 这些小型服务都在各自的线程中运行 小服务间通过HTTP协议进行通信 有自己的数据存储.业务开发.自动化测试和独立部署机制 可以由不同语言编写 小结:微服务架构的思想,不只是停留在开发阶段,它贯穿了设计,研发,测试,发布,运维等各个软件生命周期. 2. 架构体系 架构样例: 2.1 微服务发布--持续集成 3. 微服务架构九大特性 服务组件化-- 组件是可独立更换.升级的单元.就像PC中的

译:基于Spring Cloud Stream构建和测试 message-driven 微服务

原文链接:https://piotrminkowski.wordpress.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ 作者: Piotr Mińkowski 译者: helloworldtang img Spring Boot和Spring Cloud为您提供了一个利用不同的通信方式快速构建微服务的解决方案.您可以基于Spring Cloud Netfli

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

java版电子商务spring cloud分布式微服务b2b2c社交电商(一)组件和概念介绍

一:什么是微服务(Microservice) 微服务英文名称Microservice,Microservice架构模式就是将整个Web应用组织为一系列小的Web服务.这些小的Web服务可以独立地编译及部署,并通过各自暴露的API接口相互通讯.它们彼此相互协作,作为一个整体为用户提供功能,却可以独立地进行扩. 微服务架构需要的功能或使用场景 1:我们把整个系统根据业务拆分成几个子系统. 2:每个子系统可以部署多个应用,多个应用之间使用负载均衡. 3:需要一个服务注册中心,所有的服务都在注册中心注册

Spring Cloud Stream教程(三)持续发布 - 订阅支持

应用之间的通信遵循发布订阅模式,其中通过共享主题广播数据.这可以在下图中看到,它显示了一组交互式的Spring Cloud Stream应用程序的典型部署. SCSt传感器图6. Spring Cloud Stream Publish-Subscribe传感器向HTTP端点报告的数据将发送到名为raw-sensor-data的公共目标.从目的地,它由微服务应用程序独立处理,该应用程序计算时间窗口平均值,以及另一个将原始数据导入HDFS的微服务应用程序. 为了处理数据,两个应用程序在运行时将主题声

基于Spring Cloud的微服务落地

请点击此处输入图片描述 微服务架构模式的核心在于如何识别服务的边界,设计出合理的微服务.但如果要将微服务架构运用到生产项目上,并且能够发挥该架构模式的重要作用,则需要微服务框架的支持. 在Java生态圈,目前使用较多的微服务框架就是集成了包括Netfilix OSS以及Spring的Spring Cloud.它包括: Spring Cloud Config:配置管理工具,支持使用Git存储配置内容,可以实现应用配置的外部化存储,支持客户端配置信息刷新.加密/解密配置内容等. Spring Clo

Spring cloud stream【入门介绍】

案例代码:https://github.com/q279583842q/springcloud-e-book ??在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性. 一.什么是SpringCloudStream ??官方定义 Spring Cloud Strea