Rxjava2 Observable的辅助操作详解及实例(二)

目录

  • 8. TimeInterval
  • 9. Timeout
    • 9.1 timeout(timeout, timeUnit)
    • 9.2 timeout(timeout, timeUnit, scheduler, other)
    • 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other)
  • 10. Timestamp
  • 11. Using
  • 12. To
  • 小结

接续上篇: Rxjava2 Observable的辅助操作详解及实例(一)

8. TimeInterval

将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable。

TimeInterval 操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。

这个操作符将原始 Observable 转换为另一个 Observable ,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。新的Observable的第一个发射物表示的是在观察者订阅原始 Observable 到原始 Observable 发射它的第一项数据之间流逝的时间长度。不存在与原始 Observable 发射最后一项数据和发射 onCompleted 通知之间时长对应的发射物。

示例代码:

    /**
     * 1. timeInterval(Scheduler scheduler)
     *  scheduler: 可选参数,指定调度线程
     *  接收原始数据项,发射射表示相邻发射物时间间隔的对象
     */
    Observable.intervalRange(1, 10, 100, 100, TimeUnit.MILLISECONDS)
            .timeInterval()
         // .timeInterval(Schedulers.newThread())       // 指定工作线程
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    long time = longTimed.time();           // 连续数据间的间隔时间
                    TimeUnit unit = longTimed.unit();       // 连续数据间的时间间隔单位
                    Long value = longTimed.value();         // Observable发送的数据项
                    System.out.println("--> onNext(1): " + longTimed.toString());
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();
    System.out.println("-------------------------------------------------");
    /**
     *  2. timeInterval(TimeUnit unit, Scheduler scheduler)
     *  指定时间间隔单位和指定工作线程,接收原始数据项,发射射表示相邻发射物时间间隔的对象
     */
    Observable.intervalRange(1, 10, 1000, 1200, TimeUnit.MILLISECONDS)
        //  .timeInterval(TimeUnit.SECONDS)                             // 指定时间间隔单位
            .timeInterval(TimeUnit.SECONDS, Schedulers.newThread())     // 指定时间间隔单位和指定工作线程
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    System.out.println("--> onNext(2): " + longTimed.toString());
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): Timed[time=104, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=5]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=6]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=7]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=8]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=9]
--> onNext(1): Timed[time=100, unit=MILLISECONDS, value=10]
--> onComplete(1)
-------------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1, unit=SECONDS, value=5]
--> onNext(2): Timed[time=2, unit=SECONDS, value=6]
--> onNext(2): Timed[time=1, unit=SECONDS, value=7]
--> onNext(2): Timed[time=1, unit=SECONDS, value=8]
--> onNext(2): Timed[time=1, unit=SECONDS, value=9]
--> onNext(2): Timed[time=1, unit=SECONDS, value=10]
--> onComplete(2)

Javadoc: timeInterval()
Javadoc: timeInterval(Scheduler scheduler)
Javadoc: timeInterval(TimeUnit unit)
Javadoc: timeInterval(TimeUnit unit, Scheduler scheduler)

9. Timeout

对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。

RxJava中的实现为 timeout 操作符,具有多个不同的变体。

9.1 timeout(timeout, timeUnit)

如果原始 Observable 过了指定的一段时长没有发射任何数据,Timeout操作符会以一个 onError 通知终止这个Observable。

