RxJava操作符总结之过滤

RxJava操作符总结之过滤

jsut()

just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送。

 Observable.just("1","2","3")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
1
2
3

repeat()

repeat() 将当前的消息序列无限制循环发送。我们能够传入一个參数表示循环的次数

 Observable.just("1","2","3")
                .repeat(3)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });
123123123

defer()

延迟创建Observable

再订阅时创建Observable对象。该方法利用call方法的特性。

public static void main(String[] args) {

        Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return getInt();
            }
        });

    }

    public static Observable<Integer> getInt() {

        System.out.println("getInt()");
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<?

super Integer> subscriber) {

                System.out.print("ss");
                subscriber.onNext(42);
            }
        });
    }

此时getInt()方法不会被调用,会在subscribe()时调用。这个假设看过源代码非常easy理解。或者看我之前的博客RxJava 源代码走读之Observable.create()和subscribe()

range()

从指定数字開始发射数字。

  Observable.range(3,2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer);
                    }
                });
34

range(int start,int count) 第一个參数为从哪个数開始,第二个參数为发送多少个。

filter()

过滤作用。依据回调的条件对序列进行筛选。

查询0~49能被3整除的数。


    private static ArrayList<Integer> array = new ArrayList<>();

    public static void main(String[] args) {
        init();
        Observable.from(array)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {

                        //推断条件,假设返回false则该发送内容将取消,true将继续发送
                        return integer%3==0;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });

    }

    public static void init(){
        for (int i=0;i<50;i++){
            array.add(i);
        }
    }
0 3 6 9 12 15 18 21 24 27 30 33 36 39 42 45 48 

take() takeLast()

  • take()获取发射序列的前几个。后面的取消发送。
  • takeLast()获取发射序列的后几个。其余的取消继续向下发送

获取0~49的前三个数和最后三个数

        Observable.from(array)
                .take(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });

        System.out.println();
        Observable.from(array)
                .takeLast(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });
0 1 2
47 48 49 

distinct()

将发送序列中反复的值除去。即发送序列后面的值假设和前面有重叠,则后面的值不会被发送。 该方法去重时须要记录发送序列每一次发送的值。所以当有大数据时要注意发送的值。

 Observable.from(array)
                .take(3)
                .repeat(3)
                .distinct()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });
0 1 2 

distinctUntilChanged()

该方法和distinct()的差别为,当前发射值与上一次发射值同样时则取消当前发射,假设不同样,则继续发射。

即所谓的有改变时发射。

first()和last()

故名思意。就是获取发射序列的第一个和最后一个。

同一时候。该方法能够依据条件进行选择符合条件的第一个和最后一个。

获取0~49中3的倍数的最后一个值

Observable.from(array)
                .last(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer%3==0&&integer!=0;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });
48 

skip()和skipLast()

跳过发射序列的前几个和最后几个 。

该方法和take(),takeLast()相似。

跳过0~49发射序列中的前三个和后三个

  Observable.from(array)
                .skip(3)
                .skipLast(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46

elementAt()和elementAtOrDefault()

获取发射序列指定位置的发射值。

当中当我们指定位置大于发射序列时,会抛出异常。所以推荐使用带有默认值的elementAtDefault()

对0~49的发射序列,获取前三个元素的发射后获取第五个位置的元素值。假设没有,则设置默认值为3.

Observable.from(array)
                .take(3)
                .elementAtOrDefault(5,3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.print(integer+" ");
                    }
                });
3 

interval()

轮询。该操作符每隔指定时间发送一次事件。

该方法默认在conmputation线程执行

        Observable
                .interval(3, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                               @Override
                               public void call(Long s) {
                                   Log.i("info",s);
                               }
                           }
                );

第一个參数:延时时间 第二个參数:单位

该操作符会从0開始。每隔1秒发送一次

略微复杂点的,对于列表。我们要遍历打印此列表,则代码例如以下

