RxJava----操作符:过滤Observable

到目前为止我们看到的示例都很简单。你也可以用 Rx 来处理大批量实时数据,但是如果把所有大批量数据整个打包发给你的话,使用 Rx 还有啥优势呢? 本节 我们将介绍一些操作符(operators )来过滤数据、或者把所有数据变成一个需要的数据。

如果你了解过函数式编程(functional programming)或者 Java 中的 Stream,则本节介绍的操作函数是非常眼熟的。

本节中所有的操作符都返回一个不影响前一个 Observable 的新 Observable。 整个 Rx 框架都遵守该原则。通过创建新的 Observable 来转换之前的 Observable而不会对之前的 Observable造成干扰。

订阅到初始 Observable 的 Subscribers 不会受到任何影响,但是在后面的章节中也会看到,开发者也需要当心该原则。

过滤操作

这些操作符用于从Observable发射的数据中进行选择

操作符 含义
filter 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
ignoreElements 忽略所有的数据,只保留终止通知(onError或onCompleted)
distinct 去重,过滤掉重复数据项
debounce 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
elementAt 取值,取特定位置的数据项
first 首项,只发射满足条件的第一条数据
last 末项,只发射最后一条数据
sample 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
skip 跳过前面的若干项数据
skipLast 跳过后面的若干项数据
take 只保留前面的若干项数据
takeLast 只保留后面的若干项数据

Marble diagrams(弹子图)

你可以想象一个机器,不停的发射弹子出来,发射出来的弹子可以被其他模块再次加工(比如 上色、把不合格的弹子给回收了),加工完成后再次发射出来 … 弹子图就是对这个机器的抽象描述。在 Rx 中流行使用这种方式来描述操作符,毕竟图片看起来直观多了。 Marble diagrams(弹子图)基本元素如下:

时间从左往右流动,每个图形代表一个数据,竖线代表发射完成了,而 X 代表出现错误了。 操作函数把上面的 Observable 转换下面的新的 Observable , 里面的每个数据都被操作函数给处理了并返回一个新的数据。

filter

filter 函数使用一个 predicate 函数接口来判断每个发射的值是否能通过这个判断。如果返回 true,则该数据继续往下一个(过滤后的) Observable 发射。

比如下面示例创建了一个发射 0 到 9 十个数字的 源Observable。在该 Observable 使用一个 filter 操作来过滤掉奇数,最后只保留偶数。

        Observable<Integer> values = Observable.range(0,10);
        Subscription oddNumbers = values
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer%2==0;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

0
2
4
6
8
Complete!

elementAt

elementAt()方法仅从一个序列中发射第n个元素然后就完成了。

    Observable<Integer> values = Observable.range(0, 10);
    Subscription subscription = values
    .elementAt(3)
    .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

2

sample

timeout

    Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
    Subscription subscription = getCurrentTemperature()
        .timeout(300,TimeUnit.MILLISECONDS)
        .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

0
1
error

debounce

  • debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
  • 就像sample()和timeout()函数一样,debounce()使用TimeUnit对象指定时间间隔。

ignoreElements

ignoreElements 会忽略所有发射的数据,只让 onCompleted 和 onError 可以通过。

    Observable<Integer> values = Observable.range(0, 10);
    Subscription subscription = values
    .ignoreElements()
    .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

Complete!
ignoreElements() 和使用 filter(v -> false) 是一样的效果。

take和skip

  • 下面两个操作函数依据发射数据的索引来在特定的位置切断数据流,可以从头开始切断也可以从末尾开始切断。
  • take 从头开始获取前 N 个数据,而 skip 则是从头开始 跳过 N 个数据。注意,如果发射的数据比 N 小,则这两个函数都会发射一个 error。

take和skip

take

当我们不需要整个序列时,而是只想取开头或结尾的几个元素,我们可以用take()或takeLast()。

Observable<T>   take(int num)

    Observable<Integer> values = Observable.range(0, 5);
    Subscription first2 = values
        .take(2)
        .subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        log(s);
                    }
               }
        );    

结果:

0
1
只要第 N 个数据可用, take 操作就结束了,立即执行onCompleted()。 如果在 N 个数据发射之前发生了 error, error 信息会继续传递到下一个 Observable。 如果 第 N 个数据发射后, take 就不再关心源 Observable 的状态了。
---------------这里的额状态-------------
        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onError(new Exception("Oops"));
                ----如果这里有个subscriber.onNext(2);
            }
        });
        Subscription subscription = values
            .take(1)
            .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });  

结果:

1
Complete!

skip

skip 返回 take 操作忽略的另外一部分数据。也就是跳过前面 N 个数据。
Observable<T>   skip(int num)

    Observable<Integer> values = Observable.range(0, 5);
    Subscription subscription = values
        .skip(2)
        .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

2
3
4
Complete!

take和skip的重载

除了根据发射数据的索引来过滤数据以外,还可以使用数据流发射的时间来过滤。比如过滤掉前五秒发射的数据。
Observable<T>   take(long time, java.util.concurrent.TimeUnit unit)
Observable<T>   skip(long time, java.util.concurrent.TimeUnit unit)
    Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
    Subscription subscription = values
        .take(250, TimeUnit.MILLISECONDS)
        .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

0
1
Complete!
上面示例中只获取前 250 毫秒发射的数据。 第 300 毫秒才开始发射数据 2, 所以这里只获取 0 和1 两个数据。

takeLast和skipLast

skip 和 take 是从头开始索引数据,而 skipLast 和 takeLast 和他们相反,是从末尾开始索引数据。
    Observable<Integer> values = Observable.range(0, 5);
    Subscription subscription = values
        .skipLast(2)
        .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Integer integer) {
                        log(integer+"");
                    }
                });

结果:

0
1
2
Completed
同样这两个函数也有依时间为条件的重载函数。

takeWhile和skipWhile

这两个函数是使用一个 predicate 参数来当做判断条件。 如果判断条件返回为 ture, 则 takeWhile 保留该数据。
Observable<T>   takeWhile(Func1<? super T,java.lang.Boolean> predicate)
        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        Subscription subscription = values
                .takeWhile(new Func1<Long, Boolean>() {
                    @Override
                    public Boolean call(Long aLong) {
                        return aLong<2;
                    }
                })
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Long aLong) {
                        log(aLong+"");
                    }
                });

结果:

0
1
Complete!
不出意料, skipWhile 跳过过滤条件为 true 的数据。
        Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
        Subscription subscription = values
                .skipWhile(new Func1<Long, Boolean>() {
                    @Override
                    public Boolean call(Long aLong) {
                        return aLong<2;
                    }
                })
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Long aLong) {
                        log(aLong+"");
                    }
                });

结果:

2
3
4
...

takeUntil 和 skipUntil

  • takeUntil 和 skipUntil 这两个函数和 takeWhile 、skipWhile 刚好相反。 当判断条件为 false 的时候, takeUntil 保留该数据。
  • takeUntil 和 skipUntil 还有另外一种不一样的重载函数。切断的条件为 另外一个 Observable 发射数据的时刻。
// 获取源Observable的数据直到 other Observable 发射第一个数据时停止
public final <E> Observable<T> takeUntil(Observable<? extends E> other)

    Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
    Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
    Subscription subscription = values
        .takeUntil(cutoff)
        .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Long aLong) {
                        log(aLong+"");
                    }
                });

结果:

0
1
  • 你应该还记得,这个 timer 函数会等待 250 毫秒然后发射一个数据。
  • 当 takeUntil 收到 这个数据的时候就停止继续接受 values 发射的数据。
  • cutoff 这个充当信号的 Observable 可以是任意数据类型的,这里不关心数据只关心何时发射了数据。
  • skipUntil 也是一样,当收到另外一个 Observable 发射数据的时候,就开始接收 源 Observable 的数据。
    Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
    Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
    Subscription subscription = values
        .skipUntil(cutoff)
        .subscribe(new Observer<Long>() {
                    @Override
                    public void onCompleted() {
                        log("Complete!");
                    }
                    @Override
                    public void onError(Throwable e) {
                        log(e.getMessage().toString());
                    }
                    @Override
                    public void onNext(Long aLong) {
                        log(aLong+"");
                    }
                });