示例代码:

    /**
     *  1. timeout(long timeout, TimeUnit timeUnit)
     *  接受一个时长参数,如果在指定的时间段内没有数据项发射,将会发射一个Error通知,
     *  或者每当原始Observable发射了一项数据, timeout 就启动一个计时器,
     *  如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,
     *  就抛出 TimeoutException ,以一个错误通知终止Observable。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            //  Thread.sleep(2000);     // 延迟2秒后发射数据,此时会有TimeoutException
            emitter.onNext(1L);
            Thread.sleep(2000);     // 延迟2秒后发射数据,此时会有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(1, TimeUnit.SECONDS)     // 指定超时时间段为1秒
      .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(1)");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("--> onNext(1): " + aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1)");
            }
      });

    System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onError(1): java.util.concurrent.TimeoutException: The source did not signal an event for 1 seconds and has been terminated.

Javadoc: timeout(long timeout, TimeUnit timeUnit)

9.2 timeout(timeout, timeUnit, scheduler, other)

在指定时间段后超时时会切换到使用一个你指定的备用的 Observable,而不是发onError通知,可以通过scheduler 来指定工作线程。

示例代码:

    /**
     *  2. timeout(long timeout, TimeUnit timeUnit,
     *  Scheduler scheduler,        // 可选参数,指定线程调度器
     *  ObservableSource other      // 可选参数,超时备用Observable
     *  )
     *
     *  在指定时间段后超时时会切换到使用一个你指定的备用的Observable,而不是发onError通知。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            //  Thread.sleep(2000);     // 延迟2秒后发射数据,此时会有TimeoutException
            emitter.onNext(1L);
            Thread.sleep(2000);         // 延迟2秒后发射数据,此时会有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(1, TimeUnit.SECONDS,             // 指定超时时间段为1秒
            Schedulers.newThread(),             // 指定工作线程为子线程
            Observable.just(888L))              // 超时后默认发射的Observable
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(2): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 888
--> onComplete(2)

Javadoc: timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource other)

9.3 timeout(Function itemTimeoutIndicator, ObservableSource other)

使用一个函数 itemTimeoutIndicator 针对原始 Observable 的每一项返回一个 Observable,如果当这个 Observable 终止时原始 Observable 还没有发射另一项数据,就会认为是超时了,如果没有指定超时备用的 other,就抛出 TimeoutException,以一个错误通知终止 bservable,否则超时后发射备用的 Observable。

示例代码:

    /**
     *  3. timeout(Function<T, ObservableSource> itemTimeoutIndicator
     *  ObservableSource other      // 可选参数,当超时后发射的备用Observable
     *  )
     *  对原始Observable的每一项返回一个Observable,
     *  如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,
     *  如果没有指定超时备用的Observable,就抛出TimeoutException,以一个错误通知终止Observable,
     *  否则超时后发射备用的Observable。
     */
    Observable.create(new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            emitter.onNext(1L);
            Thread.sleep(3000);     // 延迟3秒后发射数据,此时会有TimeoutException
            emitter.onNext(2L);
            emitter.onComplete();
        }
    }).timeout(new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long aLong) throws Exception {
            // 为每一个原始数据发射一个Observable来指示下一个数据发射的Timeout,这里指定1秒超时时间
            return Observable.timer(1, TimeUnit.SECONDS);
        }
    }, Observable.just(888L))  // 超时后默认发射的Observable
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(3)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(3): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(3): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(3)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 888
--> onComplete(3)

Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator)
Javadoc: timeout(Function<T, ObservableSource> itemTimeoutIndicator, ObservableSource other)

10. Timestamp

给Observable发射的数据项附加一个指定的时间戳。

timestamp ,它将一个发射Timed类型数据的Observable转换为一个发射类型为 Timestamped<Timed> 的数据的Observable,每一项都包含数据的原始发射时间信息和原始数据。

示例代码:

    /**
     *  1. timestamp(Scheduler scheduler)
     *  scheduler: 可选参数,指定线程调度器
     *
     *  给Observable发射的数据项附加一个时间戳信息
     */
    Observable.intervalRange(1, 5, 1, 100, TimeUnit.MILLISECONDS)
            .timestamp(Schedulers.newThread())      // 指定在子线程调度处理
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    long time = longTimed.time();           // 连续数据间的间隔时间
                    TimeUnit unit = longTimed.unit();       // 连续数据间的时间间隔单位
                    Long value = longTimed.value();         // Observable发送的数据项
                    System.out.println("--> onNext(1): " + longTimed);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();
    System.out.println("-------------------------------------------");
    /**
     *  2. timestamp(TimeUnit unit, Scheduler scheduler)
     *  scheduler: 可选参数,指定线程调度器
     *
     *  给Observable发射的数据项附加一个指定单位的时间戳信息
     */
    Observable.intervalRange(1, 5, 1, 1200, TimeUnit.MILLISECONDS)
            .timestamp(TimeUnit.SECONDS, Schedulers.newThread())    // 指定时间单位为秒,在子线程调度处理
            .subscribe(new Observer<Timed<Long>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Timed<Long> longTimed) {
                    System.out.println("--> onNext(2): " + longTimed);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(2): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): Timed[time=1577455367446, unit=MILLISECONDS, value=1]
--> onNext(1): Timed[time=1577455367545, unit=MILLISECONDS, value=2]
--> onNext(1): Timed[time=1577455367645, unit=MILLISECONDS, value=3]
--> onNext(1): Timed[time=1577455367745, unit=MILLISECONDS, value=4]
--> onNext(1): Timed[time=1577455367845, unit=MILLISECONDS, value=5]
--> onComplete(1)
-------------------------------------------
--> onSubscribe(2)
--> onNext(2): Timed[time=1577455369, unit=SECONDS, value=1]
--> onNext(2): Timed[time=1577455370, unit=SECONDS, value=2]
--> onNext(2): Timed[time=1577455371, unit=SECONDS, value=3]
--> onNext(2): Timed[time=1577455373, unit=SECONDS, value=4]
--> onNext(2): Timed[time=1577455374, unit=SECONDS, value=5]
--> onComplete(2)

