SpringCloud学习之Stream消息驱动【默认通道】(十)

在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性。

一、消息中间的几大应用场景

1、异步处理

比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。

2、应用解耦:

假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。

3、流量削峰

比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。

4、日志处理

kafka 最开始就是专门为了处理日志产生的。

当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。

二、什么是SpringCloudStream

  官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
  应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
  通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQKafka

三、Stream 解决了什么问题?

  Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

官网结构图

?

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

以下实战代码是基于RabbitMQ的,不清楚如何安装RabbitMQ请查看我的另一篇文章最简单的RabbitMQ消息队列搭建(windows环境下安装),项目的三个模块如下:

?

(一)创建消息生产者【service-sender-stream-8089】

pom.xml文件

?

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xu</groupId>
    <artifactId>service-sender-stream-8089</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-sender-stream-8089</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.M3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- swagger-ui -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

?

application.yml

server:
  port: 8089
spring:
  application:
    name: spring-cloud-stream-sender
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        output:       #指定输入通道对应的主题名
          destination: stream-demo       #exchange名称,交换模式默认是topic
          content-type: text/plain       #消息发送的格式,接收端不用指定格式,但是发送端要

?

IMessageSender.java

package com.xu.serviceconsumer.interfaces;

public interface IMessageSender {

    void sendMessage(String message);
}

?

MessageSenderImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.IMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * 这个注解给我们绑定消息通道的,Source是Stream给我们提供的,可以点进去看源码,
 * 可以看到output和input,这和配置文件中的output,input对应的。
 */
@EnableBinding(Source.class)
public class MessageSenderImpl implements IMessageSender {

    private final static Logger logger = LoggerFactory.getLogger(MessageSenderImpl.class);

    //注入Source
    @Autowired
    private Source source;

    @Override
    public void sendMessage(String message) {
        boolean sendStatus = source.output().send(MessageBuilder.withPayload(message).build());
        logger.info("发送数据:{},sendStatus: {}",message,sendStatus);

    }
}

TestController.java

?

package com.xu.serviceconsumer.controller;

/**
 *
 */

import com.xu.serviceconsumer.interfaces.IMessageSender;
import io.swagger.annotations.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author mazhen
 *
 */
@Api(description = "提交给MQ")
@RestController
public class TestController {

    private final static Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private IMessageSender iMessageSender;

    @GetMapping("send")
    public void send() {
        iMessageSender.sendMessage("Ronnie O‘Sullivan");
    }

}

附上swagger部分代码

?

SwaggerApp.java

