SpringCloud学习之SpringCloudStream&集成kafka

一、关于Spring-Cloud-Stream

  Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

  在这里我先放一张官网的图:

  应用程序通过Spring Cloud Stream注入到其中的输入和输出通道与外界进行通信,应用程序并不关心到底和哪个消息中间件通讯,它主要通过inputs与outputs进行消息的订阅和发送,根据此规则如果我们很容易的实现消息传递,订阅消息与消息中转,并且当需要切换消息中间件时,几乎不需要修改代码,只需要变更配置就行了。

  其中 Inputs代表了应用程序监听消息 、outputs代表发送消息、binder的话大家可以理解为将应用程序与消息中间件隔离的抽象,类似于dao模式下dao的屏蔽service与数据库的实现类似。当然springcloud默认提供了rabbitmq与kafka的实现。

二、springcloud集成kafka

1、添加gradle依赖:

dependencies{
    compile(‘org.springframework.cloud:spring-cloud-stream‘)
    compile(‘org.springframework.cloud:spring-cloud-stream-binder-kafka‘)
    compile(‘org.springframework.kafka:spring-kafka‘)
}

2、定义一个接口:

  spring-cloud-stream已经给我们定义了最基本的输入与输出接口,他们分别是 Source,Sink, Processor

  Sink接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

  Source接口:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

  Processor接口:

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}

  这里面Processor这个接口既定义输入通道又定义了输出通道。同时我们也可以自己定义通道接口,代码如下:

package com.bdqn.lyrk.shop.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface ShopChannel {

    /**
     * 发消息的通道名称
     */
    String SHOP_OUTPUT = "shop_output";

    /**
     * 消息的订阅通道名称
     */
    String SHOP_INPUT = "shop_input";

    /**
     * 发消息的通道
     *
     * @return
     */
    @Output(SHOP_OUTPUT)
    MessageChannel sendShopMessage();

    /**
     * 收消息的通道
     *
     * @return
     */
    @Input(SHOP_INPUT)
    SubscribableChannel recieveShopMessage();

}

3、定义服务类

package com.bdqn.lyrk.shop.server;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ShopService {

    @Resource(name = ShopChannel.SHOP_OUTPUT)
    private MessageChannel sendShopMessageChannel;

    @GetMapping("/sendMsg")
    public String sendShopMessage(String content) {
        boolean isSendSuccess = sendShopMessageChannel.
                send(MessageBuilder.withPayload(content).build());
        return isSendSuccess ? "发送成功" : "发送失败";
    }

    @StreamListener(ShopChannel.SHOP_INPUT)
    public void receive(Message<String> message) {
        System.out.println(message.getPayload());
    }
}

  这里面大家注意 @StreamListener 这个注解可以监听输入通道里的消息内容,这里面要指定我们刚才定义的输入通道名称,而MessageChannel则可以通过

输出通道发送消息,使用@Resource注入时也要指定我们刚才定义的输出通道名称

4、定义启动类

package com.bdqn.lyrk.shop;

import com.bdqn.lyrk.shop.channel.ShopChannel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(ShopChannel.class)
public class ShopServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopServerApplication.class, args);
    }
}

  注意@EnableBinding注解,这个注解指定刚才我们定义通道的接口名称,当然这里也可以传多个相关的接口

5、定义application.yml文件

spring:
  application:
    name: shop-server
  cloud:
    stream:
      bindings:
        #配置自己定义的通道与哪个中间件交互
        shop_input: #ShopChannel里Input和Output的值
          destination: zhibo #目标主题
        shop_output:
          destination: zhibo
      default-binder: kafka #默认的binder是kafka
  kafka:
    bootstrap-servers: localhost:9092 #kafka服务地址
    consumer:
      group-id: consumer1
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1
server:
  port: 8100

  这里是重头戏,我们必须指定在接口里定义的所有通道对应的消息主题,同时指定默认的binder为kafka,紧接着定义Spring-kafka的外部化配置,在这里指定producer的序列化类为ByteArraySerializer

启动程序成功后,我们访问 http://localhost:8100/sendMsg?content=2 即可得到如下结果

原文地址:https://www.cnblogs.com/niechen/p/8687206.html

