SpringCloud学习之Stream消息驱动【自定义通道】(十一)

如果不清楚本篇内容的,请务必先去看完上一篇再看本篇,否则阅读起来可能会有部分障碍和困难;

上一篇文章《SpringCloud学习之Stream消息驱动【默认通道】(十)》我们简单用自定义通道实现了消息发送和接收,但是用的是Stream给我们提供的默认Source,Sink,接下来我们要自己进行自定义,这种方式在工作中还是用的比较多的,因为我们要往不同的消息通道发消息,必然不能全都叫input,output的,那样的话就乱套了

(一)创建消息生产者【service-sender-stream-8089】

?

MySource.java

package com.xu.serviceconsumer.interfaces;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {

    String INPUT_CHANNEL = "myInput";
    String OUTPUT_CHANNEL = "myOutput";

    /**
     * 输入通道
     * @return
     */
    @Input(MySource.INPUT_CHANNEL)
    SubscribableChannel input();

    /**
     * 输出通道
     * @return
     */
    @Output(MySource.OUTPUT_CHANNEL)
    MessageChannel output();

}

?

application.yml

server:
  port: 8089
spring:
  application:
    name: spring-cloud-stream-sender
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        myOutput:       #自定义输出通道
          destination: stream-demo       #exchange名称,交换模式默认是topic
          content-type: text/plain       #消息发送的格式,接收端不用指定格式,但是发送端要

发送消息接口不变

?

接口实现需要重新改造:

?

(二)消息消费者【service-consumer-stream-8090和service-consumer-stream-8091】

这两个消费客户端的配置基本一模一样的,只是application.yml中的端口略有不同

?

因为我们这个测试项目没有公共依赖模块,所以暂时把消息生产端中的MySource.java这个自定义通道类文件复制放到两个客户端模块里

?

我们还要重新改造消息消费者里的代码,接口定义不变:

?

?

重新Rebuild三个项目模块,然后重新启动三个模块,打开消息生产者swagger页面http://localhost:8089/swagger-ui.html重新生产一个消息到消息队列,我们依旧可以看到两个客户端也接收到了发送过来的消息:

?

?

?

至此我们完成了自定义通道消息发送和接收,spring cloud stream还有很多东西(比如分组group和分区partition),后面有空我再深入了解后补充说明,谢谢大家。

原文地址:https://www.cnblogs.com/xulijun137/p/12209766.html

时间: 2024-10-09 15:52:11

SpringCloud学习之Stream消息驱动【自定义通道】(十一)的相关文章

SpringCloud学习之Stream消息驱动【默认通道】(十)

在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性. 一.消息中间的几大应用场景 1.异步处理 比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成.因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对

SpringCloud Stream消息驱动

问题: 8801 POM: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org

(十七)JAVA springcloud ssm b2b2c多用户商城系统-消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制.spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施: 1. 简

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

一.Stream简介 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互.所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式.通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.Spring Cloud Stream 为一些供应商的

第十章 消息驱动的微服务: Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架. 它可以基于Spring Boot 来创建独立的. 可用于生产的 Spring 应用程序. 它通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅. 消费组以及分区这三个核心概念. 简单地说, Spring Cloud Stream 本质上就是整合了 Spr

SpringCloud学习系列之五-----配置中心(Config)和消息总线(Bus)完美使用版

前言 在上篇中介绍了SpringCloud Config的使用,本篇则介绍基于SpringCloud(基于SpringBoot2.x,.SpringCloud Finchley版)中的分布式配置中心(SpringCloud Config)的配置刷新和消息总线(RabbitMQ和Kafka)使用教程. SpringCloud Config Refresh 在上一篇中我们介绍了springcloud配置中心的本地使用和Git使用的用法,但是当重新修改配置文件提交后,客户端获取的仍然是修改前的信息,需

SpringCloud学习之SpringCloudStream&amp;集成kafka

一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架.通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理. 在这里我先放一张官网的图: 应用程序通过Spring Cloud Stream注入到其中的输入和输出通道与外界进行通信,应用程序并不关心到底和

Spring Cloud构建微服务架构 消息驱动的微服务(消费分区)【Dalston版】

通过上一篇<消息驱动的微服务(消费组)>的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费.这时候我们就需要对消息进行分区处理. 使用消息分区 在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下: 在消费者应用SinkReceiver中,我们对配置文件做一些修改,具体如下: spring.c

SpringCloud学习系列之七 ----- Zuul路由网关的过滤器和异常处理

前言 在上篇中介绍了SpringCloud Zuul路由网关的基本使用版本,本篇则介绍基于SpringCloud(基于SpringBoot2.x,.SpringCloud Finchley版)中的路由网关的过滤器Filter以及异常处理的教程. SpringCloud Zuul Filter 介绍 过滤器概述 Zuul的中心是一系列过滤器,能够在HTTP请求和响应的路由过程中执行一系列操作. 以下是Zuul过滤器的主要特征: 类型:通常在应用过滤器时在路由流程中定义阶段(尽管它可以是任何自定义字