Rxjava2 Observable的辅助操作详解及实例(一)

目录

  • 简要:
  • 1. Delay
  • 2. Do
  • 3. SubscribeOn
  • 4. ObserverOn
  • 5. Serialize
  • 6. Materialize
  • 7. Dematerialize
  • 接续:

简要:

需求了解:

Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程。比如延迟、定时、指定操作的监听、数据类型转换等一系列的操作。

下面列出了一些用于Observable的辅助操作符:

  • Delay:延时发射Observable的结果。
  • Do:注册一个动作作为原始Observable生命周期事件的监听器。
  • SubscribeOn:指定Observable自身在哪个调度器上执行。
  • ObserverOn:指定一个观察者在哪个调度器上观察这个Observable。
  • Serialize:强制一个Observable连续调用并保证行为正确,其实就是同步事件操作。
  • Materialize/Dematerialize:将数据项和事件通知都当做数据项发射 ,Dematerialize 刚好相反。
  • TimeInterval:将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable。
  • Timeout:对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
  • Timestamp:给Observable发射的数据项附加一个指定的时间戳。
  • Using:创建一个只在Observable生命周期内存在的一次性资源。
  • To:将Observable转换为另一个对象或数据结构。

1. Delay

延迟一段指定的时间再发射来自Observable的发射物。

Delay 操作符让原始 Observable 在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。

1.1 delay(long delay, TimeUnit unit)

延迟指定时间段后发射原始Observable发射的数据序列,如果发生异常的话,会立即发射通知给观察者。

1.2 delay(Function<T, ObservableSource> itemDelay)

使用一个函数针对原始 Observable 的每一项数据返回一个 Observable ,它监视返回的这个 Observable ,当任何那样的 Observable 终止时,delay 返回的 Observable 就发射关联的那项数据。

1.3 delay(ObservableSource subscriptionDelay, Function<T, ObservableSource> itemDelay)

延迟直到 subscriptionDelay 发射第一个数据项后开始订阅原始 Observable,然后再使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的 Observable 终止时,delay 返回的 Observable 就发射关联的那项数据。

示例代码:

    // 创建Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        //  emitter.onError(new Exception("Test Error!"));
            emitter.onNext(4);
            emitter.onNext(5);
            emitter.onComplete();
        }
    });

    /**
     * 1. delay(long delay, TimeUnit unit,
     *  Scheduler scheduler: 可选参数,指定工作线程
     *  boolean delayError:    可选参数,延迟异常通知到最后
     *  )
     *  延迟指定时间段后发射原始Observable发射的数据序列,如果发生异常的话,会立即发射通知给观察者。
     */
    observable.doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> doOnNext(1): " + integer);
        }

    }).delay(1, TimeUnit.SECONDS, Schedulers.newThread(), false) // 在子线程中延迟1秒发射数据,不延迟异常通知
      .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(1)");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(1): " + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onCompleted(1)");
            }
      });

    System.in.read();
    System.out.println("-----------------------------------------------------");
    /**
     *  2. delay(Function<T, ObservableSource<U>> itemDelay)
     *   使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,
     *   当任何那样的 Observable 终止时,delay 返回的 Observable 就发射关联的那项数据。
     */
    observable.doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> doOnNext(2): " + integer);
        }

    }).delay(new Function<Integer, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Integer integer) throws Exception {
            System.out.println("--> ObservableSource(2): " + integer);
            Observable<Long> timer = Observable.timer(integer, TimeUnit.SECONDS);
            return timer;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(2)");
        }
    });

    System.in.read();
    System.out.println("-----------------------------------------------------");
    /**
     *  3. delay(ObservableSource subscriptionDelay, Function<T, ObservableSource> itemDelay)
     *  延迟直到subscriptionDelay发射第一个数据项后开始订阅原始Observable
     *  然后再使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,
     *  当任何那样的 Observable 终止时,delay 返回的 Observable 就发射关联的那项数据。
     */
    observable.doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> doOnNext(3): " + integer);
        }
       // 延迟3秒后开始订阅源Observable,然后对发射的每项数据进行function函数延迟
    }).delay(Observable.timer(3, TimeUnit.SECONDS), new Function<Integer, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Integer integer) throws Exception {
            System.out.println("--> apply(3): " + integer);
            return Observable.timer(integer, TimeUnit.SECONDS);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(3)");
        }
    });

    System.in.read();