时间: 2024-09-29 08:09:04

SpringCloud学习之SpringCloudStream&集成kafka的相关文章

SpringCloud学习之Ribbon

一.负载均衡与Ribbon 负载均衡,在集群中是很常见的一个“名词”,顾名思义是根据一定的算法将请求分摊至对应的服务节点上,常见的算法有如下几种: 轮询法:所有请求被依次分发到每台应用服务器上,每台服务器需要处理的请求数目都相同,适合所有服务器硬件都相同的场景 随机法:请求被随机分配到各个应用服务器,在许多场合下,这种方案都很简单实用. 源地址哈希(Hash)法:将请求来源的IP地址进行Hash计算,得到对应的服务器,这样来自同一个IP的请求总在同一个服务器上处理 加权法:根据应用服务器配置的情

SpringCloud学习心得—1.3—Eureka与REST API

SpringCloud学习心得—1.3—Eureka与REST API Eureka的REST API接口 API的基本访问 Eureka REST APIEureka 作为注册中心,其本质是存储了每个客户端的注册信息,Ribbon 在转发的时候会获取注册中心的服务列表,然后根据对应的路由规则来选择一个服务给 Feign 来进行调用. 如果我们不是Spring Cloud 技术选型,也想用 Eureka,可以吗?完全可以. 如果不是 Spring Cloud 技术栈,推荐用 Zookeeper,

Storm集成Kafka应用的开发

我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,那么我们接下来开发一个简单的案例来实现storm和kafka的结合 s

storm集成kafka的应用,从kafka读取,写入kafka

storm集成kafka的应用,从kafka读取,写入kafka by 小闪电 0前言 storm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少.对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算.下面是一个简单的示例实现从kafka读取数据,并写入到kafka,以此来掌握storm与kafka之间的交互. 1程序框图 实质上就是storm的kafkasp

Windows phone 8 学习笔记(9) 集成(转)

本节整理了之前并没有提到的Windows phone 8 系统相关集成支持,包括选择器.锁定屏幕的.联系人的访问等.选择器列举了若干内置应用提供的相关支持:锁定屏幕展示了我们可以对锁定屏幕提供背景图像,屏幕通知等功能:联系人访问演示了对联系人的管理维护和只读访问. 快速导航:一.选择器二.锁定屏幕三.联系人访问 一.选择器 选择器为手机内置应用对您的开发提供的相关支持,通过选择器你可以直接调用相机应用捕获一张照片到你的应用,或者调用联系人的相关资料,保存铃声等操作. 下面列举了大部分常用选择器:

spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Spark streaming集成kafka是企业应用中最为常见的一种场景. 一.安装kafka 参考文档: http://kafka.apache.org/quickstart#quickstart_createtopic 1.安

asp.net core mcroservices 架构之 分布式日志(三):集成kafka

一 kafka介绍 kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了.硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快.kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性.最熟悉的消息队列框架有ActiveMQ 和 RabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列

SpringCloud学习系列之七 ----- Zuul路由网关的过滤器和异常处理

前言 在上篇中介绍了SpringCloud Zuul路由网关的基本使用版本,本篇则介绍基于SpringCloud(基于SpringBoot2.x,.SpringCloud Finchley版)中的路由网关的过滤器Filter以及异常处理的教程. SpringCloud Zuul Filter 介绍 过滤器概述 Zuul的中心是一系列过滤器,能够在HTTP请求和响应的路由过程中执行一系列操作. 以下是Zuul过滤器的主要特征: 类型:通常在应用过滤器时在路由流程中定义阶段(尽管它可以是任何自定义字

SpringCloud学习系列之三----- 断路器Hystrix和断路器监控Dashboar

前言 本篇主要介绍的是SpringCloud中的断路器(Hystrix)和断路器指标看板(Dashboard)的相关使用知识. SpringCloud Hystrix Hystrix 介绍 Netflix创建了一个名为Hystrix的库,它实现了断路器模式.主要的目的是为了解决服务雪崩效应的一个组件,是保护服务高可用的最后一道防线. 开发准备 开发环境 JDK:1.8 SpringBoot:2.1.1.RELEASE SpringCloud:Finchley 注:不一定非要用上述的版本,可以根据