(12)Reactor 3 自定义数据流——响应式Spring的道法术器

本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范
本文源码

2.2 自定义数据流

这一小节介绍如何通过定义相应的事件(onNextonErroronComplete) 创建一个 Flux 或 Mono。Reactor提供了generatecreatepushhandle等方法,所有这些方法都使用 sink(池)来生成数据流。

sink,顾名思义,就是池子,可以想象一下厨房水池的样子。如下图所示:

下面介绍到的方法都有一个sink提供给方法使用者,通常至少会暴露三个方法给我们,nexterrorcomplete。next和error相当于两个下水口,我们不断将自定义的数据放到next口,Reactor就会帮我们串成一个Publisher数据流,直到有一个错误数据放到error口,或按了一下complete按钮,数据流就会终止了。

2.2.1 generate

generate是一种同步地,逐个地发出数据的方法。因为它提供的sink是一个SynchronousSink, 而且其next()方法在每次回调的时候最多只能被调用一次。

generate方法有三种签名:

    public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 

    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

1)使用SynchronousSink生成数据流

    @Test
    public void testGenerate1() {
        final AtomicInteger count = new AtomicInteger(1);   // 1
        Flux.generate(sink -> {
            sink.next(count.get() + " : " + new Date());   // 2
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (count.getAndIncrement() >= 5) {
                sink.complete();     // 3
            }
        }).subscribe(System.out::println);  // 4
    }
  1. 用于计数;
  2. 向“池子”放自定义的数据;
  3. 告诉generate方法,自定义数据已发完;
  4. 触发数据流。

输出结果为每1秒钟打印一下时间,共打印5次。

2)增加一个伴随状态

对于上边的例子来说,count用于记录状态,当值达到5之后就停止计数。由于在lambda内部使用,因此必须是final类型的,且不能是原生类型(如int)或不可变类型(如Integer)。

如果使用第二个方法签名,上边的例子可以这样改:

    @Test
    public void testGenerate2() {
        Flux.generate(
                () -> 1,    // 1
                (count, sink) -> {      // 2
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;   // 3
                }).subscribe(System.out::println);
    }
  1. 初始化状态值;
  2. 第二个参数是BiFunction,输入为状态和sink;
  3. 每次循环都要返回新的状态值给下次使用。

3)完成后处理

第三个方法签名除了状态、sink外,还有一个Consumer,这个Consumer在数据流发完后执行。

        Flux.generate(
                () -> 1,
                (count, sink) -> {
                    sink.next(count + " : " + new Date());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (count >= 5) {
                        sink.complete();
                    }
                    return count + 1;
                }, System.out::println)     // 1
                .subscribe(System.out::println);
    }
  1. 最后将count值打印出来。

如果 state 使用了数据库连接或者其他需要进行清理的资源,这个 Consumer lambda 可以用来在最后完成资源清理任务。

2.2.2 create

create是一个更高级的创建Flux的方法,其生成数据流的方式既可以是同步的,也可以是异步的,并且还可以每次发出多个元素。

create用到了FluxSink,后者同样提供 next,error 和 complete 等方法。 与generate不同的是,create不需要状态值,另一方面,它可以在回调中触发多个事件(即使事件是发生在未来的某个时间)。

create 常用的场景就是将现有的 API 转为响应式,比如监听器的异步方法。

先编写一个事件源:

    public class MyEventSource {

        private List<MyEventListener> listeners;

        public MyEventSource() {
            this.listeners = new ArrayList<>();
        }

        public void register(MyEventListener listener) {    // 1
            listeners.add(listener);
        }

        public void newEvent(MyEvent event) {
            for (MyEventListener listener :
                    listeners) {
                listener.onNewEvent(event);     // 2
            }
        }

        public void eventStopped() {
            for (MyEventListener listener :
                    listeners) {
                listener.onEventStopped();      // 3
            }
        }

        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        public static class MyEvent {   // 4
            private Date timeStemp;
            private String message;
        }
    }
  1. 注册监听器;
  2. 向监听器发出新事件;
  3. 告诉监听器事件源已停止;
  4. 事件类,使用了lombok注解。

准备一个监听器接口,它可以监听上边第2和3的两种事件:(1)新的MyEvent到来;(2)事件源停止。如下:

    public interface MyEventListener {
        void onNewEvent(MyEventSource.MyEvent event);
        void onEventStopped();
    }

下面的测试方法逻辑是:创建一个监听器注册到事件源,这个监听器再收到事件回调的时候通过Flux.create的sink将一系列事件转换成异步的事件流:

    @Test
    public void testCreate() throws InterruptedException {
        MyEventSource eventSource = new MyEventSource();    // 1
        Flux.create(sink -> {
                    eventSource.register(new MyEventListener() {    // 2
                        @Override
                        public void onNewEvent(MyEventSource.MyEvent event) {
                            sink.next(event);       // 3
                        }

                        @Override
                        public void onEventStopped() {
                            sink.complete();        // 4
                        }
                    });
                }
        ).subscribe(System.out::println);       // 5

        for (int i = 0; i < 20; i++) {  // 6
            Random random = new Random();
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));
        }
        eventSource.eventStopped(); // 7
    }
  1. 事件源;
  2. 向事件源注册用匿名内部类创建的监听器;
  3. 监听器在收到事件回调的时候通过sink将事件再发出;
  4. 监听器再收到事件源停止的回调的时候通过sink发出完成信号;
  5. 触发订阅(这时候还没有任何事件产生);
  6. 循环产生20个事件,每个间隔不超过1秒的随机时间;
  7. 最后停止事件源。

