Rxjava2 Observable的条件操作符详解及实例

目录

  • 简要:
  • 1. Amb
  • 2. DefaultIfEmpty
  • 3. SwitchIfEmpty
  • 4. SkipUntil
  • 5. SkipWhile
  • 6. TakeUntil
    • 6.1 takeUntil(ObservableSource other)
    • 6.2 takeUntil(Predicate stopPredicate)
  • 7. TakeWhile
  • 小结

简要:

需求了解:

在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操作需求,那么很幸运, Rxjava 中已经有了很多条件操作符,一起来了解一下吧。

下面列出了一些Rxjava的用于条件操作符:

  • Amb:给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。
  • DefaultIfEmpty:发射来自原始Observable的值,如果原始 Observable 没有发射任何数据项,就发射一个默认值。
  • SwitchIfEmpty:如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。
  • SkipUntil:丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。
  • SkipWhile:丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。
  • TakeUntil:发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

1. Amb

给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据。

解析: 对多个Observable进行监听,首先发射通知(包括数据)的Observable将会被观察者观察,发射这个Observable的所有数据。

示例代码:

    // 创建Observable
   Observable<Integer> delayObservable = Observable.range(1, 5)
                                                                                    .delay(100, TimeUnit.MILLISECONDS); // 延迟100毫秒发射数据
    Observable<Integer> rangeObservable = Observable.range(6, 5);

    // 创建Observable的集合
    ArrayList<Observable<Integer>> list = new ArrayList<>();
    list.add(delayObservable);
    list.add(rangeObservable);

    // 创建Observable的数组
    Observable<Integer>[] array = new Observable[2];
    array[0] = delayObservable;
    array[1] = rangeObservable;

    /**
     *  1. ambWith(ObservableSource<? extends T> other)
     *  与另外一个Observable比较,只发射首先发射通知的Observable的数据
     */
    rangeObservable.ambWith(delayObservable)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  2. amb(Iterable<? extends ObservableSource<? extends T>> sources)
     *  接受一个Observable类型的集合, 只发射集合中首先发射通知的Observable的数据
     */
    Observable.amb(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  3. ambArray(ObservableSource<? extends T>... sources)
     *  接受一个Observable类型的数组, 只发射数组中首先发射通知的Observable的数据
     */
    Observable.ambArray(array)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.in.read();

输出:

--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
------------------------------------------------
--> accept(2): 6
--> accept(2): 7
--> accept(2): 8
--> accept(2): 9
--> accept(2): 10
------------------------------------------------
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10

Javadoc: ambWith(ObservableSource other)
Javadoc: amb(Iterable sources)
Javadoc: ambArray(ObservableSource... sources)

2. DefaultIfEmpty

发射来自原始Observable的值,如果原始 Observable 没有发射数据项,就发射一个默认值。

解析: DefaultIfEmpty 简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就发射一个你提供的默认值。如果你需要发射更多的数据,或者切换备用的Observable,你可以考虑使用 switchIfEmpty 操作符 。

示例代码:

    /**
     *   defaultIfEmpty(@NotNull T defaultItem)
     *  如果原始Observable没有发射任何数据正常终止(以 onCompleted 的形式),
     *  DefaultIfEmpty 返回的Observable就发射一个你提供的默认值defaultItem。
     */
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onComplete();   // 不发射任何数据,直接发射完成通知
        }
    }).defaultIfEmpty("No Data emitter!!!")
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("--> onNext: " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

输出:

--> onSubscribe
--> onNext: No Data emitter!!!
--> onComplete

Javadoc: defaultIfEmpty(T defaultItem)

3. SwitchIfEmpty

如果原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。

解析: 如果原始 Observable 没有发射数据时,发射切换指定的 other 继续发射数据。

示例代码:

    /**
     *  switchIfEmpty(ObservableSource other)
     *  如果原始Observable没有发射数据时,发射切换指定的other继续发射数据
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();           // 不发射任何数据,直接发射完成通知
        }
    }).switchIfEmpty(Observable.just(888))  // 如果原始Observable没有发射数据项,默认发射备用的Observable,发射数据项888
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext: " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

输出:

--> onSubscribe
--> onNext: 888
--> onComplete

Javadoc: switchIfEmpty(ObservableSource other)

4. SkipUntil

丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据。


示例代码:

    /**
     *  skipUntil(ObservableSource other)
     *  丢弃原始Observable发射的数据,直到other发射了一个数据,然后发射原始Observable的剩余数据。
     */
    Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
            // 丢弃2000毫秒的原始Observable发射的数据,接受后面的剩余部分数据
            .skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext: " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

    System.in.read();

输出:

--> onSubscribe
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipUntil(ObservableSource other)

5. SkipWhile

丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据。

示例代码:

        /**
         *  skipWhile(Predicate<? super T> predicate)
         *  丢弃原始 Observable 发射的数据,直到函数predicate的条件为假,然后发射原始Observable剩余的数据。
         */
        Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
                .skipWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(Long aLong) throws Exception {
                        if (aLong > 5) {
                            return false;       // 当原始数据大于5时,发射后面的剩余部分数据
                        }
                        return true;            // 丢弃原始数据项
                    }
                }).subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("--> onNext: " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete");
                    }
                });

        System.in.read();