输出:

--> onSubscribe(1)
--> doOnNext(1): 1
--> doOnNext(1): 2
--> doOnNext(1): 3
--> doOnNext(1): 4
--> doOnNext(1): 5
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onCompleted(1)
-----------------------------------------------------
--> onSubscribe(2)
--> doOnNext(2): 1
--> ObservableSource(2): 1
--> doOnNext(2): 2
--> ObservableSource(2): 2
--> doOnNext(2): 3
--> ObservableSource(2): 3
--> doOnNext(2): 4
--> ObservableSource(2): 4
--> doOnNext(2): 5
--> ObservableSource(2): 5
--> onNext(2): 1
--> onNext(2): 2
--> onNext(2): 3
--> onNext(2): 4
--> onNext(2): 5
--> onCompleted(2)
-----------------------------------------------------
--> onSubscribe(3)
--> doOnNext(3): 1
--> apply(3): 1
--> doOnNext(3): 2
--> apply(3): 2
--> doOnNext(3): 3
--> apply(3): 3
--> doOnNext(3): 4
--> apply(3): 4
--> doOnNext(3): 5
--> apply(3): 5
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onCompleted(3)

Javadoc: delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: delay(Function<T, ObservableSource> itemDelay)
Javadoc: delay(ObservableSource subscriptionDelay, Function<T, ObservableSource> itemDelay)

2. Do

注册一个动作作为原始Observable生命周期事件的监听器。

你可以注册指定的回调,当Observable的某个事件发生时,Rxjava 会在与 Observable 链关联的正常通知集合中调用它。

在Rxjava中有许多相关Do的变体,分别进行不同场景的事件监听,一般有下面几种操作方法:

  1. doOnSubscribe(Consumer onSubscribe):一旦有观察者订阅了Observable,就会被调用。
  2. doOnLifecycle(Consumer onSubscribe, Action onDispose): 在观察者订阅产生和解除时被调用。
  3. doOnNext(Consumer onNext):在 Observable 每次发射数据前被调用。
  4. doOnEach(Observer observer): 在 Observable 调用观察者的所有通知前被调用。
  5. doAfterNext(Consumer onAfterNext):在 Observable 调用OnNext通知(数据发射通知)之后被调用。
  6. doOnError(Consumer onError):注册一个动作,当它的 Observable 由于异常终止调用 onError 时会被调用。
  7. doOnTerminate(Action onTerminate): 当Observable终止之前会被调用,无论是正常还是异常终止。
  8. doAfterTerminate(Action onFinally): 当Observable终止之后会被调用,无论是正常还是异常终止。
  9. doOnComplete(Action onComplete):Observable正常终止调用 onCompleted 时会被调用。
  10. doFinally(Action onFinally):Observable终止之后会被调用,无论是正常还是异常终止,但是优先于doAfterTerminate。
  11. doOnDispose(Action onDispose):在观察者调用Disposable的dispose()方法时被调用。