package com.xu.serviceconsumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerApp {
    @Bean
    public Docket createRestApi() {

        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                //为当前包路径
                .apis(RequestHandlerSelectors.basePackage("com.xu.serviceconsumer.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    //构建 api文档的详细信息函数,注意这里的注解引用的是哪个
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                //页面标题
                .title("Spring Boot 使用 Swagger2 构建RESTful API")
                //创建人
                .contact(new Contact("Bryan", "http://blog.bianxh.top/", ""))
                //版本号
                .version("1.0")
                //描述
                .description("API 描述")
                .build();
    }
}

项目的启动类如下,没有什么特殊的处理:

?

启动项目后,输入地址http://localhost:8089/swagger-ui.html打开swagger页面,然后点击try it out发送消息

?

在后台我们可以看到发送消息成功了

?

(二)消息消费者【service-consumer-stream-8090】

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xu</groupId>
    <artifactId>service-consumer-stream-8090</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-consumer-stream-8090</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.M3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.48</version>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

?

application.yml

server:
  port: 8090
spring:
  application:
    name: spring-cloud-stream-receiver-2
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名称,交换模式默认是topic

定义一个消息接收接口

?

ReceviceMsg.java

package com.xu.serviceconsumer.interfaces;

public interface ReceviceMsg {

    void receive(String message);
}

ReceviceMsgImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.ReceviceMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {

    private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);

    @StreamListener(Sink.INPUT)
    @Override
    public void receive(String message) {

        logger.info("8090客户端接收消息:"+message);
    }
}

启动类如下,也没有任何特殊处理:

?

启动项目,然后再次用上面的消息发送控制器TestController.java发送消息到消息队列,然后可以看到消息消费端也收到了队列的消息如下:

?

(三)消息消费者【service-consumer-stream-8091】

这是复制上面的消费者创建的另一个消息消费者,基本配置跟上面service-consumer-stream-8090基本一模一样,只是application.yml中的部分配置略有不同(主要是端口不同),如下:

?

application.yml

server:
  port: 8091
spring:
  application:
    name: spring-cloud-stream-receiver-1
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名称,交换模式默认是topic

启动这个消息消费者之后,用上面的消息生产者TestController生产一个消息,然后可以看到这个消费者也接收到了消息队列的消息了

?

这里我们就用默认的通道完成了消息的发送和接收,下一篇我们将说一下自定义通道实现消息的发送和接收,下次再见!

 

原文地址:https://www.cnblogs.com/xulijun137/p/12209761.html

时间: 2024-11-05 23:31:12

SpringCloud学习之Stream消息驱动【默认通道】(十)的相关文章

SpringCloud学习之Stream消息驱动【自定义通道】(十一)

如果不清楚本篇内容的,请务必先去看完上一篇再看本篇,否则阅读起来可能会有部分障碍和困难: 上一篇文章<SpringCloud学习之Stream消息驱动[默认通道](十)>我们简单用自定义通道实现了消息发送和接收,但是用的是Stream给我们提供的默认Source,Sink,接下来我们要自己进行自定义,这种方式在工作中还是用的比较多的,因为我们要往不同的消息通道发消息,必然不能全都叫input,output的,那样的话就乱套了 (一)创建消息生产者[service-sender-stream-8

SpringCloud Stream消息驱动

问题: 8801 POM: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

一.Stream简介 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互.所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式.通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.Spring Cloud Stream 为一些供应商的

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

SpringCloud学习系列之五-----配置中心(Config)和消息总线(Bus)完美使用版

前言 在上篇中介绍了SpringCloud Config的使用,本篇则介绍基于SpringCloud(基于SpringBoot2.x,.SpringCloud Finchley版)中的分布式配置中心(SpringCloud Config)的配置刷新和消息总线(RabbitMQ和Kafka)使用教程. SpringCloud Config Refresh 在上一篇中我们介绍了springcloud配置中心的本地使用和Git使用的用法,但是当重新修改配置文件提交后,客户端获取的仍然是修改前的信息,需

SpringCloud学习之SpringCloudStream&amp;集成kafka

一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架.通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理. 在这里我先放一张官网的图: 应用程序通过Spring Cloud Stream注入到其中的输入和输出通道与外界进行通信,应用程序并不关心到底和

Spring Cloud构建微服务架构 消息驱动的微服务(消费分区)【Dalston版】

通过上一篇<消息驱动的微服务(消费组)>的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费.这时候我们就需要对消息进行分区处理. 使用消息分区 在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下: 在消费者应用SinkReceiver中,我们对配置文件做一些修改,具体如下: spring.c

SpringCloud学习系列之七 ----- Zuul路由网关的过滤器和异常处理

前言 在上篇中介绍了SpringCloud Zuul路由网关的基本使用版本,本篇则介绍基于SpringCloud(基于SpringBoot2.x,.SpringCloud Finchley版)中的路由网关的过滤器Filter以及异常处理的教程. SpringCloud Zuul Filter 介绍 过滤器概述 Zuul的中心是一系列过滤器,能够在HTTP请求和响应的路由过程中执行一系列操作. 以下是Zuul过滤器的主要特征: 类型:通常在应用过滤器时在路由流程中定义阶段(尽管它可以是任何自定义字