Rxjava2 Observable的数据过滤详解及实例(一)

目录

  • 简要:
  • 1. Debounce
    • 1.1 debounce(timeout, unit)
    • 1.2 debounce(debounceSelector)
  • 2. Throttle
    • 2.1 throttleFirst(windowDuration, unit)
    • 2.2 throttleLast(intervalDuration, unit)
    • 2.3 throttleWithTimeout(timeout, unit)
  • 3. Sample
    • 3.1 sample(period, unit)
    • 3.2 sample(sampler)
  • 4. Distinct
    • 4.1 distinct()
    • 4.2 distinct(keySelector)
    • 4.3 distinctUntilChanged()
    • 4.4 distinctUntilChanged(keySelector)
  • 5. Skip
    • 5.1 skip(count)
    • 5.2 skip(time, unit)
    • 5.3 skipLast(count)
    • 5.4 skipLast(time, unit)
  • 接续:

简要:

需求了解:

对于数据的观察以及处理过程中往往有需要过滤一些不需要的数据的需求,比如防抖(防止快速操作),获取第一项、指定序列项或者最后一项的需要,获取指定时间内的有效数据等。Rx中提供了丰富的数据过滤处理的操作方法。

可用于过滤和选择Observable发射的数据序列的方法:

  • Debounce:过滤发射速率较快的数据项,防抖操作。
  • Throttle: 对数据序列进行限流操作,可以指定获取周期内的指定数据项,也可以用于防抖。
  • Sample: 允许通过将序列划分为时间片段收集数据,并从每片中取出一个值来稀疏序列。
  • Distinct: 过滤掉重复数据。
  • Skip: 跳过指定的N项数据。
  • Filter: 通过函数指定过滤的数据。
  • First: 只发射第一项或者满足某个条件的第一项数据。
  • Single: 与 first 类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException 的异常通知。
  • ElementAt: 获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
  • ignoreElements: 不发射任何数据,只发射Observable的终止通知。
  • Last: 只发射最后一项(或者满足某个条件的最后一项)数据。
  • Take: 只返回Observable发送数据项序列前面的N项数据,忽略剩余的数据。
  • TakeLast: 只发射Observable发送数据项序列的后N项数据,忽略其他数据。
  • ofType: 过滤一个Observable只返回指定类型的数据。

1. Debounce

仅在过了一段指定的时间还没发射数据时才发射一个数据。Debounce 操作符会过滤掉发射速率过快的数据项。

提示: 操作默认在 computation 调度器上执行,但是你可以指定其它的调度器。

1.1 debounce(timeout, unit)

指定每个数据发射后在 timeout 时间内,原始数据序列中没有下一个数据发射时,发射此项数据,否则丢弃这项数据。此操作与 throttleWithTimeout 方法相同。

注意: 这个操作符会在原始数据的 onCompleted 时候直接发射发射数据,不会因为限流而丢弃数据。

实例代码:

     // 1. debounce(long timeout, TimeUnit unit)
     // 发送一个数据,如果在包含timeout时间内,没有第二个数据发射,那么就会发射此数据,否则丢弃此数据
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一个数据到此数据发射, 30 < timeout    --> skip
            Thread.sleep(30);
            emitter.onNext(2);  // 下一个数据到此数据发射, 100 > timeout   --> deliver
            Thread.sleep(100);
            emitter.onNext(3);  // 下一个数据到此数据发射, 50 = timeout    --> skip:
            Thread.sleep(50);
            emitter.onNext(4);  // 下一个数据到此数据发射, onCompleted     --> deliver
            emitter.onComplete();

        }
    }).debounce(50, TimeUnit.MILLISECONDS)  // 指定防抖丢弃时间段为50毫秒
    //  .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline())   // 指定调度为当前线程排队
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept debounce(1-1): " + t);
            }
        });

输出:

--> accept debounce(1-1): 2
--> accept debounce(1-1): 4

Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)

1.2 debounce(debounceSelector)

原始数据发射每一个序列都通过绑定监听debounceSelector的数据通知,在debounceSelector数据发送前,如果有下一个数据,则丢弃当前项数据,继续监视下一个数据。