Observable.interval(3,TimeUnit.SECONDS)
                .flatMap(new Func1<Long, Observable<String>>() {
                    @Override
                    public Observable<String> call(Long aLong) {
                        return Observable.just(array.get(aLong.intValue()));
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.i("info","onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("info","error");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i("info","onNext--"+s);
                    }
                });
04-13 15:44:28.634 15455-15455/mahao.alex.rxjava I/info: onNext--aa
04-13 15:44:31.634 15455-15455/mahao.alex.rxjava I/info: onNext--bb
04-13 15:44:34.634 15455-15455/mahao.alex.rxjava I/info: onNext--cc
04-13 15:44:37.634 15455-15455/mahao.alex.rxjava I/info: onNext--dd
04-13 15:44:40.644 15455-15455/mahao.alex.rxjava I/info: error

打印例如以下,并且是每隔三秒。

打印一次。。

timer()

延迟固定时间后发送元素。

interval()差别为该操作符仅仅发送一次。

Observable.timer(3,TimeUnit.SECONDS)
                .flatMap(new Func1<Long, Observable<String>>() {
                    @Override
                    public Observable<String> call(Long aLong) {
                        return Observable.just(array.get(aLong.intValue()));
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.i("info","onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("info","error");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i("info","onNext--"+s);
                    }
                });
04-13 15:52:32.114 23036-23036/mahao.alex.rxjava I/info: onNext--aa
04-13 15:52:32.114 23036-23036/mahao.alex.rxjava I/info: onCompleted

注意:该操作符执行在conputation线程中。

sample()

将发射序列每隔固定间隔获取其近期值并向下发送。

这个分为两种情况。

  • 发送序列的时间间隔大于sample的时间间隔
  • 发送序列的时间间隔小于sample的时间间隔

对于另外一种情况,就是每隔固定间隔发射就可以。而第一种情况存在的一种特殊情况

以下我们看一下样例

有一个数组{“aa”,”bb”,”cc”,”dd”}每隔三秒发射,而sample每隔两秒筛选。

Observable.interval(3,TimeUnit.SECONDS)
                .flatMap(new Func1<Long, Observable<String>>() {
                    @Override
                    public Observable<String> call(Long aLong) {
                        return Observable.just(array.get(aLong.intValue()));
                    }
                })
                .sample(2,TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.i("info","onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i("info","error");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i("info","onNext--"+s);
                    }
                });
04-13 16:13:45.404 11935-11935/mahao.alex.rxjava I/info: onNext--aa
04-13 16:13:49.404 11935-11935/mahao.alex.rxjava I/info: onNext--bb
04-13 16:13:51.404 11935-11935/mahao.alex.rxjava I/info: onNext--cc
04-13 16:13:55.404 11935-11935/mahao.alex.rxjava I/info: onNext--dd
04-13 16:13:56.414 11935-11935/mahao.alex.rxjava I/info: error

看一下他们的事件间隔。四次发射的时间间隔为 4,2,4。最后error暂且不提。

为什么是这个时间间隔呢?

图尽管丑,但还是有一定道理的

再上一张好看的图

timeOut

指定最小的发射时间间隔,假设指定的当前时间间隔内没有发送元素。则抛出异常,停止。

debounce

当发送的数据的时间间隔小于debounce指定的时间间隔,则当前发送的数据将被过滤,假设在指定的时间间隔内仍没有数据发送,则会发送最后一个。

时间: 2024-10-27 06:59:22

RxJava操作符总结之过滤的相关文章

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(04-过滤操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656494 本文出自:[openXu的博客] 目录: Debounce Distinct ElementAt Filter First Last IgnoreElements SampleThrottleFirst SkipSkipLast TakeTakeLast 源码下载 "过滤操作",顾名思义,就是过滤掉Observable发射的一些数据,不让他发射出去,也就是忽略丢弃掉

RxJava操作符的简单使用

一.准备工作在app的build.gradle文件下的dependencies中添加依赖: compile 'io.reactivex:rxjava:1.3.0' compile 'io.reactivex:rxandroid:1.2.1' 二.RxJava的操作符用法: 1.create //create 创建Observable //Observable 被观察者 //Subscribers观察者 Observable<String> observable = Observable.cre

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse

RxJava操作符repeatWhen()和retryWhen()

第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了.不得不说,它们绝对是"最令人困惑弹珠图"的有力角逐者. 然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable.我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们). Repeat与Retry的对比 首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重