Javadoc: timestamp()
Javadoc: timestamp(Scheduler scheduler)
Javadoc: timestamp(TimeUnit unit)
Javadoc: timestamp(TimeUnit unit, Scheduler scheduler)

11. Using

创建一个只在Observable生命周期内存在的一次性资源。

Using 操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using 操作符接受三个参数:

  1. observableFactory:一个用户创建一次性资源的工厂函数
  2. resourceFactory:一个用于创建Observable的工厂函数
  3. disposeFunction:一个用于释放资源的函数

当一个观察者订阅 using 返回的Observable时, using 将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止), using 使用第三个函数释放它创建的资源。

示例代码:

    /**
     * 用于在Observable的生命周期内存在的资源对象
     */
    class MyResource {
        private String resource;

        public MyResource(String resource) {
            this.resource = resource;
        }

        @Override
        public String toString() {
            return "MyResource{" +
                    "resource='" + resource + '\'' +
                    '}';
        }

        public void releaseResource() {
            System.out.println("----> MyResource resource is release. ");
            resource = null;
        }
    }

    /**
     *  1. using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)
     *
     *  resourceSupplier:   // 一个用户创建一次性资源的工厂函数
     *  sourceSupplier:     // 一个用于创建Observable的工厂函数
     *  disposer:           // 一个用于释放资源的函数
     *  eager:              // 可选参数,如果为true的话,则第三个函数disposer的处理在Observable的结束前执行
     *
     *  当一个观察者订阅 using 返回的Observable时, using 将会使用Observable工厂函数创建观察者要观察的Observable,
     *  同时使用资源工厂函数创建一个你想要创建的资源。
     *  当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),
     *  using 使用第三个函数释放它创建的资源。
     */
    Observable.using(
            // 一个用户创建一次性资源的工厂函数
            new Callable<MyResource>() {
                @Override
                public MyResource call() throws Exception {
                    System.out.println("----> resourceSupplier call");
                    return new MyResource("This is Observable resource!");
                }
            },
            // 一个用于创建Observable的工厂函数,这个函数返回的Observable就是最终被观察的Observable
            new Function<MyResource, ObservableSource<Long>>() {
                @Override
                public ObservableSource<Long> apply(MyResource myResource) throws Exception {
                    System.out.println("----> sourceSupplier apply: " + myResource);
                    return Observable.rangeLong(1, 5);
                }
            },
            // 一个用于释放资源的函数
            new Consumer<MyResource>() {
                @Override
                public void accept(MyResource myResource) throws Exception {
                    System.out.println("----> disposer accept: ");
                    myResource.releaseResource();
                }
            },
            // 可选参数,如果为true的话,则在Observable的结束前执行释放资源的函数
            true).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext: " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

输出:

----> resourceSupplier call(1)
----> sourceSupplier apply(1): MyResource{resource='This is Observable resource!'}
--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
----> disposer accept(1):
----> MyResource resource is release.
--> onComplete

Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer)
Javadoc: using(Callable resourceSupplier, Function sourceSupplier, Consumer disposer, boolean eager)

12. To

将Observable转换为另一个对象或数据结构。

将 Observable 或者Observable 发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到 Observable 终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的 Observable。

由于 rxjava 的 To 操作符中有很多 toXXX 操作符的实现和不同的变体重载,此处就不详细的展开了,有兴趣的可以查看官方的API 文档 详细参阅。

下面几个是常见的几种To操作符的:

  • toList():让Observable将多项数据组合成一个List,然后调用一次onNext方法传递整个列表。
  • toMap(Function keySelector,Function valueSelector):toMap收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。 你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。
  • toSortedList(): 它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常,你也可以传递一个函数作为用于比较两个数据项。
  • toMultimap(Function keySelector, Function valueSelector):类似于toMap,不同的是,它生成的这个Map的value类型还是一个ArrayList。