注意: 这个操作符会在原始数据的 onCompleted 时候直接发射发射数据,不会因为限流而丢弃数据。

实例代码:

    // 2. debounce(debounceSelector)
    // 原始数据发射每一个序列的通过监听debounceSelector的数据通知,
    // 在debounceSelector数据发送前,如果有下一个数据,则丢弃当前项数据,继续监视下一个数据
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(1000);
            emitter.onNext(2);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(200);
            emitter.onNext(3);      // deliver  --> debounceSelector is emitter(>2s)
            Thread.sleep(2500);
            emitter.onNext(4);      // skip     --> debounceSelector is no emitter(=2s)
            Thread.sleep(2000);
            emitter.onNext(5);      // deliver  --> onComplete
            Thread.sleep(500);
            emitter.onComplete();
        }
    }).debounce(new Function<Integer, ObservableSource<Long>>() {

            @Override
            public ObservableSource<Long> apply(Integer t) throws Exception {
                System.out.println("--> apply(1-2): " + t);
                // 设置过滤延迟时间为2秒,此时返回的Observable从订阅到发送数据时间段即为timeout
                return Observable.timer(2, TimeUnit.SECONDS)
                        .doOnSubscribe(new Consumer<Disposable>() {

                            @Override
                            public void accept(Disposable t) throws Exception {
                                // 开始订阅,监听数据的发送来过滤数据
                                System.out.println("--> debounceSelector(1-2) is onSubscribe!");
                            }
                        }).doOnDispose(new Action() {

                            @Override
                            public void run() throws Exception {
                                // 发射数据后,丢弃当前的数据,解除当前绑定
                                System.out.println("--> debounceSelector(1-2) is unSubscribe!");
                            }
                        });
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("----------> accept(1-2): " + t);
            }
        });

输出:

--> apply(1-2): 1
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 2
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 3
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
----------> accept(1-2): 3
--> apply(1-2): 4
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 5
--> debounceSelector(1-2) is onSubscribe!
----------> accept(1-2): 5
--> debounceSelector(1-2) is unSubscribe!

Javadoc: debounce(debounceSelector)

2. Throttle

主要应用于数据序列的节流操作,在指定的采样周期内获取指定的数据。Throttling 也用于稀疏序列。当生产者发出的值超出我们想要的值时,我们不需要每个序列值,我们可以通过限制它来稀释序列。

注意: 时间的划分不一定是统一的。例如,发射数据的时间间隔与划分数据的时间间隔一致时,在原始数据发送的一个时间点(此时数据还没有实际发送),此时可能由于划分时间已到,划分的数据片直接关闭了,所以有的时间片数据会有时间间隙差异。

提示: 操作默认在 computation 调度器上执行,但是你可以指定其它的调度器。

2.1 throttleFirst(windowDuration, unit)

获取每个 windowDuration 时间段内的原始数据序列中的第一项数据,直到原始数据全部发送完毕。

解析: 实际在每个采样周期内,先发送第一项接收到的数据,然后丢弃后续周期内的数据项。

实例代码:

    // 1. throttleFirst(long windowDuration, TimeUnit unit)
    // 指定每个指定时间内取第一项数据, 直到原始数据序列全部发送结束
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleFirst(2, TimeUnit.SECONDS)                           // 获取每隔2秒之内收集的第一项数据
     //   .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread())   // 指定调度线程为newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleFirst onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleFirst onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleFirst onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleFirst onComplete");
                }
            });

输出:

--> throttleFirst onSubscribe
--> DataSource doOnNext : 1
-------------> throttleFirst onNext: 1
--> DataSource doOnNext : 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleFirst onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
--> DataSource doOnNext : 7
-------------> throttleFirst onNext: 7
--> DataSource doOnNext : 8
--> DataSource doOnNext : 9
-------------> throttleFirst onNext: 9
--> DataSource doOnNext : 10
--> throttleFirst onComplete

Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)

2.2 throttleLast(intervalDuration, unit)

获取每个 windowDuration 时间段内的原始数据序列中的最近的一项数据,直到原始数据全部发送完毕。throttleLast 运算符以固定间隔而不是相对于最后一项来划分时间。它会在每个窗口中发出最后一个值,而不是它后面的第一个值。