结果:

2
3
4
...

first and last

first()方法和last()方法很容易弄明白。它们从Observable中只发射第一个元素或者最后一个元素。这两个都可以传Func1作为参数,:一个可以确定我们感兴趣的第一个或者最后一个的方法:

——–注意list和from的使用

distinct 和 distinctUntilChanged

distinct 函数用来过滤掉已经出现过的数据了。

    public final Observable<T> distinct()
    public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)

distinct 还有个变体是 distinctUntilChanged。区别是 distinctUntilChanged 只过滤相邻的 key 一样的数据。

    public final Observable<T> distinctUntilChanged()
    public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)

distinct()

        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        });
        Subscription subscription = values
                .distinct()
                .subscribe(
                        new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                log(integer + "");
                            }
                        }
                );

结果:

1
2
3

distinct(keySelector)

distinct 还有一个重载函数,该函数有个生成 key 的参数。每个发射的数据都使用该参数生成一个 key,然后使用该key 来判断数据是否一样。

        Observable<String> values = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("First");
                subscriber.onNext("Second");
                subscriber.onNext("Third");
                subscriber.onNext("Fourth");
                subscriber.onNext("Fifth");
                subscriber.onCompleted();
            }
        });
        Subscription subscription = values
                .distinct(new Func1<String, Object>() {
                    @Override
                    public Object call(String s) {
                        return s.charAt(0);
                    }
                })
                .subscribe(
                        new Action1<String>() {
                            @Override
                            public void call(String s) {
                                log(s);
                            }
                        }
                );

结果:

First
Second
Third
“Fourth” 和 “Fifth” 字符串被过滤掉了,应为他们的 key (首字母)和 First 一样。已经发射过的数据将被过滤掉。
有经验的码农知道,该函数在内部维护一个 key 集合来保存所有已经发射数据的 key,当有新的数据发射的时候,在集合中查找该 数据的key 是否存在。 在使用 Rx 操作函数的时把内部细节给封装起来了,但是我们应该注意该问题来避免性能问题。(如果有大量的数据,维护一个内部的集合来保存 key 可能会占用很多内存。)

distinctUntilChanged

        Observable<Integer> values = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        });
        Subscription subscription = values
                .distinctUntilChanged()
                .subscribe(
                        new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                log(integer + "");
                            }
                        }
                );

结果:

1
2
3
2

distinctUntilChanged(keySelector)

        Observable<String> values = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("First");
                subscriber.onNext("Second");
                subscriber.onNext("Third");
                subscriber.onNext("Fourth");
                subscriber.onNext("Fifth");
                subscriber.onCompleted();
            }
        });
        Subscription subscription = values
                .distinctUntilChanged(new Func1<String, Object>() {
                    @Override
                    public Object call(String s) {
                        return s.charAt(0);
                    }
                })
                .subscribe(
                        new Action1<String>() {
                            @Override
                            public void call(String s) {
                                log(s);
                            }
                        }
                );

结果:

First
Second
Third
Fourth
时间: 2024-10-04 07:22:57

RxJava----操作符:过滤Observable的相关文章

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符的简单使用

一.准备工作在app的build.gradle文件下的dependencies中添加依赖: compile 'io.reactivex:rxjava:1.3.0' compile 'io.reactivex:rxandroid:1.2.1' 二.RxJava的操作符用法: 1.create //create 创建Observable //Observable 被观察者 //Subscribers观察者 Observable<String> observable = Observable.cre

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符(09-算术/聚合操作&amp;连接操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51692493 本文出自:[openXu的博客] 目录: 算术聚合 Count Concat Reduce 连接操作 Publish Connect RefCount Replay 源码下载 算术&聚合 1. Count ??Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量. ?? 如果原始Observa

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

RxJava操作符(04-过滤操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656494 本文出自:[openXu的博客] 目录: Debounce Distinct ElementAt Filter First Last IgnoreElements SampleThrottleFirst SkipSkipLast TakeTakeLast 源码下载 "过滤操作",顾名思义,就是过滤掉Observable发射的一些数据,不让他发射出去,也就是忽略丢弃掉

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse