Spring Cloud - 8 (Spring Cloud Stream) 򃲖

原文:
http://blog.gqylpy.com/gqy/497

置顶:来自一名75后老程序员的武林秘籍——必读(博主推荐)



来,先呈上武林秘籍链接:http://blog.gqylpy.com/gqy/401/

你好,我是一名极客!一个 75 后的老工程师!

我将花两分钟,表述清楚我让你读这段文字的目的!

如果你看过武侠小说,你可以把这个经历理解为,你失足落入一个山洞遇到了一位垂暮的老者!而这位老者打算传你一套武功秘籍!

没错,我就是这个老者!

干研发 20 多年了!我也年轻过,奋斗过!我会画原理图,会画 PCB,会模拟,会数字!玩过 PLC,玩过单片机,会用汇编,会用 C!玩过 ARM,比如
PLC,STM32,和时下正在起飞的 NXP RT1052!搞过 DSP,比如 TMS320F28335!搞过 FPGA,不管 Xilinx 还是 Altera,也不管是 Verilog 还是
VHDL,或者直接画数字电路图!我懂嵌入式系统,比如 uCOS 和 Linux!我懂开源的硬件,比如 Arduino
和树莓派!我也搞软件,学了一堆上位机的语言C#,JAVA,Python,Kotlin,Swift!会写爬虫工具,又自学写APP,不管Android 还是 IOS!

可是这一切有什么用呢?土鸡瓦狗!不值一提!干技术的永远就是最苦逼的那个人!

我相信看到这里的你,应该是个 IT
圈的人!或许是个学生,在学习某个技能!或者是个初入职场的年轻人,在啃某个技术!或者是个工程师,被项目困住,想找个资料快速突破阻碍!反正不管怎么样,你们都不会是泛泛之辈,不可能轻易交出智商税!

所以我把这份资料放进我的收费资源里,以证明接下去我要跟你讲的这本武功秘籍是可以真真实实的帮你赚到钱的!

我不知道叫它什么好,我把它写的像武林秘籍!所以我姑且叫它《武林秘籍》或者叫《赚钱秘籍》!

《武林秘籍》里封装了一个本人近期创造的一个可以一劳永逸的赚钱方法!你可以理解为躺着赚钱,或者挂机赚钱!请你放心,不是让你去违法!

我是一个IT男,从来不忽悠别人,这是我做人的原则。若此举能帮助你付起房子首付与月供,减轻一些目前高房价的压力,何乐而不为呢!

我提取里边几个要点:

  1. 将你手里有的资源按照说明书一步一步完成所有动作就可以躺着赚钱。
  2. 你不可能不劳而获,但是用这个方法确实是可以一劳永逸!
  3. 我用业余时间操作这个项目三个月,现在每天稳定收入300+。
  4. 里边会告诉你哪些是资源,怎么源源不断的获取资源。
  5. 里边会告诉你怎么获取爆炸的流量。
  6. 里边会告诉你很多黑技能(不是干坏事)。
  7. 总之,里边字字如金,有些东西我不告诉你可能这辈子都不会知道!

交了这波智商税,你的能力会爆涨,我说的不是你的专业能力,而是在这个社会生存的基础能力!

以上所有的东西可以规为武功的招式,但如果你想短期就实现目标,我还在说明书的最后留下了一些现成资源的下载链接,包括一些稀缺的资源,保证物有所值。这部分内容可以规为内功,继不继承由你自已决定!

好了,最后跟所有的老者不一样的是:这个老人要问你收取一点点小费,才会把无比珍贵的秘籍交到你手中!

以下是付款链接,付款后你将获取《武林秘籍》的访问密码。随后你将解锁另外一个谋生技能,在工作挣着死工资的同时,该技能也能同时帮你赚另一份钱,终身受用!

http://www.gqylpy.com/get_wlmj_pwd

能在此遇见是我们的缘分,我愿意帮助你,祝你取得成功!

传说中的武林秘籍:http://blog.gqylpy.com/gqy/401/

Spring Cloud Stream



?

Kafka



官方网页

http://kafka.apache.org/

?

主要用途

  • 消息中间件
  • 流式计算处理
  • 日志

?

下载地址:http://kafka.apache.org/downloads

?

执行脚本目录 /bin

windows 在其单独的目录

?

快速上手

下载并且解压kafka压缩包

运行服务

以Windows为例,首先打开cmd:

1.? 启动zookeeper:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

2.? 启动kafka:

bin\windows\kafka-server-start.bat config\server.properties

?

创建主题topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic cloud

cloud

?

生产者:发送消息

\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic cloud
>hello

?

消费者:接收消息

\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic cloud --from-beginning
hello

