附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor Operators

本节的内容来自我翻译的Reactor 3 参考文档——如何选择操作符。由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅。

如果一个操作符是专属于 FluxMono 的,那么会给它注明前缀。
公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,
会以一个点(.)开头,并将参数置于圆括号内,比如: .methodCall(parameter)

1)创建一个新序列,它...

  • 发出一个 T,我已经有了:just

    • ...基于一个 Optional<T>Mono#justOrEmpty(Optional<T>)
    • ...基于一个可能为 null 的 T:Mono#justOrEmpty(T)
  • 发出一个 T,且还是由 just 方法返回
    • ...但是“懒”创建的:使用 Mono#fromSupplier 或用 defer 包装 just
  • 发出许多 T,这些元素我可以明确列举出来:Flux#just(T...)
  • 基于迭代数据结构:
    • 一个数组:Flux#fromArray
    • 一个集合或 iterable:Flux#fromIterable
    • 一个 Integer 的 range:Flux#range
    • 一个 Stream 提供给每一个订阅:Flux#fromStream(Supplier<Stream>)
  • 基于一个参数值给出的源:
    • 一个 Supplier<T>Mono#fromSupplier
    • 一个任务:Mono#fromCallableMono#fromRunnable
    • 一个 CompletableFuture<T>Mono#fromFuture
  • 直接完成:empty
  • 立即生成错误:error
    • ...但是“懒”的方式生成 Throwableerror(Supplier<Throwable>)
  • 什么都不做:never
  • 订阅时才决定:defer
  • 依赖一个可回收的资源:using
  • 可编程地生成事件(可以使用状态):
    • 同步且逐个的:Flux#generate
    • 异步(也可同步)的,每次尽可能多发出元素:Flux#create
      Mono#create 也是异步的,只不过只能发一个)

2)对序列进行转化

  • 我想转化一个序列:

    • 1对1地转化(比如字符串转化为它的长度):map
    • ...类型转化:cast
    • ...为了获得每个元素的序号:Flux#index
    • 1对n地转化(如字符串转化为一串字符):flatMap + 使用一个工厂方法
    • 1对n地转化可自定义转化方法和/或状态:handle
    • 对每一个元素执行一个异步操作(如对 url 执行 http 请求):flatMap + 一个异步的返回类型为 Publisher 的方法
    • ...忽略一些数据:在 flatMap lambda 中根据条件返回一个 Mono.empty()
    • ...保留原来的序列顺序:Flux#flatMapSequential(对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序)
    • ...当 Mono 元素的异步任务会返回多个元素的序列时:Mono#flatMapMany
  • 我想添加一些数据元素到一个现有的序列:
    • 在开头添加:Flux#startWith(T...)
    • 在最后添加:Flux#concatWith(T...)
  • 我想将 Flux 转化为集合(一下都是针对 Flux 的)
    • 转化为 List:collectListcollectSortedList
    • 转化为 Map:collectMapcollectMultiMap
    • 转化为自定义集合:collect
    • 计数:count
    • reduce 算法(将上个元素的reduce结果与当前元素值作为输入执行reduce方法,如sum) reduce
    • ...将每次 reduce 的结果立即发出:scan
    • 转化为一个 boolean 值:
    • 对所有元素判断都为true:all
    • 对至少一个元素判断为true:any
    • 判断序列是否有元素(不为空):hasElements
    • 判断序列中是否有匹配的元素:hasElement
  • 我想合并 publishers...
    • 按序连接:Flux#concat.concatWith(other)
    • ...即使有错误,也会等所有的 publishers 连接完成:Flux#concatDelayError
    • ...按订阅顺序连接(这里的合并仍然可以理解成序列的连接):Flux#mergeSequential
    • 按元素发出的顺序合并(无论哪个序列的,元素先到先合并):Flux#merge / .mergeWith(other)
    • ...元素类型会发生变化:Flux#zip / Flux#zipWith
    • 将元素组合:
    • 2个 Monos 组成1个 Tuple2Mono#zipWith
    • n个 Monos 的元素都发出来后组成一个 Tuple:Mono#zip
    • 在终止信号出现时“采取行动”:
    • 在 Mono 终止时转换为一个 Mono<Void>Mono#and
    • 当 n 个 Mono 都终止时返回 Mono<Void>Mono#when
    • 返回一个存放组合数据的类型,对于被合并的多个序列:
      • 每个序列都发出一个元素时:Flux#zip
      • 任何一个序列发出元素时:Flux#combineLatest
    • 只取各个序列的第一个元素:Flux#firstMono#firstmono.or<br/>(otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)
    • 由一个序列触发(类似于 flatMap,不过“喜新厌旧”):switchMap
    • 由每个新序列开始时触发(也是“喜新厌旧”风格):switchOnNext
  • 我想重复一个序列:repeat
    • ...但是以一定的间隔重复:Flux.interval(duration).flatMap(tick -&gt; myExistingPublisher)
  • 我有一个空序列,但是...
    • 我想要一个缺省值来代替:defaultIfEmpty
    • 我想要一个缺省的序列来代替:switchIfEmpty
  • 我有一个序列,但是我对序列的元素值不感兴趣:ignoreElements
    • ...并且我希望用 Mono 来表示序列已经结束:then
    • ...并且我想在序列结束后等待另一个任务完成:thenEmpty
    • ...并且我想在序列结束之后返回一个 MonoMono#then(mono)
    • ...并且我想在序列结束之后返回一个值:Mono#thenReturn(T)
    • ...并且我想在序列结束之后返回一个 FluxthenMany
  • 我有一个 Mono 但我想延迟完成...
    • ...当有1个或N个其他 publishers 都发出(或结束)时才完成:Mono#delayUntilOther
    • ...使用一个函数式来定义如何获取“其他 publisher”:Mono#delayUntil(Function)
  • 我想基于一个递归的生成序列的规则扩展每一个元素,然后合并为一个序列发出:
    • ...广度优先:expand(Function)
    • ...深度优先:expandDeep(Function)

3)“窥视”(只读)序列

  • 再不对序列造成改变的情况下,我想:

    • 得到通知或执行一些操作:
    • 发出元素:doOnNext
    • 序列完成:Flux#doOnCompleteMono#doOnSuccess
    • 因错误终止:doOnError
    • 取消:doOnCancel
    • 订阅时:doOnSubscribe
    • 请求时:doOnRequest
    • 完成或错误终止:doOnTerminate(Mono的方法可能包含有结果)
      • 但是在终止信号向下游传递 之后doAfterTerminate
    • 所有类型的信号(Signal):Flux#doOnEach
    • 所有结束的情况(完成complete、错误error、取消cancel):doFinally
    • 记录日志:log
  • 我想知道所有的事件:
    • 每一个事件都体现为一个 single 对象:
    • 执行 callback:doOnEach
    • 每个元素转化为 single 对象:materialize
      • ...在转化回元素:dematerialize
    • 转化为一行日志:log

4)过滤序列

  • 我想过滤一个序列

    • 基于给定的判断条件:filter
    • ...异步地进行判断:filterWhen
    • 仅限于指定类型的对象:ofType
    • 忽略所有元素:ignoreElements
    • 去重:
    • 对于整个序列:Flux#distinct
    • 去掉连续重复的元素:Flux#distinctUntilChanged
  • 我只想要一部分序列:
    • 只要 N 个元素:
    • 从序列的第一个元素开始算:Flux#take(long)
      • ...取一段时间内发出的元素:Flux#take(Duration)
      • ...只取第一个元素放到 Mono 中返回:Flux#next()
      • ...使用 request(N) 而不是取消:Flux#limitRequest(long)
    • 从序列的最后一个元素倒数:Flux#takeLast
    • 直到满足某个条件(包含):Flux#takeUntil(基于判断条件),Flux#takeUntilOther(基于对 publisher 的比较)
    • 直到满足某个条件(不包含):Flux#takeWhile
    • 最多只取 1 个元素:
    • 给定序号:Flux#elementAt
    • 最后一个:.takeLast(1)
      • ...如果为序列空则发出错误信号:Flux#last()
      • ...如果序列为空则返回默认值:Flux#last(T)
    • 跳过一些元素:
    • 从序列的第一个元素开始跳过:Flux#skip(long)
      • ...跳过一段时间内发出的元素:Flux#skip(Duration)
    • 跳过最后的 n 个元素:Flux#skipLast
    • 直到满足某个条件(包含):Flux#skipUntil(基于判断条件),Flux#skipUntilOther (基于对 publisher 的比较)
    • 直到满足某个条件(不包含):Flux#skipWhile
    • 采样:
    • 给定采样周期:Flux#sample(Duration)
      • 取采样周期里的第一个元素而不是最后一个:sampleFirst
    • 基于另一个 publisher:Flux#sample(Publisher)
    • 基于 publisher“超时”:Flux#sampleTimeout (每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)
  • 我只想要一个元素(如果多于一个就返回错误)...
    • 如果序列为空,发出错误信号:Flux#single()
    • 如果序列为空,发出一个缺省值:Flux#single(T)
    • 如果序列为空就返回一个空序列:Flux#singleOrEmpty