示例代码:

        /**
         *  1. toList()
         *  让Observable将多项数据组合成一个List,然后调用一次onNext方法传递整个列表。
         */
        range.toList()
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println("--> toList accept(1): " + integers);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  2. toMap(Function<? super T, ? extends K> keySelector,Function<? super T, ? extends V> valueSelector)
         *   toMap收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。
         *   你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。
         */
        range.toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key" + integer;     // 返回一个Map的key
            }
        }, new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;                     // 返回一个Map的value
            }
        }).subscribe(new SingleObserver<Map<String, Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onSuccess(Map<String, Integer> stringIntegerMap) {
                System.out.println("--> onSuccess(2): " + stringIntegerMap);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }
        });

        System.out.println("------------------------------------------");
        /**
         *  3. toSortedList()
         *  它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常。
         *  然而,你也可以传递一个函数作为用于比较两个数据项
         */
        Observable.just(5, 3, 8, 6, 9, 10)
                .toSortedList()
                .subscribe(new SingleObserver<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(3)");
                    }

                    @Override
                    public void onSuccess(List<Integer> integers) {
                        System.out.println("--> onSuccess(3): " + integers);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError(3): " + e);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  4. toSortedList(Comparator comparator)
         *
         *  传递一个函数comparator作为用于比较两个数据项,它会对产生的列表排序
         */
        Observable.just(5, 3, 8, 6, 9, 10)
                .toSortedList(new Comparator<Integer>() {
                    @Override
                    public int compare(Integer o1, Integer o2) {
                        System.out.println("--> compare: o1 = " + o1 + ", o2 = " + o2);
                        return o1 - o2;     // 比较器的排序逻辑
                    }
                }).subscribe(new SingleObserver<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(4)");
                    }

                    @Override
                    public void onSuccess(List<Integer> integers) {
                        System.out.println("--> onSuccess(4): " + integers);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError(4): " + e);
                    }
                });

        System.out.println("------------------------------------------");
        /**
         *  5. toMultimap(Function<T, K> keySelector, Function<T, V> valueSelector)
         *  类似于 toMap ,不同的是,它生成的这个Map的value类型还是一个ArrayList
         */
        range.toMultimap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key" + integer;     // 返回一个Map的key
            }
        }, new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                return integer;                  // 返回一个Map的value
            }
        }).subscribe(new SingleObserver<Map<String, Collection<Integer>>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(5)");
            }

            @Override
            public void onSuccess(Map<String, Collection<Integer>> stringCollectionMap) {
                System.out.println("--> onSuccess(5): " + stringCollectionMap);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(5): " + e);
            }
        });

输出:

--> toList accept(1): [1, 2, 3, 4, 5]
------------------------------------------
--> onSubscribe(2)
--> onSuccess(2): {key1=1, key2=2, key5=5, key3=3, key4=4}
------------------------------------------
--> onSubscribe(3)
--> onSuccess(3): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(4)
--> compare: o1 = 3, o2 = 5
--> compare: o1 = 8, o2 = 3
--> compare: o1 = 8, o2 = 5
--> compare: o1 = 6, o2 = 5
--> compare: o1 = 6, o2 = 8
--> compare: o1 = 9, o2 = 6
--> compare: o1 = 9, o2 = 8
--> compare: o1 = 10, o2 = 6
--> compare: o1 = 10, o2 = 9
--> onSuccess(4): [3, 5, 6, 8, 9, 10]
------------------------------------------
--> onSubscribe(5)
--> onSuccess(5): {key1=[1], key2=[2], key5=[5], key3=[3], key4=[4]}

Javadoc: toList()
Javadoc: toMap(Function keySelector,Function valueSelector)
Javadoc: toSortedList()
Javadoc: toMultimap(Function keySelector, Function valueSelector)

小结

本节主要是介绍了 Rxjava 中的各种辅助操作符,比如延迟、超时,事件监听等相关的辅助类型的操作,这在开发中是很有用处的。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127326.html

时间: 2024-10-05 05:20:28

Rxjava2 Observable的辅助操作详解及实例(二)的相关文章

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

Rxjava2 Observable的创建详解及实例

目录 简要: 1. Create 2. Defer 3. Empty/Never/Error 4. Just 5. From 6. Repeat 7. RepeatWhen 8. RepeatUntil 9. Range 10. interval 11. Timer 小结 简要: 几种主要的需求 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作)

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

RxJava2.0的使用详解

RxJava2.0的使用详解 1,初识RxJava RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序 RxJava是一个基于事件订阅的异步执行的一个类库,目前比较火的一些技术框架! 参考资料: Github上RxJava的项目地址: https://github.com/ReactiveX/RxJava 技术文档Api: http://reactivex.io/RxJava/javadoc/ RxAndroid,用于 Android 开发: https://githu

PE文件结构与函数导出表——详解与实例

PE文件结构与函数导出表--详解与实例 随着windows系统从Xp升级到Win7.Win8, 从32位升级到64位,PE文件结构在整体未变的情况下发生了一些小的变动,一方面是推荐的程序装载地址未采用,另一方面,导出函数序号不再是简单的升序,而是一定程度上的进行了乱序.本文首先对PE文件结构进行了详尽的解说,接着介绍了如何得出函数导出表,整个过程采用SysWoW64目录下的wininet.dll实例进行说明.在介绍过程中,明确指出了Win7.Win8等新系统相对Xp带来的区别. 文章链接:htt

JavaScript 身份证号有效验证详解及实例代码

JavaScript 身份证号有效验证详解及实例代码 这篇文章主要介绍了JavaScript 身份证号有效验证详解及实例代码的相关资料,需要的朋友可以参考下 JavaScript验证身份证号 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 <%@ page language="jav