同类产品比较

  • ActiveMQ:JMS(Java Message Service)规范实现
  • RabbitMQ:AMQP(Advanced Message Queue Protocol)规范实现
  • Kafka:并非某种规范实现,它的灵活和性能相对是优势

?

Spring Kafka


import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestKafka {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // 创建 Kafka Producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);

        // 创建Kafka消息 = ProducerRecord
        String topic = "cloud";

        Integer partition = 0;

        Long timestamp = System.currentTimeMillis();

        String key = "message-key";
        String value = "how are you!";

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, partition, timestamp, key, value);

        // 发送kafka消息
        Future<RecordMetadata> metadataFuture = kafkaProducer.send(record);

        // 强制执行
        metadataFuture.get();

    }

}

?

官方文档:https://docs.spring.io/spring-kafka/reference/html/

?

设计模式

Spring社区对data(Spring-data)操作,有一个基本的模式,Template模式:

  • JDBC:JdbcTemplate
  • Redis:RedisTemplate
  • Kafka:KafkaTemplate
  • JMS:JmsTemplate
  • Rest:RestTemplate

XXXTemplate一定实现XXXOpeations

KafkaTemplate implements?KafkaOpeations

?

Maven依赖

<dependency>

?<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

?

自动装配器: KafkaAutoConfiguration

其中KafkaTemplate会被自动装配:

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;

@Configuration
public class KafkaAutoConfiguration {

     private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?,?> kafkaTemplate(ProducerFactory<Object,Object> kafkaProducerFactory,

        ProducerListener<Object,Object> kafkaProducerListener){
        KafkaTemplate<Object,Object> kafkaTemplate = new KafkaTemplate<Object,Object>(kafkaProducerFactory);

        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());

        return kafkaTemplate;
    }
}

?

创建生产者

增加生产者配置

application.properties

? ?全局配置:

? ? ### Kafka生产者配置? ? ?

? ? spring.kafka.producer.bootstrapServers = localhost:9092

### Kafka生产者配置

# spring.kafka.producer.bootstrapServers = localhost:9092

spring.kafka.producer.keySerializer = org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.valueSerializer = org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final String topic;

    @Autowired
    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic}") String topic) {

        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("/message/send")
    public Boolean sendMessage(@RequestParam(required=false)String message) {
        kafkaTemplate.send(topic, message);
        return true;
    }

}

?

创建消费者

增加消费者配置

### Kafka消费者配置
spring.kafka.consumer.groupId = cloud-1

spring.kafka.consumer.keyDeserializer = org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.valueDeserializer = org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "${kafka.topic}")
    public void onMessage(String message) {

        System.out.print("kafka 消费者监听器,接收到消息:" + message);

    }
}

?

?

Spring Cloud Stream



基本概念

  • Source:来源,近义词:Producer、Publisher
  • Sink:接收器,近义词:Consumer、Subscriber
  • Processor:对于上流而言是Sink,对于下流而言是Source

?

Reactive Streams:

  • Publisher
  • Subscriber
  • Processor

?

Spring Cloud Stream Binder:kafka

?

定义标准消息发送源

@Component
@EnableBinding(Source.class)
public class MessageProducerBean {

?? [email protected]
?? [email protected](Source.OUTPUT)
?? ?private MessageChannel messageChannel;?
?? ?
?? [email protected]
?? ?private Source source;
?? ?
?? ?
?? ?public void send(String message) {
?? ??? ?messageChannel.send(MessageBuilder.withPayload(message).build());
?? ??? ?source.output().send(MessageBuilder.withPayload(message).build());
?? ?}

}

?

注入

@RestController
public class KafkaProducerController {

?? ?private final KafkaTemplate<String, String> kafkaTemplate;

?? ?private final MessageProducerBean messageProducerBean;

?? ?private final String topic;

?? [email protected]
?? ?public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,?
?? ??? ??? [email protected]("${kafka.topic}") String topic,
?? ??? ??? ?MessageProducerBean messageProducerBean) {

?? ??? ?this.kafkaTemplate = kafkaTemplate;
?? ??? ?this.messageProducerBean = messageProducerBean;
?? ??? ?this.topic = topic;
?? ?}

?? [email protected]("/message/send")
?? ?public Boolean sendMessage(@RequestParam(required=false)String message) {
?? ??? ?kafkaTemplate.send(topic, message);
?? ??? ?return true;
?? ?}
?? ?
?? [email protected]("/message/output/send")
?? ?public Boolean outputSend(@RequestParam String message) {
?? ??? ?messageProducerBean.send(message);
?? ??? ?return true;
?? ?}?? ?
?? ?

}

?

实现标准Sink监听