解析: 实际在每个采样周期内,先缓存收集的数据,等周期结束发送最后一项数据,丢弃最后数据项前面的数据。

实例代码:

    // 2. throttleLast(long intervalDuration, TimeUnit unit)
    // 指定间隔时间内取最后一项数据,直到原始数据序列全部发送结束
    Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleLast(2, TimeUnit.SECONDS)                            // 获取每隔2秒之内收集的最后一项数据
     //   .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread())    // 指定调度线程为newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleLast onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleLast onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleLast onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleLast onComplete");
                }
            });

输出:

--> throttleLast onSubscribe
--> DataSource doOnNext : 1
--> DataSource doOnNext : 2
-------------> throttleLast onNext: 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleLast onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
-------------> throttleLast onNext: 6
--> DataSource doOnNext : 7
--> DataSource doOnNext : 8
-------------> throttleLast onNext: 8
--> DataSource doOnNext : 9
--> DataSource doOnNext : 10
--> throttleLast onComplete

Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)

2.3 throttleWithTimeout(timeout, unit)

指定每个数据发射后在 timeout 时间内,原始数据序列中没有下一个数据发射时,发射此项数据,否则丢弃这项数据。此操作与 debounce 方法相同。

注意: 这个操作符会在原始数据的 onCompleted 时候直接发射发射数据,不会因为限流而丢弃数据。

实例代码:

    // 3. throttleWithTimeout(long timeout, TimeUnit unit)
    // 发送一个数据,如果在包含timeout时间内,没有第二个数据发射,那么就会发射此数据,否则丢弃此数据
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一个数据到此数据发射, --> skip:       30 < timeout
            Thread.sleep(30);
            emitter.onNext(2);  // 下一个数据到此数据发射, --> skip:       50 = timeout
            Thread.sleep(50);
            emitter.onNext(3);  // 下一个数据到此数据发射, --> deliver:    60 > timeout
            Thread.sleep(60);
            emitter.onNext(4);  // onComplete           --> deliver:    onComplete
            emitter.onComplete();
        }
    }).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丢弃时间段为50毫秒
 //   .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定调度线程为newThread()
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("--> accept throttleWithTimeout(3): " + t);
            }
        });

输出:

--> accept throttleWithTimeout(3): 3
--> accept throttleWithTimeout(3): 4

Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)

3. Sample

sample 允许您通过将序列划分为时间片段,并从每片中取出一个值来稀疏序列。当每片结束时,将发出其中的最后一个值(如果有的话)。

注意: 时间的划分不一定是统一的。例如,发射数据的时间间隔与划分数据的时间间隔一致时,在原始数据发送的一个时间点(此时数据还没有实际发送),此时可能由于划分时间已到,划分的数据片直接关闭了,所以有的时间片数据会有时间间隙差异。

3.1 sample(period, unit)

获取每个 period 时间片段内手机收据序列的最后一项,忽略此时间片内收集的其他数据项。

实例代码:

    // 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit)
    // 将序列分为 period 的时间片段,从每片重取出最近的一个数据
    // 等同于throttleLast
    Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource onNext: " + t);
            }
        }).sample(2, TimeUnit.SECONDS)                              // 每3秒时间段数据中取最近一个值
    //    .sample(2, TimeUnit.SECONDS, true)                        // 参数emitLast,设置是否忽略未采样的最后一个数据
    //    .sample(2, TimeUnit.SECONDS, Schedulers.newThread())      // 指定调度器为newThread()
          .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
          });

输出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(1): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(1): 4
--> DataSource onNext: 5

Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)

3.2 sample(sampler)

sample 的这个方法每当第二个 sampler 发射一个数据(或者当它终止)时就对原始 Observable 进行采样。第二个Observable通过参数传递给 sample

实例代码:

    // 2. sample(ObservableSource sampler)
    // 每当第二个 sampler 发射一个数据(或者当它终止)时就对原始 Observable进行采样
    Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> DataSource onNext: " + t);
                }
        }).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒进行一次采样
          .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
          });

输出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(2): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(2): 4
--> DataSource onNext: 5

Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)

4. Distinct

