JDK9新特性 Reactive Stream 响应式流

JDK9新特性 Reactive Stream 响应式流

 本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor。

 1.Reactive Stream 概念

 Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。

 反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:

Subscription 接口定义了连接发布者和订阅者的方法
Publisher<T> 接口定义了发布者的方法
Subscriber<T> 接口定义了订阅者的方法
Processor<T,R> 接口定义了处理器

 Reactive Stream 规范诞生后,RxJava 从 RxJava 2 开始实现 Reactive Stream 规范 , 同时 Spring提供的Reactor 框架(WebFlux的基础) 等也相继实现了 Reactive Stream 规范

 下图展示了订阅者和发布者之间的交互

 2.背压(back pressure)概念

 如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。

 简单来说就是

  • 背压指的发布者和订阅者之间的互动
  • 订阅者可以告诉发布者自己需要多少数据,可以调节数据流量,不会导致发布者发布数据过多导致数据浪费或压垮订阅者

 3.JDK9中 Reactive Stream规范的实现

 JDK9中Reactive Stream的实现规范 通常被称为 Flow API ,通过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流

 在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:

  • Publisher:数据项发布者、生产者
  • Subscriber:数据项订阅者、消费者
  • Subscription:发布者与订阅者之间的关系纽带,订阅令牌
  • Processor:数据处理器

  3.1 发布者 Publisher

  Publisher 将数据流发布给注册的 Subscriber。 它通常使用 Executor 异步发布项目给订阅者。 Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。

  • subscribe:订阅者订阅发布者

      @FunctionalInterface
      public static interface Flow.Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
      }
    

  3.2 订阅者 Subscriber

  Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。对于给定 订阅合同(Subscription),调用 Subscriber 的方法是严格按顺序的。

  • onSubscribe:发布者调用订阅者的这个方法来异步传递订阅 , 这个方法在 publisher.subscribe方法调用后被执行
  • onNext:发布者调用这个方法传递数据给订阅者
  • onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法
  • onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法

  3.3 订阅合同 Subscription

  Subscription 用于连接 Publisher 和 Subscriber。Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。Subscription 主要有两个方法:

  • request:订阅者调用此方法请求数据
  • cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
      public static interface Flow.Subscription {
        public void request(long n);
        public void cancel();
      }
    

  3.4 处理器 Processor

  Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者 即作为订阅者也作为发布者 ,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。