@Component
@EnableBinding(Sink.class)
public class MessageConsumerBean {

?? [email protected]
?? [email protected](Sink.INPUT)
?? ?private SubscribableChannel subscribableChannel;
?? ?
?? [email protected]
?? ?private Sink sink;

?? [email protected]
?? ?public void init() {
?? ??? ?subscribableChannel.subscribe(new MessageHandler() {
?? ??? ??? ?
?? ??? ??? [email protected]
?? ??? ??? ?public void handleMessage(Message<?> message) throws MessagingException {
?? ??? ??? ??? ?System.out.println("init: "+message.getPayload());
?? ??? ??? ??? ?
?? ??? ??? ?}
?? ??? ?});
?? ?}
?? ?
?? [email protected](inputChannel = Sink.INPUT)
?? ?public void serviceActivator(Object message) {
?? ??? ?System.out.println("serviceActivator: "+message);
?? ?}
?? ?
?? [email protected](Sink.INPUT)
?? ?public void streamListener(String message) {
?? ??? ?System.out.println("streamListener: "+message);
?? ?}
?? ?
}

自定义标准消息发送源

public interface MySource {
?? ?
?? ?String MYOUTPUT = "myoutput";

?? ?
?? [email protected](MySource.MYOUTPUT)
?? ?MessageChannel myoutput();

}
@Component
@EnableBinding(MySource.class)
public class MyMessageProducerBean {

    @Autowired
    @Qualifier(MySource.MYOUTPUT)
    private MessageChannel messageChannel; 

    @Autowired
    private MySource mySource;

    public void send(String message) {
        messageChannel.send(MessageBuilder.withPayload(message).build());
        mySource.myoutput().send(MessageBuilder.withPayload(message).build());
    }

}

?

自定义Sink监听

public interface MySink {

?? ?String MYINPUT = "myinput";

?? [email protected](MySink.MYINPUT)
?? ?SubscribableChannel input();

}

?

@Component
@EnableBinding(MySink.class)
public class MyMessageConsumerBean {

?? [email protected]
?? [email protected](MySink.MYINPUT)
?? ?private SubscribableChannel subscribableChannel;
?? ?
?? [email protected]
?? ?private MySink mySink;

?? [email protected]
?? ?public void init() {
?? ??? ?subscribableChannel.subscribe(new MessageHandler() {
?? ??? ??? ?
?? ??? ??? [email protected]
?? ??? ??? ?public void handleMessage(Message<?> message) throws MessagingException {
?? ??? ??? ??? ?System.out.println("my - Sink: "+message.getPayload());
?? ??? ??? ??? ?
?? ??? ??? ?}
?? ??? ?});
?? ?}
?? ?
?? [email protected](inputChannel = MySink.MYINPUT)
?? ?public void serviceActivator(Object message) {
?? ??? ?System.out.println("my - serviceActivator: "+message);
?? ?}
?? ?
?? [email protected](MySink.MYINPUT)
?? ?public void streamListener(String message) {
?? ??? ?System.out.println("my - streamListener: "+message);
?? ?}
?? ?
}

配置项

kafka.topic.my = mytopic
spring.cloud.stream.bindings.myoutput.destination=${kafka.topic.my}
spring.cloud.stream.bindings.myinput.destination=${kafka.topic.my}

?

Spring Cloud Stream Binder:rabbit



重构Kafka工程,删除强依赖

?

Stream-kafka实现源码:https://pan.baidu.com/s/1RX5W2wMj4h_SKDkjlPQHkA?提取码:lwak?
?

Stream-rabbit实现源码:https://pan.baidu.com/s/1AX6asvmATN9-dYrhIIfS7w?提取码:dsc5?
?

原文:
http://blog.gqylpy.com/gqy/497

原文地址:https://www.cnblogs.com/mypath1/p/11408956.html

时间: 2024-11-08 07:12:36

Spring Cloud - 8 (Spring Cloud Stream) 򃲖的相关文章

一句话概括下spring框架及spring cloud框架主要组件

作为java的屌丝,基本上跟上spring屌丝的步伐,也就跟上了主流技术.spring 顶级项目:Spring IO platform:用于系统部署,是可集成的,构建现代化应用的版本平台,具体来说当你使用maven dependency引入spring jar包时它就在工作了.Spring Boot:旨在简化创建产品级的 Spring 应用和服务,简化了配置文件,使用嵌入式web服务器,含有诸多开箱即用微服务功能,可以和spring cloud联合部署.Spring Framework:即通常所

转:一句话概括下spring框架及spring cloud框架主要组件

作为java的屌丝,基本上跟上spring屌丝的步伐,也就跟上了主流技术. spring 顶级项目:Spring IO platform:用于系统部署,是可集成的,构建现代化应用的版本平台,具体来说当你使用maven dependency引入spring jar包时它就在工作了.Spring Boot:旨在简化创建产品级的 Spring 应用和服务,简化了配置文件,使用嵌入式web服务器,含有诸多开箱即用微服务功能,可以和spring cloud联合部署.Spring Framework:即通常