抑制(过滤掉)重复的数据项。Distinct 的过滤规则是:只允许还没有发射过的数据项通过。

在某些实现中,有一些方法中允许你调整判定两个数据不同( distinct )的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。

4.1 distinct()

只允许还没有发射过的数据项通过,过滤数据序列中的所有重复的数据项,保证处理后的数据序列没有重复。

示例代码:

    // 1. distinct()
    // 去除全部数据中重复的数据
    Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6)
            .distinct()
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(1): " + t);
                }
            });

输出:

--> accept distinct(1): 1
--> accept distinct(1): 2
--> accept distinct(1): 3
--> accept distinct(1): 4
--> accept distinct(1): 5
--> accept distinct(1): 6

Javadoc: distinct()

4.2 distinct(keySelector)

这个操作符接受一个函数。这个函数根据原始Observable发射的数据项产生一个 Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的

实例代码:

    // 数根据原始Observable发射的数据项产生一个 Key,然后比较这些Key而不是数据本身,来判定两个数据是否是不同的(去除全部数据中重复的数据)
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6)
            .distinct(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根据奇数或偶数来判断数据序列的重复的key
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(2): " + t);
                }
            });

输出:

--> accept distinct(2): 1
--> accept distinct(2): 2

Javadoc: distinct(keySelector)

4.3 distinctUntilChanged()

distinctUntilChanged 操作符,去除数据序列中的连续重复项。它只判定一个数据和它的直接前驱是否是不同的。

实例代码:

    // 3. distinctUntilChanged()
    // 去除连续重复的数据
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept distinctUntilChanged(3): " + t);
            }
        });

输出:

--> accept distinctUntilChanged(3): 1
--> accept distinctUntilChanged(3): 2
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 4
--> accept distinctUntilChanged(3): 5
--> accept distinctUntilChanged(3): 6
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 2

Javadoc: distinctUntilChanged()

4.4 distinctUntilChanged(keySelector)

distinctUntilChanged(keySelector) 操作符,根据一个函数产生的 Key 判定两个相邻的数据项是不是相同的,去除连续重复的数据。

实例代码:

    // 4. distinctUntilChanged(Function<T,K>)
    // 数根据原始Observable发射的数据项产生的 Key,去除连续重复的数据
    Observable.just(8, 2, 3, 5, 9, 5, 6, 6)
            .distinctUntilChanged(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根据原始数据处理后添加key,依据这个key来判断是否重复(去除连续重复的数据)
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinctUntilChanged(4): " + t);
                }
            });

输出:

--> accept distinctUntilChanged(4): 8
--> accept distinctUntilChanged(4): 3
--> accept distinctUntilChanged(4): 6

Javadoc: distinctUntilChanged(keySelector)

5. Skip

主要用于忽略Observable发射的指定的 N 项数据,如跳过数据序列的前面或后面 N 项数据,指定时间段内的数据项。

Skip 操作符的还有一些变体的操作方法如下:

5.1 skip(count)

忽略 Observable 发射的前 N 项数据,只保留之后的数据。

实例代码:

    // 1. skip(long count)
    // 跳过前count项数据,保留后面的数据
    Observable.range(1, 10)
        .skip(5) // 过滤数据序列前5项数据
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skip(1): " + t);
            }
        });

输出:

--> accept skip(1): 6
--> accept skip(1): 7
--> accept skip(1): 8
--> accept skip(1): 9
--> accept skip(1): 10

Javadoc: skip(count)

5.2 skip(time, unit)

skip 的这个变体接受一个时长参数,它会丢弃原始Observable开始的那段时间段发射的数据,时长和时间单位通过参数指定。

实例代码:

    // 2. skip(long time, TimeUnit unit)
    // 跳过开始的time时间段内的数据,保留后面的数据
    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
        .skip(2, TimeUnit.SECONDS)  // 跳过前2秒的数据
        .subscribe(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> accept skip(2): " + t);
            }
        });

输出:

--> accept skip(2): 4
--> accept skip(2): 5

Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)

5.3 skipLast(count)

使用 SkipLast 操作符修改原始Observable,你可以忽略Observable发射的后 N 项数据,只保留前面的数据。