运行一下这个测试方法,20个MyEvent陆续打印出来。

如果将上边的create方法换成generate方法,则会报出异常:

java.lang.IllegalStateException: The generator didn‘t call any of the SynchronousSink method

证明generate并不支持异步的方式。

create方法还有一个变体方法push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种回压策略。所以上边的例子可以替换为push方法。区别在于,push方法中,调用nextcompleteerror的必须是同一个线程。

除了nextcompleteerror方法外,FluxSink还有onRequest方法,这个方法可以用来响应下游订阅者的请求事件。从而不仅可以像上一个例子那样,上游在数据就绪的时候将其推送到下游,同时下游也可以从上游拉取已经就绪的数据。这是一种推送/拉取混合的模式。比如:

    Flux<String> bridge = Flux.create(sink -> {
        myMessageProcessor.register(
          new MyMessageListener<String>() {

            public void onMessage(List<String> messages) {
              for(String s : messages) {
                sink.next(s);   // 1
              }
            }
        });
        sink.onRequest(n -> {   // 2
            List<String> messages = myMessageProcessor.request(n);  // 3
            for(String s : message) {
               sink.next(s);
            }
        });
        ...
    }
  1. push方式,主动向下游发出数据;
  2. 在下游发出请求时被调用;
  3. 响应下游的请求,查询是否有可用的message。

原文地址:http://blog.51cto.com/liukang/2092712

时间: 2024-11-09 20:40:06

(12)Reactor 3 自定义数据流——响应式Spring的道法术器的相关文章

(12)自定义数据流(实战Docker事件推送的REST API)——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范本文 测试源码 | 实战源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水

(14)Reactor调度器与线程模型——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Spring WebFlux快速上手 | Spring WebFlux性能测试前情提要:Reactor 3快速上手 | 响应式流规范 | 自定义数据流本文测试源码 2.4 调度器与线程模型 在1.3.2节简单介绍了不同类型的调度器Scheduler,以及如何使用publishOn和subscribeOn切换不同的线程执行环境. 下边使用一个简单的例子再回忆一下: @Test public void testScheduling() { F

(19)Reactor Processors——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范 2.9 Processor Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber). 所以你能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列. 前面一直在聊响应式流的四个接口中的三个:Publisher.Subscriber.Subscription,唯独Processor迟迟没有提及.原因在于想

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor Operators 本节的内容来自我翻译的Reactor 3 参考文档--如何选择操作符.由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅. 如果一个操作符是专属于 Flux 或 Mono 的,那么会给它注明前缀.公共的操作符没有前缀.如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,会以一个点(.)开头,并将参数置于圆括号内,比如: .methodCall(parameter).

(15)Reactor 3 Operators——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | 响应式流规范 2.5 Reactor 3 Operators 虽然响应式流规范中对Operator(以下均称作"操作符")并未做要求,但是与RxJava等响应式开发库一样,Reactor也提供了非常丰富的操作符. 2.5.1 丰富的操作符 本系列前边的文章中,陆续介绍了一些常用的操作符.但那也只是冰山之一角,Reactor 3提供了丰富的操作符,如果要一个一个介绍,那篇幅大了去了,授人以

响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)

1. Spring WebFlux 2小时快速入门 Spring 5 之使用Spring WebFlux开发响应式应用. lambda与函数式(15min) Reactor 3 响应式编程库(60min) Spring Webflux和Spring Data Reactive开发响应式应用(45min) 通过以上内容相信可以对Spring 5.0 推出的响应式开发有了初步的体会.如果希望有更加深入的了解,欢迎阅读下边的系列文章-- 2. 响应式Spring的道法术器 这个系列的文章是为了记录下自

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

(3)lambda与函数式——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>前情提要: 什么是响应式编程 | 响应式流本文源码 1.3 Hello,reactive world 前面两篇文章介绍了响应式编程和响应式流的特性,一味讲概念终是枯燥,还是上手敲一敲代码实在感受一下响应式编程的"手感"吧. 这一节,我们先了解一下lambda与函数式(已经了解的朋友可以直接跳到1.3.2),熟悉一下如何使用Reactor进行响应式编程,然后使用Spring Boot2,基于Spring 5的Webflux和Re

(10)响应式宣言、响应式系统与响应式编程——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式编程 | 响应式流 1.5 响应式系统 1.5.1 响应式宣言 关注"响应式"的朋友不难搜索到关于"响应式宣言"的介绍,先上图: 这张图凝聚了许多大神的智慧和经验,见官网,中文版官网,如果你认可这个宣言的内容,还可以签下你的大名.虽然这些内容多概念而少实战,让人感觉是看教科书,但是字字千金,不时看一看都会有新的体会和收获. 这也是新时代男朋友的行为准则: Responsive,要及时响应,24