Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

目录

  • 简要:
  • 1. ConnectableObservable
  • 2. Publish
  • 3. Connect
  • 4. RefCount
  • 5. Share
  • 6. Replay
  • 小结

简要:

需求了解:

Rxjava中的普通的 Observable 在观察者订阅的时候就会发射数据,但是有的时候我们想自己控制数据的发射,比如在有指定的观察者或者全部的观察者订阅后开始发射数据,这个时候我们就要要用到Rxjava中的可连接的Observable来完成这个需求。

这一节主要介绍 ConnectableObservable 和它的子类以及它们的操作符:

  • ConnectableObservable: 一个可连接的Observable,在订阅后不发射数据,调用 connect() 方法后开始发射数据。
  • Observable.publish():将一个Observable转换为一个可连接的Observable 。
  • ConnectableObservable.connect():指示一个可连接的Observable开始发射数据。
  • Observable.replay():确保所有的订阅者看到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。
  • ConnectableObservable.refCount():让一个可连接的Observable表现得像一个普通的Observable。
  • Observable.share():可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,等价于Observable.publish().refCount()
  • Observable.replay():保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

1. ConnectableObservable

一个可连接的Observable(ConnectableObservable)与普通的Observable差不多。不同之处:可连接的Observable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始。用这种方法,你可以等部分或者所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。


注意: ConnectableObservable 的线程切换只能通过 replay 操作符来实现,普通 Observable 的 subscribeOn()observerOn() 在 ConnectableObservable 中不起作用。可以通过 replay 操作符的指定线程调度器的方式来进行线程的切换。

Javadoc: ConnectableObservable

2. Publish

将普通的Observable转换为可连接的Observable(ConnectableObservable)。

如果要使用可连接的Observable,可以使用Observable的 publish 操作符,来将相应转换为ConnectableObservable对象。

有一个变体接受一个函数作为参数(publish(Function selector))。这个函数用原始Observable发射的数据作为参数,产生 一个新的数据作为 ConnectableObservable 给发射,替换原位置的数据项。实质是在签名的基础上添加一个 Map 操作。

简单实例:

  // 1. publish()
  // 创建ConnectableObservable
  ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
          .publish();    // publish操作将Observable转化为一个可连接的Observable

    // 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
  // 接受原始Observable的数据,产生一个新的Observable,可以对这个Observable进行函数处理
  Observable<String> publish = Observable.range(1, 5)
          .publish(new Function<Observable<Integer>, ObservableSource<String>>() {

              @Override
              public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
                  System.out.println("--> apply(4): " + integerObservable.toString());

                  Observable<String> map = integerObservable.map(new Function<Integer, String>() {

                      @Override
                      public String apply(Integer integer) throws Exception {
                          return "[this is map value]: " + integer * integer;
                      }
                  });
                  return map;
              }
          });

    publish.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(4): " + s);
        }
    });

输出:

--> apply(4): [email protected]
--> accept(4): [this is map value]: 1
--> accept(4): [this is map value]: 4
--> accept(4): [this is map value]: 9
--> accept(4): [this is map value]: 16
--> accept(4): [this is map value]: 25

Javadoc: Observable.publish()
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)

3. Connect

让一个可连接的Observable开始发射数据给订阅者。

  • 可连接的Observable (connectableObservable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了 Connect 操作符时才会开始。
  • RxJava中 connect 是 ConnectableObservable 接口的一个方法,使用 publish 操作符可以将一个普通的Observable转换为一个 ConnectableObservable 。
  • 调用 ConnectableObservable 的 connect 方法会让它后面的Observable开始给发射数据给订阅 者。connect 方法返回一个 Subscription 对象,可以调用它的 unsubscribe 方法让Observable停 止发射数据给观察者。
  • 即使没有任何订阅者订阅它,你也可以使用 connect 方法让一个Observable开始发射数据 (或者开始生成待发射的数据)。这样,你可以将一个"冷"的Observable变为"热"的。

实例代码:

    // 1. publish()
    // 创建ConnectableObservable
    ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
            .publish();    // publish操作将Observable转化为一个可连接的Observable

    // 创建普通的Observable
    Observable<Integer> range = Observable.range(1, 5);

    // 1.1 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(1)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(1): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1)");
        }
    });

    // 1.2 connectableObservable在被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    // 1.3 普通Observable在被订阅时就会发射数据
    range.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(3)");
        }
    });

    System.out.println("----------------start connect------------------");
    // 可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始发射数据
    // connectableObservable.connect();

    // 可选参数Consumer,返回一个Disposable对象,可以获取订阅状态和取消当前的订阅
    connectableObservable.connect(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> connect accept: " + disposable.isDisposed());
            // disposable.dispose();
        }
    });

输出:

