SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

一、Stream简介

  应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。

二、Stream学习

 2.1消息中间件

  使用kafka消息中间件来学习:https://www.cnblogs.com/yangk1996/p/10841588.html

 2.2、工程改造

  改造之前的工程:https://www.cnblogs.com/yangk1996/p/11069610.html

  • springcloud-api
@Data
public class User implements Serializable {

    /**
     * ID
     */
    private Long id;

    /**
     * 用户名称
     */
    private String name;

}
  • spring-cloud-api-client
    • maven
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    • 增加消息输出
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @Description: 用户信息输出
 * @date: 2019/6/23 13:05
 */
public interface UserMessage {
    @Output("user-message")
    MessageChannel output();
}
    • Controller
    @Autowired
    private UserMessage userMessage;
   @PostMapping("/user/save/message/stream")
    public boolean saveUserByRabbitMessage(@RequestBody User user){
        MessageChannel messageChannel = userMessage.output();
         return  messageChannel.send(MessageBuilder.withPayload(user).build());
    }
    • application.properties
## Kafka 生产者配置
spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092
spring.kafka.consumer.group-id= yangk
spring.kafka.consumer.clientId=spring-cloud-api-client

## Spring Cloud Stream Binding 配置
###  user-message 为输出管道名称 destination 指定 Topic
spring.cloud.stream.bindings.user-message.destination = springCloud
    • 启动入口增加EnableBinding
@EnableBinding(UserMessage.class)
  • spring-cloud-api-provider
    • Maven
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    • 消息输入
public interface UserMessage {
    @Input("user-message")  //管道名称
    SubscribableChannel input();
}
    • UserMessageServiceImpl(三种消息监听实现)
    @Autowired
    private UserMessage userMessage;
    @Autowired
    private UserService userService;
    @Autowired
    private ObjectMapper objectMapper;

    @ServiceActivator(inputChannel = "user-message")
    public void listen(String data) throws IOException {
        System.out.println("ServiceActivator实现"+data);
       saveUser(data);
    }

    @StreamListener("user-message")
    public void onMessage(String data) throws IOException {
        System.out.println(" @StreamListeners实现"+data);
        saveUser(data);
    }
        private void saveUser(String data) throws IOException {
        User user = objectMapper.readValue(data, User.class);
       userService.saveUser(user);
    }
    @PostConstruct
    public void init() {
        SubscribableChannel subscribableChannel = userMessage.input();
        subscribableChannel.subscribe(message -> {
            System.out.println("SubscribableChannel实现"+message);
        });
    }
    • application.properties
#Kafka配置
spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092
spring.kafka.consumer.group-id=yangk
spring.kafka.consumer.clientId=spring-cloud-api-client

## Spring Cloud Stream Binding 配置
### userMessage 为输入管道名称  destination 指定 Topic
spring.cloud.stream.bindings.user-message.destination = springCloud
    • 启动入口激活Stream Binding
//激活 Stream Binding到UserMessage
@EnableBinding(UserMessage.class)

三、验证

  项目启动顺序:spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider

这里乱码的应该是kafka的其他属性没有转换过来,这里我也没有处理这些。

原文地址:https://www.cnblogs.com/yangk1996/p/11072712.html

时间: 2024-08-30 08:39:53

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)的相关文章

C#学习(八)- Stream

1. 综述 流用于对IO处理,在System.IO名称空间中有以下类: BinaryReader/Writer TextReader/Writer Stream其中类Stream为抽象类.由此有三个派生类: MemoryStream:对内存进行读取与写入 BufferedStream:对缓冲器进行读取/写入 FileStream:对文件执行读取与写入 TextReader/Writer为抽象类.由此派生类:StreamReader/StreamWirterStringReader/StreamW

从.Net到Java学习第八篇——SpringBoot实现session共享和国际化

SpringBoot Session共享 修改pom.xml添加依赖 <!--spring session--> <dependency> <groupId>org.springframework.session</groupId> <artifactId>spring-session-data-redis</artifactId> </dependency> 添加配置类RedisSessionConfig @Config

python 学习 第八篇 jquery

简介: jQuery是一个javascript库.极大滴简化了javascript编程. 包含内容: HTML 元素选取 HTML 元素操作 CSS 操作 HTML 事件函数 JavaScript 特效和动画 HTML DOM 遍历和修改 AJAX 1:下载 jQuery 共有两个版本的 jQuery 可供下载:一份是生产版本jQuery.min.js(最小化和压缩过的),另一份是开发版jQuery.js(未压缩的供调试或阅读). 这两个版本都可http://jquery.com/downloa

iOS学习第八篇 ——NSString的使用

IOS字符串的常用方法和使用 NSString 1. NSString的四中创建方法 (1) NSString *str1 = @"方式一"; (2) NSString *str2 = [ [NSString alloc] initWithString:@"方式二"]; (3) NSString *str3 = [ NSString stringWithFormat:@"%@",@"方式三"]; (4) NSString *s

Linux学习第八篇之文件搜索命令find

一.find命令:(Windows搜索小工具推荐——Everything) 命令名称:find 命令所在路径:/bin/find 执行权限:所有用户 语法:find [搜索范围] [匹配条件] 功能描述:文件搜索 二.find命令的例子: 1.find /etc -name init 在目录/etc中查找文件init(会在子目录下的文件继续搜索init),-name 搜索条件的选项,文件名是全匹配的,模糊搜索可用通配符处理,如find /etc -name *init*,如果报find: pat

ActiveMQ学习第八篇:Messaage

Messaage Properties: ??ActiveMQ支持很多消息属性,具体可以参考 http://activemq.apache.org/activemq-message-properties.html ??常见得一些属性说明: queue得消息默认是持久化得 消息得优先级默认是4. 消息发送时设置了时间戳. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略. 如果消息是重发的,将会被标记出来. JMSReplyTo标识响应消息发送到哪个queue. JM

SpringCloud教程 | 第八篇: 消息总线(Spring Cloud Bus)

一.安装rabbitmq 二.pom父文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.ap

跟我学SpringCloud | 第十八篇:微服务 Docker 化之基础环境

1. 容器化 Docker 的横空出世,给了容器技术带来了质的飞跃,Docker 标准化了服务的基础设施,统一了应用的打包分发,部署以及操作系统相关类库等,解决了测试生产部署时环境差异的问题.对于运维来讲,由于镜像的不可变性,更容易进行服务部署和回滚操作.利用各种第三方容器管理平台,实现一键部署.动态伸缩等操作变的轻而易举. 2. 基础镜像选择 在操作系统的选择上,可选择传统的 CentOS . Ubuntu 或者更为轻量化的 Alpine .比如 CentOS 或者 Ubuntu 的镜像都在

业余草 SpringCloud教程 | 第五篇: 路由网关(zuul)(Finchley版本)

在微服务架构中,需要几个基础的服务治理组件,包括服务注册与发现.服务消费.负载均衡.断路器.智能路由.配置管理等,由这几个基础组件相互协作,共同组建了一个简单的微服务系统.一个简答的微服务系统如下图:  注意:A服务和B服务是可以相互调用的,作图的时候忘记了.并且配置服务也是注册到服务注册中心的. 在Spring Cloud微服务系统中,一种常见的负载均衡方式是,客户端的请求首先经过负载均衡(zuul.Ngnix),再到达服务网关(zuul集群),然后再到具体的服.,服务统一注册到高可用的服务注