示例代码:

    /**
     *  1. doOnSubscribe(Consumer onSubscribe)
     *  一旦有观察者订阅了Observable,就会被调用
     */
    Observable.just(999).doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe");
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(1)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(1): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(1)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  2. doOnLifecycle(Consumer onSubscribe, Action onDispose)
     *  onSubscribe: 接受观察者订阅前的通知,可以在此通知中解除订阅
     *  onDispose:   接受观察者调用解除订阅通知
     *  在观察者订阅产生和解除时调用
     */
    Observable.just(999).doOnLifecycle(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnLifecycle onSubscribe(2)");
            // disposable.dispose();  // 可以在观察者订阅前直接解除订阅
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("----> doOnLifecycle onDispose(2)");
        }
    }).subscribe(new Observer<Integer>() {
        private Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
            disposable.dispose(); // 手动解除订阅
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(2)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  3. doOnNext(Consumer onNext)
     *  在Observable每次发射数据前被调用
     */
    Observable.just(999).doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("----> doOnNext(3): " + integer);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> accept(3): " + integer);
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  4. doOnEach(Observer observer)
     *  在Observable调用观察者的所有通知前被调用
     */
    Observable.just(999).doOnEach(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> doOnEach(4) onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("----> doOnEach(4) onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("----> doOnEach(4) onError: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("----> doOnEach(4) onComplete");
        }
    }).subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(4)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(4): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(4): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(4)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  5. doAfterNext(Consumer onAfterNext)
     *  在Observable调用OnNext通知(数据发射通知)之后被调用
     */
    Observable.just(999).doAfterNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("----> doAfterNext(5): " + integer);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> onNext(5): " + integer);
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  6. doOnError(Consumer onError)
     *  注册一个动作,当它的Observable由于异常终止调用 onError 时会被调用
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onError(new Exception("Test Error!"));
        }
    }).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println("----> doOnError(6): " + throwable);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(6)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(6): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(6): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(6)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  7.
     *  doOnTerminate(Action onTerminate):  当Observable终止之前会被调用,无论是正常还是异常终止
     *  doAfterTerminate(Action onFinally): 当Observable终止之后会被调用,无论是正常还是异常终止
     */
    Observable.just(999).doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("----> doOnTerminate(7)");
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> accept(7): " + integer);
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  8. doOnComplete(Action onComplete)
     *  Observable正常终止调用 onCompleted 时会被调用
     */
    Observable.just(999).doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("----> doOnComplete(8)");
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(8)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(8): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(8): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(8)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  9. doFinally(Action onFinally)
     *  Observable终止之后会被调用,无论是正常还是异常终止,但是优先于doAfterTerminate
     */
    Observable.just(999).doFinally(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("----> doFinally(9)");
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(9)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(9): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(9): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(9)");
        }
    });

    System.out.println("--------------------------------------------");
    /**
     *  10. doOnDispose(Action onDispose)
     *  在观察者调用Disposable的dispose()方法时被调用
     */
    Observable.just(999).doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("----> doOnDispose(10)");
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(10)");
            d.dispose();
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(10): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(10): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(10)");
        }
    });

输出:

----> doOnSubscribe
--> onSubscribe(1)
--> onNext(1): 999
--> onCompleted(1)
--------------------------------------------
----> doOnLifecycle onSubscribe(2)
--> onSubscribe(2)
--> onNext(2): 999
----> doOnLifecycle onDispose(2)
--------------------------------------------
----> doOnNext(3): 999
--> accept(3): 999
--------------------------------------------
--> onSubscribe(4)
----> doOnEach(4) onNext: 999
--> onNext(4): 999
----> doOnEach(4) onComplete
--> onCompleted(4)
--------------------------------------------
--> onNext(5): 999
----> doAfterNext(5): 999
--------------------------------------------
--> onSubscribe(6)
----> doOnError(6): java.lang.Exception: Test Error!
--> onError(6): java.lang.Exception: Test Error!
--------------------------------------------
--> accept(7): 999
----> doOnTerminate(7)
--------------------------------------------
--> onSubscribe(8)
--> onNext(8): 999
----> doOnComplete(8)
--> onComplete(8)
--------------------------------------------
--> onSubscribe(9)
--> onNext(9): 999
--> onComplete(9)
----> doFinally(9)
--------------------------------------------
--> onSubscribe(10)
----> doOnDispose(10)

Javadoc: doOnSubscribe(Consumer onSubscribe)
Javadoc: doOnLifecycle(Consumer onSubscribe, Action onDispose)
Javadoc: doOnNext(Consumer onNext)
Javadoc: doOnEach(Observer observer)
Javadoc: doAfterNext(Consumer onAfterNext)
Javadoc: doOnError(Consumer onError)
Javadoc: doOnTerminate(Action onTerminate)
Javadoc: doAfterTerminate(Action onFinally)
Javadoc: doOnComplete(Action onComplete)
Javadoc: doFinally(Action onFinally)
Javadoc: doOnDispose(Action onDispose)

3. SubscribeOn

指定Observable自身在哪个调度器上执行。