--> onSubscribe(1)
--> onSubscribe(2)
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onComplete(3)
----------------start connect------------------
--> connect accept: false
--> onNext(1): 1
--> onNext(2): 1
--> onNext(1): 2
--> onNext(2): 2
--> onNext(1): 3
--> onNext(2): 3
--> onNext(1): 4
--> onNext(2): 4
--> onNext(1): 5
--> onNext(2): 5
--> onComplete(1)
--> onComplete(2)

Javadoc: ConnectableObservable.connect()
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)

4. RefCount

RefCount 的作用是让一个可连接的Observable行为像普通的Observable。

RefCount 操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable 时, RefCount 连接到下层的可连接Observable。 RefCount 跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

解析: refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。

实例代码:

    /**
     * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
     *
     * 具有以下可选参数:
     * subscriberCount: 指定需要连接到上游的订阅者数量。注意:当订阅者满足此数量后才会处理
     * timeout:         所有订阅用户退订后断开连接前的等待时间
     * unit:            时间单位
     * scheduler:        断开连接之前要等待的目标调度器
     */
    Observable<Long> refCountObservable = Observable
            .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .publish()
            .refCount()
            .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
            .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程
        //  .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());

    // 第1个订阅者
    refCountObservable.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除当前的订阅
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 第2个订阅者
    refCountObservable.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(2, TimeUnit.SECONDS)   // 延迟2秒后订阅
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

输出:

----> onSubscribe(1):
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2):
----> Subscribe(1) is dispose!
--> accept(2): 4
--> accept(2): 5

Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)

5. Share

一个普通的Observable可以通过 publish 来将其转换为ConnectableObservable,然后可以调用其 refCount() 的方法将其转换为一个具有 ConnectableObservable 特性的Observable。

其实Observable中还有一个操作方法,可以直接完成此步骤的操作,这就是 Observable.share() 操作符。

可以来看一下share操作符的源码:

    public final Observable<T> share() {
        return publish().refCount();
    }

通过源码可以知道,share() 方法可以直接将Observable转换为一个具有ConnectableObservable特性的Observable对象,即Observable.publish().refCount() == Observable.share()

实例代码:

    // share()
    // 通过share() 同时应用 publish 和 refCount 操作
    Observable<Long> share = Observable
            .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
      //    .publish().refCount()
            .share()  // 等价于上面的操作
            .subscribeOn(Schedulers.newThread())    // 指定订阅调度在子线程
            .observeOn(Schedulers.newThread());     // 指定观察者调度在子线程

    // 1. 第一个订阅者
    share.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除当前的订阅
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 2. 第二个订阅者
    share.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(1, TimeUnit.SECONDS)    // 延迟1秒后订阅
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

输出:

----> onSubscribe(1):
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2):
----> Subscribe(1) is dispose!
--> accept(2): 4
--> accept(2): 5

Javadoc: Observable.share()

6. Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

如果在将一个Observable转换为可连接的Observable之前对它使用 Replay 操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,可以缓存发射过的数据,即使那些观察者在这 个Observable开始给其它观察者发射数据之后才订阅。

注意: replay操作符生成的 connectableObservable ,如果没有对缓存进行限定,那么无论观察者何时去订阅,都可以收到 Observable 完整的数据序列项。

replay 操作符最好根据实际情况限定缓存的大小,否则数据发射过快或者较多时会占用很高的内存。replay 操作符有可以接受不同参数的变体,有的可以指定 replay 的最大缓存数量或者指定缓存时间,还可以指定调度器。

  • replay不仅可以缓存Observable的所有数据序列,也可以进行限定缓存大小的操作。
  • 还有有一种 replay 返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是 replay 变换之后的数据项。

实例代码:

    // 创建发射数据的Observable
    Observable<Long> observable = Observable
            .intervalRange(1,
                    10,
                    1,
                    500,
                    TimeUnit.MILLISECONDS,
                    Schedulers.newThread());

    /**
     * 1.1 replay(Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 接受原始数据的所有数据
     */
//  ConnectableObservable<Long> replay1 = observable.replay();

    /**
     * 1.2 replay(int bufferSize, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 只缓存 bufferSize 个最近的原始数据
     */
//  ConnectableObservable<Long> replay1 = observable.replay(1); // 设置缓存大小为1, 从原数据中缓存最近的1个数据

    /**
     * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 在订阅前指定的时间段内缓存 bufferSize 个数据, 注意计时开始是原始数据发射第1个数据项之后开始
     */
