(15)Reactor 3 Operators——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 Reactor 3快速上手 | 响应式流规范

2.5 Reactor 3 Operators

虽然响应式流规范中对Operator(以下均称作”操作符“)并未做要求,但是与RxJava等响应式开发库一样,Reactor也提供了非常丰富的操作符。

2.5.1 丰富的操作符

本系列前边的文章中,陆续介绍了一些常用的操作符。但那也只是冰山之一角,Reactor 3提供了丰富的操作符,如果要一个一个介绍,那篇幅大了去了,授人以鱼不如授人以渔,我们可以通过以下几种途径了解操作符的应用场景,熟悉它们的使用方法:

  1. 附2是《Reactor 3 参考文档》中关于“如何选择合适的操作符”一节的翻译,介绍了如何选择合适的操作符。
  2. 参考Javadoc中对Flux和Mono的解释和示意图。
  3. 如果想通过实战的方式上手试一下各种操作符,强烈推荐来自Reactor官方的lite-rx-api-hands-on项目。拿到项目后,你要做的就是使用操作符,完成“TODO”的代码,让所有的@Test绿灯就OK了。相信完成这些测试之后,对于常见的操作符就能了然于胸了。
  4. 此外,在日常的开发过程中,通过IDE也可以随时查阅,比如IntelliJ:

由于Project Reactor的核心开发团队也有来自RxJava的大牛,并且Reactor本身在开发过程中也借鉴了大多数RxJava的操作符命名(对于RxJava中少量命名不够清晰的操作符进行了优化),因此对于熟悉RxJava的朋友来说,使用Reactor基本没有学习成本。同样的,学习了Reactor之后,再去使用RxJava也没有问题。

2.5.2 “打包”操作符

我们在开发过程中,为了保持代码的简洁,通常会将经常使用的一系列操作封装到方法中,以备调用。

Reactor也提供了类似的对操作符的“打包”方法。

1)使用 transform 操作符

transform可以将一段操作链打包为一个函数式。这个函数式能在组装期将被封装的操作符还原并接入到调用transform的位置。这样做和直接将被封装的操作符加入到链上的效果是一样的。示例如下:

    @Test
    public void testTransform() {
        Function<Flux<String>, Flux<String>> filterAndMap =
                f -> f.filter(color -> !color.equals("orange"))
                        .map(String::toUpperCase);

        Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                .doOnNext(System.out::println)
                .transform(filterAndMap)
                .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
    }

这个例子,通过名为filterAndMap的函数式将filtermap操作符进行了打包,然后交给transform拼装到操作链中。输出如下:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE

2)使用 compose 操作符

compose 操作符与 transform 类似,也能够将几个操作符封装到一个函数式中。主要的区别就是,这个函数式是针对每一个订阅者起作用的。这意味着它对每一个 subscription 可以生成不同的操作链。举个例子:

    public void testCompose() {
        AtomicInteger ai = new AtomicInteger();
        Function<Flux<String>, Flux<String>> filterAndMap = f -> {
            if (ai.incrementAndGet() == 1) {
                return f.filter(color -> !color.equals("orange"))
                        .map(String::toUpperCase);
            }
            return f.filter(color -> !color.equals("purple"))
                    .map(String::toUpperCase);
        };

        Flux<String> composedFlux =
                Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                        .doOnNext(System.out::println)
                        .compose(filterAndMap);

        composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :" + d));
        composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: " + d));
    }

这个例子中,filterAndMap函数式有一个名为ai的会自增的状态值。每次调用subscribe方法进行订阅的时候,compose会导致ai自增,从而两次订阅的操作链是不同的。输出如下:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple

也就是说,compose中打包的函数式可以是有状态的(stateful):

transform打包的函数式是无状态的。将compose换成transform再次执行,发现两次订阅的操作链是一样的,都会过滤掉orange

原文地址:http://blog.51cto.com/liukang/2094071

时间: 2024-12-26 22:08:30

(15)Reactor 3 Operators——响应式Spring的道法术器的相关文章

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor Operators 本节的内容来自我翻译的Reactor 3 参考文档--如何选择操作符.由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅. 如果一个操作符是专属于 Flux 或 Mono 的,那么会给它注明前缀.公共的操作符没有前缀.如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,会以一个点(.)开头,并将参数置于圆括号内,比如: .methodCall(parameter).

(12)Reactor 3 自定义数据流——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范本文源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水池的样子.如下图所示: 下面介绍到的方法都有一个

(14)Reactor调度器与线程模型——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Spring WebFlux快速上手 | Spring WebFlux性能测试前情提要:Reactor 3快速上手 | 响应式流规范 | 自定义数据流本文测试源码 2.4 调度器与线程模型 在1.3.2节简单介绍了不同类型的调度器Scheduler,以及如何使用publishOn和subscribeOn切换不同的线程执行环境. 下边使用一个简单的例子再回忆一下: @Test public void testScheduling() { F

(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范 2.9 Processor Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber). 所以你能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列. 前面一直在聊响应式流的四个接口中的三个:Publisher.Subscriber.Subscription,唯独Processor迟迟没有提及.原因在于想

响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)

1. Spring WebFlux 2小时快速入门 Spring 5 之使用Spring WebFlux开发响应式应用. lambda与函数式(15min) Reactor 3 响应式编程库(60min) Spring Webflux和Spring Data Reactive开发响应式应用(45min) 通过以上内容相信可以对Spring 5.0 推出的响应式开发有了初步的体会.如果希望有更加深入的了解,欢迎阅读下边的系列文章-- 2. 响应式Spring的道法术器 这个系列的文章是为了记录下自

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

(3)lambda与函数式——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>前情提要: 什么是响应式编程 | 响应式流本文源码 1.3 Hello,reactive world 前面两篇文章介绍了响应式编程和响应式流的特性,一味讲概念终是枯燥,还是上手敲一敲代码实在感受一下响应式编程的"手感"吧. 这一节,我们先了解一下lambda与函数式(已经了解的朋友可以直接跳到1.3.2),熟悉一下如何使用Reactor进行响应式编程,然后使用Spring Boot2,基于Spring 5的Webflux和Re

(10)响应式宣言、响应式系统与响应式编程——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式编程 | 响应式流 1.5 响应式系统 1.5.1 响应式宣言 关注"响应式"的朋友不难搜索到关于"响应式宣言"的介绍,先上图: 这张图凝聚了许多大神的智慧和经验,见官网,中文版官网,如果你认可这个宣言的内容,还可以签下你的大名.虽然这些内容多概念而少实战,让人感觉是看教科书,但是字字千金,不时看一看都会有新的体会和收获. 这也是新时代男朋友的行为准则: Responsive,要及时响应,24

(12)自定义数据流(实战Docker事件推送的REST API)——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范本文 测试源码 | 实战源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水