5)错误处理

  • 我想创建一个错误序列:error...

    • ...替换一个完成的 Flux.concat(Flux.error(e))
    • ...替换一个完成的 Mono.then(Mono.error(e))
    • ...如果元素超时未发出:timeout
    • ...“懒”创建:error(Supplier&lt;Throwable&gt;)
  • 我想要类似 try/catch 的表达方式:
    • 抛出异常:error
    • 捕获异常:
    • 然后返回缺省值:onErrorReturn
    • 然后返回一个 FluxMonoonErrorResume
    • 包装异常后再抛出:.onErrorMap(t -&gt; new RuntimeException(t))
    • finally 代码块:doFinally
    • Java 7 之后的 try-with-resources 写法:using 工厂方法
  • 我想从错误中恢复...
    • 返回一个缺省的:
    • 的值:onErrorReturn
    • PublisherFlux#onErrorResumeMono#onErrorResume
    • 重试:retry
    • ...由一个用于伴随 Flux 触发:retryWhen
  • 我想处理回压错误(向上游发出“MAX”的 request,如果下游的 request 比较少,则应用策略)...
    • 抛出 IllegalStateExceptionFlux#onBackpressureError
    • 丢弃策略:Flux#onBackpressureDrop
    • ...但是不丢弃最后一个元素:Flux#onBackpressureLatest
    • 缓存策略(有限或无限):Flux#onBackpressureBuffer
    • ...当有限的缓存空间用满则应用给定策略:Flux#onBackpressureBuffer 带有策略 BufferOverflowStrategy

6) 基于时间的操作

  • 我想将元素转换为带有时间信息的 Tuple2&lt;Long, T&gt;...

    • 从订阅时开始:elapsed
    • 记录时间戳:timestamp
  • 如果元素间延迟过长则中止序列:timeout
  • 以固定的周期发出元素:Flux#interval
  • 在一个给定的延迟后发出 0:static Mono.delay.
  • 我想引入延迟:
    • 对每一个元素:Mono#delayElementFlux#delayElements
    • 延迟订阅:delaySubscription

7)拆分 Flux

  • 我想将一个 Flux&lt;T&gt; 拆分为一个 Flux&lt;Flux&lt;T&gt;&gt;

    • 以个数为界:window(int)
    • ...会出现重叠或丢弃的情况:window(int, int)
    • 以时间为界:window(Duration)
    • ...会出现重叠或丢弃的情况:window(Duration, Duration)
    • 以个数或时间为界:windowTimeout(int, Duration)
    • 基于对元素的判断条件:windowUntil
    • ...触发判断条件的元素会分到下一波(cutBefore 变量):.windowUntil(predicate, true)
    • ...满足条件的元素在一波,直到不满足条件的元素发出开始下一波:windowWhile (不满足条件的元素会被丢弃)
    • 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:window(Publisher)windowWhen
  • 我想将一个 Flux&lt;T&gt; 的元素拆分到集合...
    • 拆分为一个一个的 List:
    • 以个数为界:buffer(int)
      • ...会出现重叠或丢弃的情况:buffer(int, int)
    • 以时间为界:buffer(Duration)
      • ...会出现重叠或丢弃的情况:buffer(Duration, Duration)
    • 以个数或时间为界:bufferTimeout(int, Duration)
    • 基于对元素的判断条件:bufferUntil(Predicate)
      • ...触发判断条件的元素会分到下一个buffer:.bufferUntil(predicate, true)
      • ...满足条件的元素在一个buffer,直到不满足条件的元素发出开始下一buffer:bufferWhile(Predicate)
    • 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:buffer(Publisher)bufferWhen
    • 拆分到指定类型的 "collection":buffer(int, Supplier&lt;C&gt;)
  • 我想将 Flux&lt;T&gt; 中具有共同特征的元素分组到子 Flux:groupBy(Function&lt;T,K&gt;)(注意返回值是 Flux&lt;GroupedFlux&lt;K, T&gt;&gt;,每一个 GroupedFlux 具有相同的 key 值 K,可以通过 key() 方法获取)。

8)回到同步的世界

  • 我有一个 Flux&lt;T&gt;,我想:

    • 在拿到第一个元素前阻塞:Flux#blockFirst
    • ...并给出超时时限:Flux#blockFirst(Duration)
    • 在拿到最后一个元素前阻塞(如果序列为空则返回 null):Flux#blockLast
    • ...并给出超时时限:Flux#blockLast(Duration)
    • 同步地转换为 Iterable&lt;T&gt;Flux#toIterable
    • 同步地转换为 Java 8 Stream&lt;T&gt;Flux#toStream
  • 我有一个 Mono&lt;T&gt;,我想:
    • 在拿到元素前阻塞:Mono#block
    • ...并给出超时时限:Mono#block(Duration)
    • 转换为 CompletableFuture&lt;T&gt;Mono#toFuture