/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * [@param](https://my.oschina.net/u/2303379) <T> the subscribed item type
 * [@param](https://my.oschina.net/u/2303379) <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

 4.JDK9 中Reactive Stream(Flow API )规范调用流程

 Publisher是能够发出元素的发布者,Subscriber是接收元素并做出响应的订阅者。当执行Publisher里的subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,通常订阅者会借助传入的Subscription向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流。

 其中,Subscription相当于是连接Publisher和Subscriber的“纽带(合同)”。因为当发布者调用subscribe方法注册订阅者时,会通过订阅者的回调方法onSubscribe传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request方法向发布者“要”数据了。背压机制正是基于此来实现的。

 5.案例一 响应式基础使用案例

  5.1 以下代码简单演示了SubmissionPublisher 和这套发布-订阅框架的基本使用方式:

  注意要使用JDK9以上的版本

/**
* [@author](https://my.oschina.net/arthor) johnny
* [@create](https://my.oschina.net/u/192469) 2020-02-24 下午5:44
**/
@Slf4j
public class ReactiveStreamTest {

public static void main(String[] args) throws InterruptedException {

    //1.创建 生产者Publisher JDK9自带的 实现了Publisher接口
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //2.创建 订阅者 Subscriber,需要自己去实现内部方法

    Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("订阅成功。。");
            subscription.request(1);
            System.out.println("订阅方法里请求一个数据");
        }

        @Override
        public void onNext(Integer item) {
            log.info("【onNext 接受到数据 item : {}】 ", item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("【onError 出现异常】");
            subscription.cancel();
        }

        @Override
        public void onComplete() {
            log.info("【onComplete 所有数据接收完成】");
        }
    };

    //3。发布者和订阅者 建立订阅关系 就是回调订阅者的onSubscribe方法传入订阅合同
    publisher.subscribe(subscriber);

    //4.发布者 生成数据
    for (int i = 1; i <= 5; i++) {
        log.info("【生产数据 {} 】", i );
        //submit是一个阻塞方法,此时会调用订阅者的onNext方法
        publisher.submit(i);
    }

    //5.发布者 数据都已发布完成后,关闭发送,此时会回调订阅者的onComplete方法
    publisher.close();

    //主线程睡一会
    Thread.currentThread().join(100000);

  }
}

 打印输出结果

 看结果好像我们看不出来Reactive Stream有什么用 ,其实关键点在 publisher.submit(i); submit它是一个阻塞方法 让我们把代码修改一点

1.将onNext添加耗时操作,模拟业务耗时逻辑 2.增加发布者发布数据的数量,模拟真实场景 无限数据

    @Override
        public void onNext(Integer item) {
            log.info("【onNext 接受到数据 item : {}】 ", item);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscription.request(1);
        }

    //发布者 生成数据
    for (int i = 1; i <= 1000; i++) {
        log.info("【生产数据 {} 】", i );
        //submit是一个阻塞方法,此时会调用订阅者的onNext方法
        publisher.submit(i);
    }

 直接看打印

 会发现发布者 生成数据到256后就会停止生产,这是因为publisher.submit(i)方法是阻塞的, 内部有个缓冲数组最大容量就是256,只有当订阅者发送 subscription.request(1); 请求后,才会从缓冲数组里拿按照顺序拿出数据传给 onNext方法 供订阅者处理,当subscription.request(1)这个方法被调用后,发布者发现数组里没有满才会再生产数据,这样就防止了生产者一次生成过多的数据把订阅者压垮,从而实现了背压机制

 6.案例二 响应式带 Processor 使用案例

  6.1创建自定义Processor

package com.johnny.webflux.webfluxlearn.reactivestream;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
* 自定义 Processor
*
* @author johnny
* @create 2020-02-25 下午1:56
**/
@Slf4j
public class MyProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer> {

private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
    log.info("【Processor 收到订阅请求】");
    //保存订阅关系,需要用它来给发布者 相应
    this.subscription = subscription;

    this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
    log.info("【onNext 收到发布者数据  : {} 】", item);

    //做业务处理。。
    if (item % 2 == 0) {
        //筛选偶数 发送给 订阅者
        this.submit(item);
    }
    this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
    // 我们可以告诉发布者, 后面不接受数据了
    this.subscription.cancel();
 }

@Override
public void onComplete() {
    log.info("【处理器处理完毕】");
    this.close();
 }
}

  6.2 运行demo 关联publisher 和 Processor 和 subscriber

package com.johnny.webflux.webfluxlearn.reactivestream;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
* 带Processor的案例
*
* @author johnny
* @create 2020-02-25 下午2:17
**/
@Slf4j
public class ProcessorDemo {

public static void main(String[] args) throws InterruptedException {

    //创建发布者
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    //创建 Processor 即是发布者也是订阅者
    MyProcessor myProcessor = new MyProcessor();

    //创建最终订阅者
    Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            this.subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            log.info("【onNext 从Processor 接受到过滤后的 数据 item : {}】 ", item);
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            log.info("【onError 出现异常】");
            subscription.cancel();
        }

        @Override
        public void onComplete() {
            log.info("【onComplete 所有数据接收完成】");
        }
    };

    //建立关系 发布者和处理器, 此时处理器扮演 订阅者
    publisher.subscribe(myProcessor);

    //建立关系 处理器和订阅者  此时处理器扮演
    myProcessor.subscribe(subscriber);

    //发布者发布数据

    publisher.submit(1);
    publisher.submit(2);
    publisher.submit(3);
    publisher.submit(4);

    publisher.close();

    TimeUnit.SECONDS.sleep(2);

  }
}

 7.总结

 本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor。

 只需要关注JDK9提供的 4个接口,以及内部的方法,对着案例敲一遍代码 其实流程还是很简单的 加油吧!!!来源:35分类目录

原文地址:https://www.cnblogs.com/1994july/p/12384558.html

时间: 2024-11-09 06:27:34

JDK9新特性 Reactive Stream 响应式流的相关文章

Reactive Stream 响应式流