输出:

--> onSubscribe
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipWhile(Predicate predicate)

6. TakeUntil

发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。

6.1 takeUntil(ObservableSource other)

TakeUntil 订阅并开始发射原始 Observable,它还监视你提供的第二个 Observable。如果第二个 Observable 发射了一项数据或者发射了一个终止通知,TakeUntil 返回的 Observable 会停止发射原始 Observable 并终止。

解析: 第二个Observable发射一项数据或一个 onError 通知或一个 onCompleted 通知都会导致 takeUntil 停止发射数据。

示例代码:

    // 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据
    Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

    /**
     *  1. takeUntil(ObservableSource other)
     *  发射来自原始Observable的数据,直到other发射了一个数据或一个通知后停止发射原始Observable并终止。
     */
    observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后停止发射原始数据
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(1): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeUntil(ObservableSource other)

6.2 takeUntil(Predicate stopPredicate)

每次发射数据后,通过一个谓词函数来判定是否需要终止发射数据。

解析: 每次发射数据后,通过一个谓词函数 stopPredicate 来判定是否需要终止发射数据,如果 stopPredicate 返回 true 怎表示停止发射原始Observable后面的数据,否则继续发射后面的数据。

示例代码:

    /**
     *  2. takeUntil(Predicate<? super T> stopPredicate)
     *  每次发射数据后,通过一个谓词函数stopPredicate来判定是否需要终止发射数据
     *  如果stopPredicate返回true怎表示停止发射后面的数据,否则继续发射后面的数据
     */
    observable.takeUntil(new Predicate<Long>() {
        @Override
        public boolean test(Long aLong) throws Exception {  // 函数返回false则为继续发射原始数据,true则停止发射原始数据
            if(aLong > 5){
                return true;      // 满足条件后,停止发射数据
            }
            return false;         // 继续发射数据
        }
    }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Long aLong) {
            System.out.println("--> onNext(2): " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    System.in.read();

输出:

--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onNext(2): 3
--> onNext(2): 4
--> onNext(2): 5
--> onNext(2): 6
--> onComplete(2)

Javadoc: takeUntil(Predicate stopPredicate)

7. TakeWhile

发射原始Observable的数据,直到一个特定的条件,然后跳过剩余的数据。

解析: 发射原始 Observable 的数据,直到 predicate 的条件为 false ,然后跳过剩余的数据。

示例代码:

        // 创建Observable,发送数字1~10,每间隔200毫秒发射一个数据
        Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

        /**
         *  takeWhile(Predicate predicate)
         *  发射原始Observable的数据,直到predicate的条件为false,然后跳过剩余的数据
         */
        observable.takeWhile(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) throws Exception {  // 函数返回值决定是否继续发射后续的数据
                if(aLong > 5){
                    return false;        // 满足条件后跳过后面的数据
                }
                return true;             // 继续发射数据
            }
        }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("--> onNext: " + aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
        });

        System.in.read();

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeWhile(Predicate predicate)

小结

本节主要介绍了Rxjava条件操作符可以根据不同的条件进行数据的发射,变换等相关行为。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127332.html

时间: 2024-10-12 03:00:32

Rxjava2 Observable的条件操作符详解及实例的相关文章

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的结合操作详解及实例

目录 简要: 1. CombineLatest 2. Join 3. Merge 3.1 merge 3.2 mergeDelayError 4. Zip 5. StartWith 6. SwitchOnNext 6.1 switchOnNext 6.2 switchOnNextDelayError 小结 简要: 需求了解: 在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等.这个时候我们就需要使用 RxJava 的结合操作符来

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

(一)Python入门-4控制语句:02单分支选择结构-条件表达式详解

一:选择结构介绍 选择结构通过判断条件是否成立,来决定执行哪个分支.选择结构有多种形式,分为:单分 支.双分支.多分支.流程图如下: 二:单分支选择结构 if语句单分支结构的语法形式如下: if 条件表达式: 语句/语句块 其中:1条件表达式:可以是逻辑表达式.关系表达式.算术表达式等. 2语句/语句块:可以是一条语句,也可以是多条语句.多条语句,缩进必须对齐一致. 三:条件表达式详解 在选择和循环结构中,条件表达式的值为 False的情况如下: False.0.0.0.空值 None.空序列对

SQL Server 执行计划操作符详解(2)——串联(Concatenation )

本文接上文:SQL Server 执行计划操作符详解(1)--断言(Assert) 前言: 根据计划,本文开始讲述另外一个操作符串联(Concatenation),读者可以根据这个词(中英文均可)先幻想一下是干嘛的.其实还是挺直观,就是把东西连起来,那么下面我们来看看到底连什么?怎么连?什么时候连? 简介: 串联操作符既是物理操作符,也是逻辑操作符,在中文版SQL Server的图形化执行计划中称为"串联",在其他格式及英文版本中称为"Concatenation".