通过Spring Boot Webflux实现Reactor Kafka

在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在这里找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

/ **
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / **
             当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
             当我们的付款成功发送事件到Kafka主题
             ** /
            return paymentGateway.doPayment(payment);
}

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

public PaymentGatewayImpl() {
    final Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
    kafkaProducer = KafkaSender.create(producerOptions);
}

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

@Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
    final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
    String payload = toBinary(payment);
    SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
    return kafkaProducer.send(Mono.just(message)).next();
}
private String toBinary(Object object) {
    try {
        return objectMapper.writeValueAsString(object);
    }
    catch (JsonProcessingException e) {
        throw new IllegalArgumentException(e);
    }
}

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
    this.paymentValidator = paymentValidator;
    final Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                    .subscription(Collections.singleton("unconfirmed-transactions"))
                    .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                    .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    kafkaReceiver = KafkaReceiver.create(consumerOptions);
    /**
         * We create a receiver for new unconfirmed transactions
         */
    ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                    .doOnNext(r -> {
        /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
        final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
        processEvent(paymentEvent);
        r.receiverOffset().acknowledge();
    }
    )
                    .subscribe();
}
private void processEvent(PaymentEvent paymentEvent) {
    paymentValidator.calculateResult(paymentEvent);
}
private <T> T fromBinary(String object, Class<T> resultType) {
    try {
        return objectMapper.readValue(object, resultType);
    }
    catch (IOException e) {
        throw new IllegalArgumentException(e);
    }
}

原文地址:https://blog.51cto.com/13754022/2383786

时间: 2024-07-30 22:04:37

通过Spring Boot Webflux实现Reactor Kafka的相关文章

Spring Boot (十四): 响应式编程以及 Spring Boot Webflux 快速入门

1. 什么是响应式编程 在计算机中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式.这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播. 例如,在命令式编程环境中,a=b+c 表示将表达式的结果赋给 a,而之后改变 b 或 c 的值不会影响 a .但在响应式编程中,a 的值会随着 b 或 c 的更新而更新. 响应式编程是基于异步和事件驱动的非阻塞程序,只需要在程序内启动少量线程扩

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用

spring boot+kafka整合(未完待续)

springboot版本是2.0.4 在整合过程中,spring boot帮我们把kafka的大部分属性直接带出来了,但是有些不常用的属性,需要通过 spring.kafka.consumer.properties.* 来设置,例如max.partition.fetch.bytes,一次fetch请求,从一个partition中取得的records最大值. 在application.properties中添加kafka扩展属性, #设置一次fetch记录的最大值2M(2*1024*1024),默

20191114 Spring Boot官方文档学习(4.7)

4.7.开发Web应用程序 Spring Boot非常适合于Web应用程序开发.您可以使用嵌入式Tomcat,Jetty,Undertow或Netty创建独立的HTTP服务器.大多数Web应用程序都使用该spring-boot-starter-web模块来快速启动和运行.您还可以选择使用spring-boot-starter-webflux模块来构建反应式Web应用程序. 4.7.1.Spring Web MVC框架 在Spring Web MVC框架(通常简称为"Spring MVC"

Spring Boot 2.0 WebFlux 教程 (一) | 入门篇

目录 一.什么是 Spring WebFlux 二.WebFlux 的优势&性能 三.WebFlux 应用场景 四.适用性 五.快速入门 5.1 添加 webflux 依赖 5.2 定义接口 5.3 测试接口 六.总结 七.GitHub 示例代码 一.什么是 Spring WebFlux 下图截自 Spring Boot 官方网站: 结合上图,在了解 Spring WebFlux 之前,我们先来对比说说什么是 Spring MVC,这更有益我们去理解 WebFlux,图右边对 Spring MV

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk

spring boot 整合kafka 报错 Exception thrown when sending a message with key=&#39;null&#39; and payload=JSON to topic proccess_trading_end: TimeoutException: Failed to update metadata after 60000 ms.

org.springframework.kafka.support.LoggingProducerListener- Exception thrown when sending a message with key='null' and payload='{"dataDts":["20180329","20180328","20180327","20180326","20180323"]

Kafka的安装及与Spring Boot的集成

安装JDK 下载jdk-8u202-ea-bin-b03-linux-x64-07_nov_2018.tar.gz 解压 配置 $ vi /etc/profile,在最后加入下面两行 export JAVA_HOME=/usr/local/bigdata/jdk1.8.0_202 export PATH=$JAVA_HOME/bin:$PATH 重新登录执行 java,验证JDK配置成功 安装Kafka 下载kafka_2.11-1.0.2.tgz,这里主要1.0.2这个Kafka Server

Spring boot 集成Kafka

搭建Kafka集群,参考: https://www.cnblogs.com/jonban/p/kafka.html 源码示例如下: 1.新建 Maven 项目 kafka 2.pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http:/