//  ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);

    /**
     * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
     * 可选参数:scheduler, 指定线程调度器
     * 在订阅前指定的时间段内缓存数据, 注意计时开始是原始数据发射第1个数据项之后开始
     */
   ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);

    // 进行 connect 操作
    replay1.connect();

    // 第一个观察者
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-1)");
        }
    }).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println("--> accept(1-1): " + aLong);
        }
    });

    // 第二个观察者(延迟1秒后订阅)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-2)");
        }
    }).delaySubscription(1, TimeUnit.SECONDS)
      .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("--> accept(1-2): " + aLong);
            }
      });

    // 第三个观察者(延迟2秒后订阅)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-3)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
       .subscribe(new Consumer<Long>() {
           @Override
           public void accept(Long aLong) throws Exception {
               System.out.println("--> accept(1-3): " + aLong);
           }
       });

    System.in.read();
    System.out.println("----------------------------------------------------------");
    /**
     * 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
     * int bufferSize,                              可选参数: 指定从元数据序列数据的缓存大小
     * long time, TimeUnit unit,        可选参数: 指定缓存指定时间段的数据序列
     * Scheduler scheduler)                 可选参数: 指定线程调度器
     *
     * 接受一个变换函数 function 为参数,这个函数接受原始Observable发射的数据项为参数
     * 通过指定的函数处理后,返回一个处理后的Observable
     */
    Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
            // 对原始数据进行处理
            Observable<String> map = longObservable.map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Exception {
                    return aLong + "2 = " + aLong * aLong;  // 将原始数据进行平方处理,并转换为字符串数据类型
                }
            });

            return map;
        }
    }, 1, Schedulers.newThread());

    replayObservable.subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread());

    // 第一个观察者
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-1)");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(2-1): " + s);
        }
    });

    // 订阅第二个观察者 (延迟2秒后订阅)
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-2)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
      .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(2-2): " + s);
            }
       });

    System.in.read();

输出:

----> onSubScribe(1-1)
--> accept(1-1): 1
--> accept(1-1): 2
--> accept(1-1): 3
----> onSubScribe(1-2)
--> accept(1-2): 2
--> accept(1-2): 3
--> accept(1-1): 4
--> accept(1-2): 4
--> accept(1-1): 5
--> accept(1-2): 5
----> onSubScribe(1-3)
--> accept(1-3): 4
--> accept(1-3): 5
--> accept(1-1): 6
--> accept(1-2): 6
--> accept(1-3): 6
--> accept(1-1): 7
--> accept(1-2): 7
--> accept(1-3): 7
--> accept(1-1): 8
--> accept(1-2): 8
--> accept(1-3): 8
--> accept(1-1): 9
--> accept(1-2): 9
--> accept(1-3): 9
--> accept(1-1): 10
--> accept(1-2): 10
--> accept(1-3): 10
----------------------------------------------------------
--> onSubScribe(2-1)
--> accept(2-1): 12 = 1
--> accept(2-1): 22 = 4
--> accept(2-1): 32 = 9
--> accept(2-1): 42 = 16
--> onSubScribe(2-2)
--> accept(2-1): 52 = 25
--> accept(2-2): 12 = 1
--> accept(2-2): 22 = 4
--> accept(2-1): 62 = 36
--> accept(2-2): 32 = 9
--> accept(2-1): 72 = 49
--> accept(2-1): 82 = 64
--> accept(2-2): 42 = 16
--> accept(2-2): 52 = 25
--> accept(2-1): 92 = 81
--> accept(2-2): 62 = 36
--> accept(2-1): 102 = 100
--> accept(2-2): 72 = 49
--> accept(2-2): 82 = 64
--> accept(2-2): 92 = 81
--> accept(2-2): 102 = 100

Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: Observable.replay(Function<Observable,ObservableSource> selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)

小结

Rxjava 的连接操作符主要的核心是 ConnectableObservable 这个可连接的Observable对象的概念。可连接的 Observable 在被订阅时并不会直接发射数据,只有在他的 connect() 方法被调用时才会发射数据。便于更好的对数据的发射行为的控制,同时也对数据有很好的操作能力,可以缓存数据,指定缓存大小,时间片段缓存等。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127304.html

时间: 2024-10-10 00:58:54

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例的相关文章

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

Rxjava2 Observable的结合操作详解及实例

目录 简要: 1. CombineLatest 2. Join 3. Merge 3.1 merge 3.2 mergeDelayError 4. Zip 5. StartWith 6. SwitchOnNext 6.1 switchOnNext 6.2 switchOnNextDelayError 小结 简要: 需求了解: 在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等.这个时候我们就需要使用 RxJava 的结合操作符来

Rxjava2 Observable的错误处理操作详解及实例

目录 简要: 1. Catch 1.1 onErrorReturn 1.2 onErrorResumeNext 1.3 onExceptionResumeNext 2. Retry 2.1 retry() 2.2 retry(long times) 2.3 retry(long times, Predicate predicate) 2.4 retry(Predicate predicate) 2.5 retry(BiPredicate predicate) 2.6 retryUntil(Boo

Rxjava2 Observable的创建详解及实例

目录 简要: 1. Create 2. Defer 3. Empty/Never/Error 4. Just 5. From 6. Repeat 7. RepeatWhen 8. RepeatUntil 9. Range 10. interval 11. Timer 小结 简要: 几种主要的需求 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作)

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde