Reactive Stream 响应式流

初识Reactive Stream

Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。

“背压(反压)back pressure”概念很关键。首先异步消费者会向生产者订阅接收消息,然后当有新的信息可用时,消费者会通过之前订阅时提供的回调函数被再次激活调用。如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。

响应式流模型非常简单——订阅者向发布者发送多个元素的异步请求,发布者向订阅者异步发送多个或稍少的元素。响应式流会在pull模型和push模型流处理机制之间动态切换。 当订阅者较慢时,它使用pull模型,当订阅者更快时使用push模型。

简单来说,在响应式流下订阅者可以与发布者沟通,如果使用JMS就应该知道,订阅者只能被动接收发布者所产生的消息数据。这就好比没有水龙头的水管一样,我只能被动接收水管里流过来的水,无法关闭也无法减少。而响应式流就相当于给水管加了个水龙头,在消费者这边可以控制水流的增加、减少及关闭。

响应式流模型图:

发布者(Publisher)是潜在的无限数量的有序元素的生产者。发布者可能有多个来自订阅者的待处理请求。

  • 根据收到的要求向当前订阅者发布(或发送)元素。

订阅者(Subscriber)从发布者那里订阅并接收元素。订阅者可以请求更多的元素。

  • 发布者向订阅者发送订阅令牌(Subscription)。
  • 使用订阅令牌,订阅者从发布者那里请求多个元素。
  • 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。

Reactive Stream主要接口

JDK9 通过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流。在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:

  • Publisher:数据项发布者、生产者
  • Subscriber:数据项订阅者、消费者
  • Subscription:发布者与订阅者之间的关系纽带,订阅令牌
  • Processor:数据处理器

Flow类结构如下:

Publisher是能够发出元素的发布者,Subscriber是接收元素并做出响应的订阅者。当执行Publisher里的subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,通常订阅者会借助传入的Subscription向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流。

其中,Subscription相当于是连接PublisherSubscriber的“纽带”。因为当发布者调用subscribe方法注册订阅者时,会通过订阅者的回调方法onSubscribe传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request方法向发布者“要”数据了。背压机制正是基于此来实现的。

如下图:

Processor则是集PublisherSubscriber于一身,相当于是发布者与订阅者之间的一个”中间人“,可以通过Processor进行一些中间操作:

/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * @param <T> the subscribed item type
 * @param <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

如下图:

参考:

https://blog.csdn.net/rickiyeat/article/details/78175962


响应流使用示例

1.以下代码简单演示了SubmissionPublisher 和这套发布-订阅框架的基本使用方式:

package com.example.demo;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @program: demo
 * @description: Flow Demo
 * @author: 01
 * @create: 2018-10-04 13:25
 **/
public class FlowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定义订阅者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者已经达到了目标, 可以调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();

                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 3. 发布者和订阅者 建立订阅关系
        publiser.subscribe(subscriber);

        // 4. 生产数据, 并发布
        // 这里忽略数据生产过程
        for (int i = 0; i < 3; i++) {
            System.out.println("生成数据:" + i);
            // submit是个block方法
            publiser.submit(i);
        }

        // 5. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就会退出
        Thread.currentThread().join(1000);
        // debug的时候, 下面这行需要有断点
        // 否则主线程结束无法debug
        System.out.println();
    }
}

运行结果如下:

上文中提到过可以调节发布者的数据产出速度,那么这个速度是如何调节的呢?关键就在于submit方法,该方法是一个阻塞方法。需要先说明的是SubmissionPublisher里有一个数据缓冲区,用于缓冲发布者产生的数据,而这个缓冲区是利用一个Object数组实现的,缓冲区最大长度为256。我们可以在onSubscribe方法里打上断点,查看到这个缓冲区:

当这个缓冲区的数据满了之后,submit方法就会进入阻塞状态,发布者数据的产生速度就会变慢,以此实现调节发布者的数据产出速度。



2.第二个例子演示了结合Processor的使用方式,代码如下:

package com.example.demo;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * Processor, 需要继承SubmissionPublisher并实现Processor接口
 *
 * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
 */
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;

        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);

        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }

        // 处理完调用request再请求一个数据
        this.subscription.request(1);

        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();

        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }

}

/**
 * 带 process 的 flow demo
 * @author 01
 */
public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();

        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);

        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber<String> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();

                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);

        // 6. 生产数据, 并发布
        // 这里忽略数据生产过程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
}

运行结果如下:

原文地址:http://blog.51cto.com/zero01/2293823

时间: 2024-10-09 04:17:23

Reactive Stream 响应式流的相关文章

JDK9新特性 Reactive Stream 响应式流

JDK9新特性 Reactive Stream 响应式流  本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor.  1.Reactive Stream 概念  Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范.响应式流从2013年开始,作为提供非阻

Reactive(2) 响应式流与制奶厂业务

再谈响应式 在前一篇文章从Reactive编程到"好莱坞"中,谈到了响应式的一些概念,讲的有些发散. 但仅仅还是停留在概念的层面,对于实战性的东西并没有涉及.所以大家看了后,或许还是有些不痛不痒. 响应式编程强调的是异步化.面向流的处理方式,这两者也并非凭空生出,而是从大量的技术实践中总结提炼出来的概念,就比如: 我们谈异步化,容易联想到 Java 异步IO(Asynchronized IO),而且习惯于将其和 BIO.NIO等概念来做对比. 殊不知,老早出现的 Swing 框架(Ja

JVM平台上的响应式流(Reactive Streams)规范

// Reactive Streams // 响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准.大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上. 注:响应式流其实就是一个规范,本文讲解的正是这个规范,且这个规范已经被引入到JDK9里了. 后压:就是下游出现了问题,得不到解决时,这个问题就会逆流而上,继而影响上游. 如果一个路口红绿灯坏了造成堵车,如果不管的话,用不了太长时间,车就会堵到上一个路口,如果再不管的话,整条路都会被赌满. // JDK9里的j

gateway&amp;reactive(响应式流)函数编程的webflux

springcloud.gateway是springcloud2的全新项目,该项目提供了一个构建在spring生态之上的API网关,包括spring5,springboot2,projectReactor.gateway旨在提高一种简单而有效的途径来转发请求,并为他们提供横切关注点,如安全性,监控/指标和弹性.在之前springcloud提供的网关是zull,zuul基于servlet2.5,使用阻塞架构,不支持长连接.zuul和negix相似,除了编程语言不同,zuul已经发布了zuul2,支

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

Reactive Cocoa 响应式编程开发实例讲解-中篇

上一篇文章作为开门篇讲述了Cocoa Reactive概述. 这里我们详细介绍一下CocoaReative在代码中的应用. 网上好多blog有人形容CocoaReative 中 signals是插座或者水龙头,感觉不是很好理解.我举个更贴近生活的,用电话订菜(餐馆是Signals,电话订阅是SubScriberNext). 1.概述 Create一个Signal我们视为是一个支持电话订餐的餐馆,他们有很多菜,油盐酱醋就更不用说,当一个电话打进来首先,这个Signal就开始执行,等菜做好了,菜馆要

SpringBoot2.0不容错过的新特性 WebFlux响应式编程

第1章 课程介绍 课程介绍及导学 第2章 函数式编程和lambda表达式 本章介绍函数式编程的概念,和lambda表达式的基础语法,并分析了惰性求值的应用和实现.最后同意反编译字节码,重点剖析了lambda表达式的底层实现原理 第3章 Stream流编程 本章介绍jdk8里面stream流编程的重要知识点,并剖析流的运行机制和实现原理 第4章 reactive stream 响应式流 本章介绍jdk9的响应式流的开发过程,重点讲解响应式流的4个接口,以及背压的概念和jdk实现背压的关键. 第5章

Java响应式编程 Springboot WebFlux基础与实战

第1章 课程介绍课程介绍及导学1-1 导学 第2章 函数式编程和lambda表达式本章介绍函数式编程的概念,和lambda表达式的基础语法,并分析了惰性求值的应用和实现.最后同意反编译字节码,重点剖析了lambda表达式的底层实现原理2-1 概念2-2 为什么要使用函数式编程-12-3 为什么要使用函数式编程-22-4 lambda初接触-12-5 lambda初接触-22-6 jdk8接口新特性-12-7 jdk8接口新特性-22-8 jdk8接口新特性-32-9 函数接口-12-10 函数接

5分钟理解 SpringBoot 响应式的核心-Reactor

目录 一.前言 二. Mono 与 Flux 构造器 三. 流计算 1. 缓冲 2. 过滤/提取 3. 转换 4. 合并 5. 合流 6. 累积 四.异常处理 五.线程调度 小结 参考阅读 一.前言 关于 响应式 Reactive,前面的两篇文章谈了不少概念,基本都离不开下面两点: 响应式编程是面向流的.异步化的开发方式 响应式是非常通用的概念,无论在前端领域.还是实时流.离线处理场景中都是适用的. 有兴趣的朋友可以看看这两篇文章: Reactive(1) 从响应式编程到"好莱坞" R