使用调度器 Scheduler 来管理多线程环境中Observable的转场。你可以使用 SubscribeOn 操作符指定Observable在一个特定的调度器上运转。

示例代码:

    // 查看当前线程id
    System.out.println("----> main: threadID = " + Thread.currentThread().getId());

    /**
     *  subscribeOn(Scheduler scheduler)
     *  指定Observable在指定的scheduler上调度
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 查看Observable的工作线程id
            System.out.println("----> SubscribeOn: threadID = " + Thread.currentThread().getId());
            emitter.onNext(999);
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.newThread())  // 指定Observable的工作线程在子线程
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept: " + integer);
                }
            });

    System.in.read();

输出:

----> main: threadID = 1
----> SubscribeOn: threadID = 13
--> accept: 999

Javadoc: subscribeOn(Scheduler scheduler)

4. ObserverOn

指定一个观察者在哪个调度器上观察这个Observable。

使用调度器 Scheduler 来管理多线程环境中Observable的转场。你可以使用 ObserveOn 操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted , onError 方法)。

示例代码:

    // 查看当前线程id
    System.out.println("----> main: threadID = " + Thread.currentThread().getId());

    /**
     *  observeOn(Scheduler scheduler,
     *  boolean delayError,     // 可选参数是否延迟异常
     *  int bufferSize                    // 指定缓存大小
     *  )
     * 指定观察者在指定的scheduler线程中调度
     */
    Observable.just(999).observeOn(Schedulers.newThread(), true, 3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    // 查看观察者的线程id
                    System.out.println("--> accept ThreadID: " + Thread.currentThread().getId());
                    System.out.println("--> accept: " + integer);
                }
            });

    System.in.read();

输出:

----> main: threadID = 1
--> accept ThreadID: 13
--> accept: 999

Javadoc: observeOn(Scheduler scheduler)
Javadoc: observeOn(Scheduler scheduler, boolean delayError)
Javadoc: observeOn(Scheduler scheduler, boolean delayError, int bufferSize)

5. Serialize

强制一个Observable连续调用并保证行为正确,其实就是同步事件操作。

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onCompleted 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。使用 Serialize 操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

