(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范

2.9 Processor

Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber)。 所以你能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。

前面一直在聊响应式流的四个接口中的三个:PublisherSubscriberSubscription,唯独Processor迟迟没有提及。原因在于想用好它们不太容易,多数情况下,我们应该进行避免使用Processor,通常来说仅用于一些特殊场景。

2.9.1 使用 Sink 来线程安全地生成流

比起直接使用Processor,更好的方式是通过调用sink()来得到它的Sink。这个Sink是线程安全的,可以用于在应用程序中多线程并发地生成数据。例如,通过UnicastProcessor得到一个线程安全的 sink:

    UnicastProcessor<Integer> processor = UnicastProcessor.create();
    FluxSink<Integer> sink = processor.sink(overflowStrategy);

多个线程可以并发地通过下边的方法生成数据到sink。

    sink.next(n);

看到这里是不是感觉跟generate生成数据流的方式很像?所以Reactor官方建议,当你想要使用Processor的时候,首先看看能否用generate实现同样的功能,或者看看是否有相应的操作符可以达到你想要的效果

2.9.2 Reactor 内置的 Processor

Reactor Core 内置多种 Processor。这些 processor 具有不同的语法,大概分为三类。

  • 直接的(direct)(DirectProcessor 和 UnicastProcessor):这些 processors 只能通过直接 调用 Sink 的方法来推送数据。
  • 同步的(synchronous)(EmitterProcessor 和 ReplayProcessor):这些 processors 既可以直接调用 Sink 方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据。
  • 异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了 RingBuffer 的数据结构来缓存多个来自上游的数据,因此更加有健壮性。

异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。

1)DirectProcessor

DirectProcessor可以将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 即可。另一方面,它的不足是无法处理背压。所以,当DirectProcessor推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个IllegalStateException

一旦 Processor 结束(通常通过调用它的 Sink 的 error(Throwable) 或 complete() 方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。

2)UnicastProcessor

UnicastProcessor可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者(上一节的例子通过publish转换成了ConnectableFlux,所以可以接入两个订阅者)。

UnicastProcessor有多种选项,因此提供多种不同的create静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。

可以通过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。如果给出的队列是有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。

在上边“有限的”例子中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

3)EmitterProcessor

EmitterProcessor能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个发布者并同步获得数据。

最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由bufferSize定义。 之后如果仍然没有订阅者订阅它并消费数据,对onNext的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。

因此第一个订阅者会收到最多bufferSize个元素。然而之后,后续接入的订阅者只能获取到它们开始订阅之后推送的数据。这个内部的缓存会继续用于背压的目的。

默认情况下,如果所有的订阅者都取消了订阅,它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create 静态工厂方法的 autoCancel 参数来配置。

4)ReplayProcessor

ReplayProcessor会缓存直接通过自身的 Sink 推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。

可以通过多种配置方式创建它:

  • 缓存一个元素(cacheLast)。
  • 缓存一定个数的历史元素(create(int)),所有的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。

5)TopicProcessor

TopicProcessor是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置shared(build() 的 share(boolean) 配置)。

如果你企图在并发环境下通过并发的上游发布者调用TopicProcessoronNextonComplete,或onError方法,就必须配置shared。否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。

TopicProcessor能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出onErroronComplete信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法executor指定,通过提供一个有限线程数的 ExecutorService来限制这一个数。

这个 processor 基于一个RingBuffer数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在RingBuffer中的索引。

这个 processor 也有一个autoCancel构造器方法:如果设置为true(默认的),那么当 所有的订阅者取消之后,上游发布者也就被取消了。

6)WorkQueueProcessor

WorkQueueProcessor也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置shared(它多数构造器配置与TopicProcessor相同)。

它放松了对响应式流规范的兼容,但是好处就在于相对于TopicProcessor来说需要更少的资源。 它仍然基于RingBuffer,但是不再要求每一个订阅者都关联一个线程,因此相对于TopicProcessor来说更具扩展性。

代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式(无法保证完全公平的循环分发)。

WorkQueueProcessor多数构造器方法与TopicProcessor相同,比如autoCancelshare, 以及waitStrategy。下游订阅者的最大数目同样由构造器executor配置的ExecutorService 决定。

注意:最好不要有太多订阅者订阅WorkQueueProcessor,因为这会锁住 processor。如果你需要限制订阅者数量,最好使用一个ThreadPoolExecutorForkJoinPool。这个 processor 能够检测到(线程池)容量并在订阅者过多时抛出异常。



本文的介绍并未给出示例,在下一章我们编写“响应式Netty”的时候会介绍到Processor的使用。

原文地址:http://blog.51cto.com/liukang/2103560

时间: 2024-11-07 13:35:44

(19)Reactor Processors——响应式Spring的道法术器的相关文章

(12)Reactor 3 自定义数据流——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范本文源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水池的样子.如下图所示: 下面介绍到的方法都有一个

(14)Reactor调度器与线程模型——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Spring WebFlux快速上手 | Spring WebFlux性能测试前情提要:Reactor 3快速上手 | 响应式流规范 | 自定义数据流本文测试源码 2.4 调度器与线程模型 在1.3.2节简单介绍了不同类型的调度器Scheduler,以及如何使用publishOn和subscribeOn切换不同的线程执行环境. 下边使用一个简单的例子再回忆一下: @Test public void testScheduling() { F

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor Operators 本节的内容来自我翻译的Reactor 3 参考文档--如何选择操作符.由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅. 如果一个操作符是专属于 Flux 或 Mono 的,那么会给它注明前缀.公共的操作符没有前缀.如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,会以一个点(.)开头,并将参数置于圆括号内,比如: .methodCall(parameter).

(15)Reactor 3 Operators——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | 响应式流规范 2.5 Reactor 3 Operators 虽然响应式流规范中对Operator(以下均称作"操作符")并未做要求,但是与RxJava等响应式开发库一样,Reactor也提供了非常丰富的操作符. 2.5.1 丰富的操作符 本系列前边的文章中,陆续介绍了一些常用的操作符.但那也只是冰山之一角,Reactor 3提供了丰富的操作符,如果要一个一个介绍,那篇幅大了去了,授人以

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

(3)lambda与函数式——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>前情提要: 什么是响应式编程 | 响应式流本文源码 1.3 Hello,reactive world 前面两篇文章介绍了响应式编程和响应式流的特性,一味讲概念终是枯燥,还是上手敲一敲代码实在感受一下响应式编程的"手感"吧. 这一节,我们先了解一下lambda与函数式(已经了解的朋友可以直接跳到1.3.2),熟悉一下如何使用Reactor进行响应式编程,然后使用Spring Boot2,基于Spring 5的Webflux和Re

响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)

1. Spring WebFlux 2小时快速入门 Spring 5 之使用Spring WebFlux开发响应式应用. lambda与函数式(15min) Reactor 3 响应式编程库(60min) Spring Webflux和Spring Data Reactive开发响应式应用(45min) 通过以上内容相信可以对Spring 5.0 推出的响应式开发有了初步的体会.如果希望有更加深入的了解,欢迎阅读下边的系列文章-- 2. 响应式Spring的道法术器 这个系列的文章是为了记录下自

(10)响应式宣言、响应式系统与响应式编程——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式编程 | 响应式流 1.5 响应式系统 1.5.1 响应式宣言 关注"响应式"的朋友不难搜索到关于"响应式宣言"的介绍,先上图: 这张图凝聚了许多大神的智慧和经验,见官网,中文版官网,如果你认可这个宣言的内容,还可以签下你的大名.虽然这些内容多概念而少实战,让人感觉是看教科书,但是字字千金,不时看一看都会有新的体会和收获. 这也是新时代男朋友的行为准则: Responsive,要及时响应,24

(12)自定义数据流(实战Docker事件推送的REST API)——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范本文 测试源码 | 实战源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水