初识Reactive Stream Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范.响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议. 它旨在解决处理元素流的问题--如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃.更确切地说,Reactive流目的是"找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流".

Reactive(2) 响应式流与制奶厂业务

再谈响应式 在前一篇文章从Reactive编程到"好莱坞"中,谈到了响应式的一些概念,讲的有些发散. 但仅仅还是停留在概念的层面,对于实战性的东西并没有涉及.所以大家看了后,或许还是有些不痛不痒. 响应式编程强调的是异步化.面向流的处理方式,这两者也并非凭空生出,而是从大量的技术实践中总结提炼出来的概念,就比如: 我们谈异步化,容易联想到 Java 异步IO(Asynchronized IO),而且习惯于将其和 BIO.NIO等概念来做对比. 殊不知,老早出现的 Swing 框架(Ja

JVM平台上的响应式流(Reactive Streams)规范

// Reactive Streams // 响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准.大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上. 注:响应式流其实就是一个规范,本文讲解的正是这个规范,且这个规范已经被引入到JDK9里了. 后压:就是下游出现了问题,得不到解决时,这个问题就会逆流而上,继而影响上游. 如果一个路口红绿灯坏了造成堵车,如果不管的话,用不了太长时间,车就会堵到上一个路口,如果再不管的话,整条路都会被赌满. // JDK9里的j

gateway&amp;reactive(响应式流)函数编程的webflux

springcloud.gateway是springcloud2的全新项目,该项目提供了一个构建在spring生态之上的API网关,包括spring5,springboot2,projectReactor.gateway旨在提高一种简单而有效的途径来转发请求,并为他们提供横切关注点,如安全性,监控/指标和弹性.在之前springcloud提供的网关是zull,zuul基于servlet2.5,使用阻塞架构,不支持长连接.zuul和negix相似,除了编程语言不同,zuul已经发布了zuul2,支

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

Java 8 新特性:Java 类库的新特性之 Stream类 ——诺诺&quot;涂鸦&quot;记忆

----------   诺诺学习技术交流博客.期待与您交流!    ---------- 详情请查看:http://blog.csdn.net/sun_promise  Java 类库的新特性之 Stream类 (注:此文中涉及到的一部分图片为网络图片,若有问题,请联系我将其删除.) 一.Java8对IO/NIO 的改进 Java 8 对 IO/NIO 也做了一些改进,主要包括: 改进了java.nio.charset.Charset 的实现,使编码和解码的效率得以提升: 精简了jre/lib

jdk1.8新特性之stream流

在jdk1.5的时候,我们需要掌握枚举:反射.注解.泛型.现在java14都出来了 jdk1.8的新特性:函数式接口.链式编程.stream流.lambda表达式 都掌握的怎么样了? 本篇将着重说明 Stream 流的用法 面试题: 按条件筛选用户,请你只用一行代码完成! 1.id 为偶数 2.年龄大于24 3.用户名大写 4.用户名倒排序 5.输出一个用户 代码(User 实体类省略): package stream; import com.coding.test.entity.User; i

Reactive Cocoa 响应式编程开发实例讲解-中篇

上一篇文章作为开门篇讲述了Cocoa Reactive概述. 这里我们详细介绍一下CocoaReative在代码中的应用. 网上好多blog有人形容CocoaReative 中 signals是插座或者水龙头,感觉不是很好理解.我举个更贴近生活的,用电话订菜(餐馆是Signals,电话订阅是SubScriberNext). 1.概述 Create一个Signal我们视为是一个支持电话订餐的餐馆,他们有很多菜,油盐酱醋就更不用说,当一个电话打进来首先,这个Signal就开始执行,等菜做好了,菜馆要

JAVA8新特性之 Stream API

重要的 Stream API  : java.util.Stream.* Stream提供了一种高效且简易处理数据的方式 注意:1.Stream自己不会存储元素 2.对Stream进行操作不会改变数据源,相反,会产生一个执有结果的新Stream 3.Stream操作是延迟执行的.只有进行了终止操作才会产生结果 (并行流就是把内容分成多个数据块,并用不同的线程分别处理每个数据块.在Java8 后,Stream API可以声明性的通过parallel()与sequential()在并行流与串行流(顺