示例代码:

    /**
     *  serialize()
     *  强制一个Observable连续调用(同步)并保证行为正确
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 多线程事件调用
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i + 1);
                    }
                    emitter.onComplete();
                }
            }).start();

            // 多线程事件调用
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 100; i < 110; i++) {
                        emitter.onNext(i + 1);
                    }
                    emitter.onComplete();
                }
            }).start();
        }
    }).serialize() // 序列化,合法性操作
      .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
      });

    System.in.read();

输出:

---------------------------------------------
下面是没有使用Serialize()场景,发现不合法的调用
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 101
--> onNext: 102
--> onNext: 103
--> onNext: 104
--> onNext: 105
--> onNext: 9
--> onNext: 106
--> onNext: 10
--> onNext: 107
--> onComplete
--> onNext: 108     // 不合法的调用
----------------------------------------------
下面是使用Serialize()场景,合法的事件调用
--> onSubscribe
--> onNext: 1
--> onNext: 2
--> onNext: 3
--> onNext: 4
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 101
--> onNext: 102
--> onNext: 103
--> onNext: 104
--> onNext: 105
--> onNext: 106
--> onNext: 107
--> onNext: 108
--> onNext: 109
--> onNext: 110
--> onComplete

Javadoc: serialize()

6. Materialize

Materialize 将数据项和事件通知都当做数据项发射。

一个合法的有限的Obversable将调用它的观察者的 onNext 方法零次或多次,然后调用观察者的 onCompleted 或 onError 正好一次。 Materialize 操作符将这一系列调用,包括原来的 onNext 通知和终止通知onCompleted 或 onError 都转换为一个Observable发射的数据序列。

解析: 将来自原始Observable的通知转换为 Notification 对象,然后它返回的Observable会发射这些数据。

示例代码:

    /**
     *  materialize()
     *  将来自原始Observable的通知转换为Notification对象,然后它返回的Observable会发射这些数据。
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Exception("Test Error!"));
            emitter.onComplete();
        }
    }).materialize()
      .subscribe(new Observer<Notification<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onNext(Notification<Integer> integerNotification) {
                System.out.println("--> onNext: " + integerNotification);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
      });

输出:

--> onSubscribe
--> onNext: OnNextNotification[1]
--> onNext: OnNextNotification[2]
--> onNext: OnErrorNotification[java.lang.Exception: Test Error!]
--> onComplete

Javadoc: materialize()

7. Dematerialize

Dematerialize 操作符是 Materialize 的逆向过程,它将 Materialize 转换的结果还原成它原本的形式。

解析: dematerialize 反转这个过程,将原始Observable发射的 Notification 对象还原成Observable的通知。

示例代码:

    /**
     *  dematerialize()
     *  过时的方法,在Rxjava:2.2.4中已经被dematerialize(Function<T, Notification<R>> selector)替代
     *  将原始Observable发射的 Notification 对象还原成Observable的通知。
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Exception("Test Error!"));
            emitter.onComplete();
        }
    }).materialize()
      .dematerialize()  // 将Notification 对象还原成Observable的通知
      .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(1)");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("--> onNext(1): " + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(1): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(1)");
            }
      });

    System.out.println("------------------------------------------------");
    /**
     *  dematerialize(Function<T, Notification<R>> selector)
     *  将原始Observable发射的 Notification 对象经过一个selector函数处理后,发射一个新的Notification,
     *  还原成Observable的通知。
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Exception("Test Error!"));
            emitter.onComplete();
        }
    }).materialize()
      .dematerialize(new Function<Notification<Integer>, Notification<Integer>>() {
                @Override
                public Notification<Integer> apply(Notification<Integer> integerNotification) throws Exception {
                    System.out.println("--> apply(2): " + integerNotification);
                    return integerNotification;
                }
      }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(2): " + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(2)");
            }
    });

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onError(1): java.lang.Exception: Test Error!
------------------------------------------------
--> onSubscribe(2)
--> apply(2): OnNextNotification[1]
--> onNext(2): 1
--> apply(2): OnNextNotification[2]
--> onNext(2): 2
--> apply(2): OnErrorNotification[java.lang.Exception: Test Error!]
--> onError(2): java.lang.Exception: Test Error!

Javadoc: dematerialize()
Javadoc: dematerialize(Function<T,Notification<R>> selector)

接续:

后续的Observable的辅助操作部分请参考: Rxjava2 Observable的辅助操作详解及实例(二)

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127319.html

时间: 2024-10-31 17:15:53

Rxjava2 Observable的辅助操作详解及实例(一)的相关文章

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的创建详解及实例

目录 简要: 1. Create 2. Defer 3. Empty/Never/Error 4. Just 5. From 6. Repeat 7. RepeatWhen 8. RepeatUntil 9. Range 10. interval 11. Timer 小结 简要: 几种主要的需求 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作)

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

RxJava2.0的使用详解

RxJava2.0的使用详解 1,初识RxJava RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序 RxJava是一个基于事件订阅的异步执行的一个类库,目前比较火的一些技术框架! 参考资料: Github上RxJava的项目地址: https://github.com/ReactiveX/RxJava 技术文档Api: http://reactivex.io/RxJava/javadoc/ RxAndroid,用于 Android 开发: https://githu

PE文件结构与函数导出表——详解与实例

PE文件结构与函数导出表--详解与实例 随着windows系统从Xp升级到Win7.Win8, 从32位升级到64位,PE文件结构在整体未变的情况下发生了一些小的变动,一方面是推荐的程序装载地址未采用,另一方面,导出函数序号不再是简单的升序,而是一定程度上的进行了乱序.本文首先对PE文件结构进行了详尽的解说,接着介绍了如何得出函数导出表,整个过程采用SysWoW64目录下的wininet.dll实例进行说明.在介绍过程中,明确指出了Win7.Win8等新系统相对Xp带来的区别. 文章链接:htt

JavaScript 身份证号有效验证详解及实例代码

JavaScript 身份证号有效验证详解及实例代码 这篇文章主要介绍了JavaScript 身份证号有效验证详解及实例代码的相关资料,需要的朋友可以参考下 JavaScript验证身份证号 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 <%@ page language="jav