实例代码:

    // 3. skipLast(int count)
    // 跳过数据后面的count个数据
    Observable.range(1, 10)
        .skipLast(5) // 跳过数据序列的后5项数据
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skipLast(3): " + t);
            }
        });

输出:

--> accept skipLast(3): 1
--> accept skipLast(3): 2
--> accept skipLast(3): 3
--> accept skipLast(3): 4
--> accept skipLast(3): 5

Javadoc: skipLast(count)

5.4 skipLast(time, unit)

还有一个 skipLast 变体接受一个时间段参数,它会丢弃在原始 Observable 的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意: 这个机制是这样实现的:延迟原始 Observable 发射的任何数据项,直到自原始数据发射之后过了给定的时长之后,才开始发送数据。

实例代码:

    // 4. skipLast(long time, TimeUnit unit, [boolean delayError])
    // 丢弃在原始Observable的生命周 期内最后time时间内发射的数据
    // 可选参数delayError:延迟异常通知
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource: " + t);
            }
        }).skipLast(2, TimeUnit.SECONDS)
    //    .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 通过scheduler指定工作线程
    //    .skipLast(2, TimeUnit.SECONDS, true)                  // 延迟Error的通知,多用于组合Observable的场景
          .subscribe(new Consumer<Long>() {

              @Override
              public void accept(Long t) throws Exception {
                  System.out.println("--> accept skipLast(4): " + t);
              }
          });

输出:

--> DataSource: 1
--> DataSource: 2
--> DataSource: 3
--> accept skipLast(4): 1
--> DataSource: 4
--> accept skipLast(4): 2
--> DataSource: 5
--> accept skipLast(4): 3
--> DataSource: 6
--> accept skipLast(4): 4
--> DataSource: 7
--> accept skipLast(4): 5
--> DataSource: 8
--> accept skipLast(4): 6
--> DataSource: 9
--> accept skipLast(4): 7
--> DataSource: 10
--> accept skipLast(4): 8

注意: skipLast 的这个操作默认在 computation 调度器上执行,但是你可以使用Scheduler参数指定其 它的调度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)

接续:

后续的Rx相关数据过滤部分请参考: Rxjava2 Observable的数据过滤详解及实例(二)

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127275.html

时间: 2024-10-07 06:43:12

Rxjava2 Observable的数据过滤详解及实例(一)的相关文章

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的结合操作详解及实例

目录 简要: 1. CombineLatest 2. Join 3. Merge 3.1 merge 3.2 mergeDelayError 4. Zip 5. StartWith 6. SwitchOnNext 6.1 switchOnNext 6.2 switchOnNextDelayError 小结 简要: 需求了解: 在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等.这个时候我们就需要使用 RxJava 的结合操作符来

Rxjava2 Observable的条件操作符详解及实例

目录 简要: 1. Amb 2. DefaultIfEmpty 3. SwitchIfEmpty 4. SkipUntil 5. SkipWhile 6. TakeUntil 6.1 takeUntil(ObservableSource other) 6.2 takeUntil(Predicate stopPredicate) 7. TakeWhile 小结 简要: 需求了解: 在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据.跳过指定条

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

JVM 运行时数据区详解

一.运行时数据区: Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同数据区域. 1.有一些是随虚拟机的启动而创建,随虚拟机的退出而销毁,所有的线程共享这些数据区. 2.第二种则是与线程一一对应,随线程的开始和结束而创建和销毁,线程之间相互隔离. java虚拟机所管理的内存将会包括以下几个运行时数据区域 二.数据区详解 1.程序计数器(Program Counter Register) 也叫PC寄存器是一块较小的内存空间,它的作用是存储当前线程所执行的字节码的信号指示器.

ContentProvider数据访问详解

ContentProvider数据访问详解 Android官方指出的数据存储方式总共有五种:Shared Preferences.网络存储.文件存储.外储存储.SQLite,这些存储方式一般都只是在一个单独的应用程序中实现数据的共享,而对于需要操作其他应用程序中的数据时(如媒体库.通讯录等),可能就需要借助ContentProvider了. 1.ContentProvider ContentProvider为存储和获取数据提供了统一的接口,使用表的形式来对数据进行封装,使得开发者在后续的开发过程