漫谈spring cloud 与 spring boot 基础架构

详情请交流  QQ  709639943 01.漫谈spring cloud 与 spring boot 基础架构 02.漫谈spring cloud分布式服务架构 03.Node.js入门到企业Web开发中的应用 04.精通高级RxJava 2响应式编程思想 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.快速上手Ionic3 多平台开发企业级问答社区 09.Java Spring Security开

Spring Initializr 构建Spring Boot/Cloud工程

在之前的所有Spring Boot和Spring Cloud相关博文中,都会涉及Spring Boot工程的创建.而创建的方式多种多样,我们可以通过Maven来手工构建或是通过脚手架等方式快速搭建,也可以通过<Spring Boot快速入门>一文中提到的SPRING INITIALIZR页面工具来创建,相信每位读者都有自己最喜欢和最为熟练的创建方式. 本文我们将介绍嵌入的Intellij中的Spring Initializr工具,它同Web提供的创建功能一样,可以帮助我们快速的构建出一个基础的

基于Spring Boot和Spring Cloud实现微服务架构学习(四)

Spring Cloud介绍 Spring Cloud是一个基于Spring Boot实现的云应用开发工具,它为基于JVM的云应用开发中的配置管理.服务发现.断路器.智能路由.微代理.控制总线.全局锁.决策竞选.分布式会话和集群状态管理等操作提供了一种简单的开发方式. Spring Cloud与Dubbo对比 提到Dubbo,我想顺便提下ESB,目前央视新华社也在用ESB来做任务编排,这里先比较下Dubbo和ESB: ESB(企业数据总线),一般采用集中式转发请求,适合大量异构系统集成,侧重任务

基于Spring Boot和Spring Cloud实现微服务架构学习

Spring Cloud介绍 Spring Cloud是一个基于Spring Boot实现的云应用开发工具,它为基于JVM的云应用开发中的配置管理.服务发现.断路器.智能路由.微代理.控制总线.全局锁.决策竞选.分布式会话和集群状态管理等操作提供了一种简单的开发方式. Spring Cloud与Dubbo对比 提到Dubbo,我想顺便提下ESB,目前央视新华社也在用ESB来做任务编排,这里先比较下Dubbo和ESB: ESB(企业数据总线),一般采用集中式转发请求,适合大量异构系统集成,侧重任务

java电子商务系统源码 Spring MVC+mybatis+spring cloud+sprin

鸿鹄云商大型企业分布式互联网电子商务平台,推出PC+微信+APP+云服务的云商平台系统,其中包括B2B.B2C.C2C.O2O.新零售.直播电商等子平台. 分布式.微服务.云架构电子商务平台 java b2b2c o2o 技术解决方案 开发语言:?java.j2ee 数据库:mysql JDK支持版本:?JDK1.6.JDK1.7.JDK1.8版本 通用框架:maven+springmvc+mybatis+spring cloud+spring boot+redis 核心技术:分布式.云服务.微

java电子商务系统源码 Spring MVC+mybatis+spring cloud+spring boot+spring security

鸿鹄云商大型企业分布式互联网电子商务平台,推出PC+微信+APP+云服务的云商平台系统,其中包括B2B.B2C.C2C.O2O.新零售.直播电商等子平台. 分布式.微服务.云架构电子商务平台 java b2b2c o2o 技术解决方案 开发语言: java.j2ee 数据库:mysql JDK支持版本: JDK1.6.JDK1.7.JDK1.8版本 通用框架:maven+springmvc+mybatis+spring cloud+spring boot+redis 核心技术:分布式.云服务.微

Spring MVC+mybatis+spring cloud+spring boot+spring

分布式.微服务.云架构电子商务平台 java b2b2c o2o 技术解决方案 开发语言: java.j2ee 数据库:mysql JDK支持版本: JDK1.6.JDK1.7.JDK1.8版本 通用框架:maven+springmvc+mybatis+spring cloud+spring boot+redis 核心技术:分布式.云服务.微服务.服务编排 核心架构: 使用Spring Cloud分布式微服务云架构进行服务化开发,所有模块功能完全解耦,提供服务发现.注册.配置中心.消息总线.负载

spring cloud和spring boot两个完整项目

spring cloud和spring boot两个完整项目 spring cloud 是基于Spring Cloud的云分布式后台管理系统架构,核心技术采用Eureka.Fegin.Ribbon.Zuul.Hystrix.Security.OAth.Mybatis.Ace-cache等主要框架和中间件,UI采用Bootstrap.jquery等前端组件. spring boot项目是使用spring boot + thymeleaf 开发个人博客项目. CSDN下载地址: https://do