原文地址:http://blog.51cto.com/liukang/2094073

时间: 2024-11-08 08:46:05

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器的相关文章

(15)Reactor 3 Operators——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | 响应式流规范 2.5 Reactor 3 Operators 虽然响应式流规范中对Operator(以下均称作"操作符")并未做要求,但是与RxJava等响应式开发库一样,Reactor也提供了非常丰富的操作符. 2.5.1 丰富的操作符 本系列前边的文章中,陆续介绍了一些常用的操作符.但那也只是冰山之一角,Reactor 3提供了丰富的操作符,如果要一个一个介绍,那篇幅大了去了,授人以

(14)Reactor调度器与线程模型——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Spring WebFlux快速上手 | Spring WebFlux性能测试前情提要:Reactor 3快速上手 | 响应式流规范 | 自定义数据流本文测试源码 2.4 调度器与线程模型 在1.3.2节简单介绍了不同类型的调度器Scheduler,以及如何使用publishOn和subscribeOn切换不同的线程执行环境. 下边使用一个简单的例子再回忆一下: @Test public void testScheduling() { F

(12)Reactor 3 自定义数据流——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范本文源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水池的样子.如下图所示: 下面介绍到的方法都有一个

(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范 2.9 Processor Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber). 所以你能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列. 前面一直在聊响应式流的四个接口中的三个:Publisher.Subscriber.Subscription,唯独Processor迟迟没有提及.原因在于想

响应式编程库Reactor 3 Reference Guide参考文档中文版(v3.2.0)

Project Reactor 是 Spring WebFlux 的御用响应式编程库,与 Spring 是兄弟项目. 关于如何基于Spring的组件进行响应式应用的开发,欢迎阅读系列文章<响应式Spring的道法术器>. 官方参考文档地址:http://projectreactor.io/docs/core/release/reference/中文翻译文档地址:http://htmlpreview.github.io/?https://github.com/get-set/reactor-co

Reactor 响应式

1.3.2 Reactor Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库. 本文对Reactor的介绍以基本的概念和简单的使用为主,深度以能够满足基本的Spring WebFlux使用为准.在下一章,我会结合Reactor的设计模式.并发调度模型等原理层面的内容系统介绍Reactor的使用.

Hadoop集群选择合适的硬件配置

为Hadoop集群选择合适的硬件配置 随着Apache Hadoop的起步,云客户的增多面临的首要问题就是如何为他们新的的Hadoop集群选择合适的硬件. 尽管Hadoop被设计为运行在行业标准的硬件上,提出一个理想的集群配置不想提供硬件规格列表那么简单. 选择硬件,为给定的负载在性能和经济性提供最佳平衡是需要测试和验证其有效性.(比如,IO密集型工作负载的用户将会为每个核心主轴投资更多). 在这个博客帖子中,你将会学到一些工作负载评估的原则和它在硬件选择中起着至关重要的作用.在这个过程中,你也

(转)NoSQL系列:选择合适的数据库

内容目录: 为什么使用NoSQL数据库? 键值数据库 文档数据库 列族数据库 图数据库 附思维导图 参考 NoSQL系列:选择合适的数据库 为什么使用NoSQL数据库? 阻抗失衡 关系模型和内存中的数据结构不匹配 采用更为方便的数据交互方式提升开发效率 待处理的数据量很大 数据量超过关系型数据库的承载能力 大集群的出现 在成本方面,集群中应用关系数据库,许可费用是一笔很大的支出: 横向扩展和纵向扩展:关系数据库一般只能是纵向扩展,通过对单机服务器的性能换代增强而实现:而对于扩展到多个服务器, D

如何在程序开发项目中选择合适的 JavaScript 框架,节省时间和成本的9款极佳的JavaScript框架介绍

从技术上来看,iOS,Android 和 Windows Phone 上的移动应用是使用不同的程序语言开发的,iOS 应用使用 Objective-C,Android 应用使用 Java,而 Windows Phone 应用使用 .NET. .随着 JavaScript,CSS 和 HTML 知识技能的提升,相信你也可以构建一个超赞的移动应用.在这篇博客里,我们将会介绍一些极好的 JavaScript 移动应用程序开发框架. 说到网络开发,就不得不说 JavaScript,这是一款很有前途的程序