Android RxJava使用介绍(二) RxJava的操作符

上一篇文章我们通过一个简单的例子来给大家展示了RxJava的基本用法,相信大家已经对RxJava有了大概的了解,由于上篇文章对RxJava的使用介绍都是点到为止,并没有进行深入展开,也许你对RxJava有种名不副实的感觉。OK,下面我们就进入正题,一步步的揭开RxJava的神秘面纱!

一个例子

RxJava的强大之处,在于它提供了非常丰富且功能强悍的操作符,通过使用和组合这些操作符,你几乎能完成所有你想要完成的任务,举个例子如下:

  1. 现在有一个需求:app启动时显示一张图片(一般是app的logo),也就是我们所说的欢迎页,2-3秒后自动跳转到主页面。这不就是几乎每个app都有的启动页需求吗?几乎不用思考,代码如下:
@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_welcome);

        ImageView view = (ImageView) findViewById(R.id.iv_welcome);
        view.setImageResource(R.drawable.welcome);
        Handler handler = new Handler();
        handler.postDelayed(new Runnable() {
            @Override
            public void run() {
                startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                finish();
            }
        },2000);
    }

使用RxJava的代码实现如下:

protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_welcome);

        ImageView view = (ImageView) findViewById(R.id.iv_welcome);
        view.setImageResource(R.drawable.welcome);
        Observable.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).map(l->{
            startActivity(new Intent(this, MainActivity.class));
            finish();
            return null;
        }).subscribe();
    }

这里的RxJava使用了两个操作符:一个是timer操作符,它的意思是延迟执行某个操作;一个是map操作符,它的意思是转换某个执行结果。

恩,好像除了写法不一样,也没看出RxJava有什么优势呢。好吧,继续往下看!



由于最近产品要做活动,为了在显著的位置把活动的内容显示给用户,经过讨论,就是在欢迎页停留2秒后,要跳转到活动内容的页面(也就是一张广告的图片)上,然后在活动内容的页面停留2-3秒,再跳转到主页面;

protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_welcome);

        ImageView view = (ImageView) findViewById(R.id.iv_welcome);
        view.setImageResource(R.drawable.welcome);
        //开新线程从网络获取图片
        AsyncAdImageFromNet();

        Handler handler = new Handler();
        handler.postDelayed(new Runnable() {
            @Override
            public void run() {
                //mAdBitmapDrawable是从网络获取到的图片,如果获取到图片,则显示出来,否则直接跳转到主页面
                if(mAdBitmapDrawable != null){
                    view.setImageDrawable(mAdBitmapDrawable);
                    handler.postDelayed(new Runnable() {
                        @Override
                        public void run() {
                            startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                            finish();
                        }
                    }, 2000);
                } else {
                    startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                    finish();
                }
            }
        }, 2000);

    }

这段代码没有考虑离线情况,即没有网络的情况下是显示不出活动内容图片的,最好是在有网络的情况下就把图片下载到本地缓存起来,不管有没有网络都装载本地的缓存即可,改进后的代码如下:

protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_welcome);

        ImageView view = (ImageView) findViewById(R.id.iv_welcome);
        view.setImageResource(R.drawable.welcome);
        //本地图片缓存路径
        File localBitmapFile = new File(getLocalBitmapPath());
        if(localBitmapFile.exists())
            mAdBitmapDrawable = BitmapFactory.decodeFile(localBitmapFile);
        //开新线程从网络获取图片
        AsyncAdImageFromNet();

        Handler handler = new Handler();
        handler.postDelayed(new Runnable() {
            @Override
            public void run() {
                //mAdBitmapDrawable是本地缓存获取到的图片,如果获取到图片,则显示出来,否则直接跳转到主页面
                if(mAdBitmapDrawable != null){
                    view.setImageDrawable(mAdBitmapDrawable);
                    handler.postDelayed(new Runnable() {
                        @Override
                        public void run() {
                            startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                            finish();
                        }
                    }, 2000);
                } else {
                    startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                    finish();
                }
            }
        }, 2000);

    }

仔细一看,代码还是有问题,加载本地缓存图片,如果图片比较大,就会阻塞主线程,最好是把加载本地缓存图片放在一个线程中执行,代码真是越写越乱,我们看看使用RxJava是怎么做的呢?

protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_welcome);

        ImageView view = (ImageView) findViewById(R.id.iv_welcome);
        view.setImageResource(R.mipmap.welcome);

        Observable.mergeDelayError(
                //在新线程中加载本地缓存图片
                loadBitmapFromLocal().subscribeOn(Schedulers.io()),
                //在新线程中加载网络图片
                loadBitmapFromNet().subscribeOn(Schedulers.newThread()),
                Observable.timer(3,TimeUnit.SECONDS).map(c->null))
                //每隔2秒获取加载数据
                .sample(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
                .flatMap(r->{
                    if(r==null)  //如果没有获取到图片,直接跳转到主页面
                        return Observable.empty();
                    else { //如果获取到图片,则停留2秒再跳转到主页面
                        view.setImageDrawable(r);
                        return Observable.timer(2, TimeUnit.SECONDS);
                    }
                }).subscribe(
                r->{
                },
                e->{
                    startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                    finish();
                },
                ()->{
                    startActivity(new Intent(WelcomeActivity.this, MainActivity.class));
                    finish();
                }
        );
    }

这里使用了几个操作符:首先是mergeDelayError,它的意思是合并几个不同的Observable;sample的意思是每隔一段时间就进行采样,在时间间隔范围内获取最后一个发布的Observable; flatMap的意思是把某一个Observable转换成另一个Observable。

可以看到,使用了RxJava,整个代码思路很清晰,不用考虑底层的线程同步、异步通知等内容,把主要精力都集中在如何实现业务上,这就是响应式函数编程的魅力!

操作符分类

通过上面的例子,大家应该看到了RxJava操作符的威力,下面我按类别把常用操作符分别介绍,其实很多内容都是来自于ReactiveX的官方网站,英文比较好的朋友可以参考(http://reactivex.io/)。

按照官方的分类,操作符大致分为以下几种:

  • Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
  • Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
  • Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
  • Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
  • Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
  • Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
  • Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
  • Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
  • 其他如observable.toList()、observable.connect()、observable.publish()等等;

创建型操作符

create操作符

create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的,其流程图例如下:

调用例子如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
 } ).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 2

Next: 3

Next: 4

Sequence complete.

在使用create操作符时,最好要在回调的call函数中增加isUnsubscribed的判断,以便在subscriber在取消订阅时不会再执行call函数中相关代码逻辑,从而避免导致一些意想不到的错误出现;

from操作符

from操作符是把其他类型的对象和数据类型转化成Observable,其流程图例如下:

调用例子如下:

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
    new Action1<Integer>() {
        @Override
        public void call(Integer item) {
            System.out.println(item);
        }
    },
    new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {
        @Override
        public void call() {
            System.out.println("Sequence complete");
        }
    }
);

运行结果如下:

0

1

2

3

4

5

Sequence complete

just操作符

just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别,其流程图例如下:

调用例子如下:

Observable.just(1, 2, 3)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 2

Next: 3

Sequence complete.

defer操作符

defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的,其流程实例如下:

下面通过比较defer操作符和just操作符的运行结果作比较:

i=10;
        Observable justObservable = Observable.just(i);
        i=12;
        Observable deferObservable = Observable.defer(new Func0<Observable<Object>>() {
            @Override
            public Observable<Object> call() {
                return Observable.just(i);
            }
        });
        i=15;

        justObservable.subscribe(new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("just result:" + o.toString());
            }
        });

        deferObservable.subscribe(new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                System.out.println("defer result:" + o.toString());
            }
        });
   }

其中i是类的成员变量,运行结果如下:

just result:10

defer result:15

可以看到,just操作符是在创建Observable就进行了赋值操作,而defer是在订阅者订阅时才创建Observable,此时才进行真正的赋值操作

timer操作符

timer操作符是创建一串连续的数字,产生这些数字的时间间隔是一定的;这里有两种情况:

  • 一种是隔一段时间产生一个数字,然后就结束,可以理解为延迟产生数字,其流程实例如下:

  • 一种是每隔一段时间就产生一个数字,没有结束符,也就是是可以产生无限个连续的数字,其流程实例如下:

    timer操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。

    下面是调用例子:

//每隔两秒产生一个数字
        Observable.timer(2, 2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error:" + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next:" + aLong.toString());
            }
        });

运行结果如下:

Next:0

Next:1

Next:2

Next:3

……

interval操作符

interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大;interval操作符的实现效果跟上面的timer操作符的第二种情形一样。以下是流程实例:

interval操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。

调用例子就不列出了,基本跟上面timer的调用例子一样。

range操作符

range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字,其流程实例如下:

调用例子如下:

//产生从3开始,个数为10个的连续数字
        Observable.range(3,10).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error:" + e.getMessage());
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Next:" + i.toString());
            }
        });

运行结果如下:

Next:3

Next:4

Next:5

Next:6

….

Next:12

Sequence complete.

repeat/repeatWhen操作符

repeat操作符是对某一个Observable,重复产生多次结果,其流程实例如下:

repeatWhen操作符是对某一个Observable,有条件地重新订阅从而产生多次结果,其流程实例如下:

repeat和repeatWhen操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。

repeat调用例子如下:

//连续产生两组(3,4,5)的数字
        Observable.range(3,3).repeat(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error:" + e.getMessage());
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("Next:" + i.toString());
            }
        });

运行结果如下:

Next:3

Next:4

Next:5

Next:3

Next:4

Next:5

Sequence complete.

repeatWhen调用例子如下:

Observable.just(1,2,3).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                //重复3次
                return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
                    @Override
                    public Integer call(Void aVoid, Integer integer) {
                        return integer;
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer integer) {
                        System.out.println("delay repeat the " + integer + " count");
                        //1秒钟重复一次
                        return Observable.timer(1, TimeUnit.SECONDS);
                    }
                });
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

运行结果如下:

Next:1

Next:2

Next:3

repeat the 1 count

Next:1

Next:2

Next:3

repeat the 2 count

Next:1

Next:2

Next:3

repeat the 3 count

Next:1

Next:2

Next:3

Sequence complete.



Transforming Observables(Observable的转换操作符)

buffer操作符

buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。

需要注意的是,一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。

buffer的名字很怪,但是原理很简单,流程图如下:

调用例子如下:

//定义邮件内容
        final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
        //每隔1秒就随机发布一封邮件
        Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (subscriber.isUnsubscribed()) return;
                    Random random = new Random();
                    while (true) {
                        String mail = mails[random.nextInt(mails.length)];
                        subscriber.onNext(mail);
                        Thread.sleep(1000);
                    }
                } catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
        }).subscribeOn(Schedulers.io());
        //把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
        endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
            @Override
            public void call(List<String> list) {

                System.out.println(String.format("You‘ve got %d new messages!  Here they are!", list.size()));
                for (int i = 0; i < list.size(); i++)
                    System.out.println("**" + list.get(i).toString());
            }
        });

运行结果如下:

You’ve got 3 new messages! Here they are!(after 3s)

**Here is an email!

**Another email!

**Another email!

You’ve got 3 new messages! Here they are!(after 6s)

**Here is an email!

**Another email!

**Here is an email!

……

flatMap操作符

flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。

值得注意的是,flatMap操作符在合并Observable结果时,有可能存在交叉的情况,如下流程图所示:

调用例子如下:

private Observable<File> listFiles(File f){
        if(f.isDirectory()){
            return Observable.from(f.listFiles()).flatMap(new Func1<File, Observable<File>>() {
                @Override
                public Observable<File> call(File file) {
                    return listFiles(f);
                }
            });
        } else {
            return Observable.just(f);
        }
    }

    @Override
    public void onClick(View v) {
        Observable.just(getApplicationContext().getExternalCacheDir())
                .flatMap(new Func1<File, Observable<File>>() {
                    @Override
                    public Observable<File> call(File file) {
                        //参数file是just操作符产生的结果,这里判断file是不是目录文件,如果是目录文件,则递归查找其子文件flatMap操作符神奇的地方在于,返回的结果还是一个Observable,而这个Observable其实是包含多个文件的Observable的,输出应该是ExternalCacheDir下的所有文件
                        return listFiles(file);
                    }
                })
                .subscribe(new Action1<File>() {
                    @Override
                    public void call(File file) {
                        System.out.println(file.getAbsolutePath());
                    }
                });

    }

concatMap操作符

cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。

concatMap的流程如下:

concatMap的调用例子与flatMap类似,这里不做重复

switchMap操作符

switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!流程图如下:

以下是flatMap、concatMap和switchMap的运行实例对比:

 //flatMap操作符的运行结果
        Observable.just(10, 20, 30).flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                int delay = 200;
                if (integer > 10)
                    delay = 180;

                return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("flatMap Next:" + integer);
            }
        });

        //concatMap操作符的运行结果
        Observable.just(10, 20, 30).concatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                int delay = 200;
                if (integer > 10)
                    delay = 180;

                return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("concatMap Next:" + integer);
            }
        });

        //switchMap操作符的运行结果
        Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
                int delay = 200;
                if (integer > 10)
                    delay = 180;

                return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("switchMap Next:" + integer);
            }
        });

运行结果如下:

flatMap Next:20

flatMap Next:10

flatMap Next:30

flatMap Next:15

flatMap Next:10

flatMap Next:5

switchMap Next:30

switchMap Next:15

concatMap Next:10

concatMap Next:5

concatMap Next:20

concatMap Next:10

concatMap Next:30

concatMap Next:15

groupBy操作符

groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。

值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

groupBy操作符的流程图如下:

调用例子如下:

Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Func1<Long, Long>() {
            @Override
            public Long call(Long value) {
                //按照key为0,1,2分为3组
                return value % 3;
            }
        }).subscribe(new Action1<GroupedObservable<Long, Long>>() {
            @Override
            public void call(GroupedObservable<Long, Long> result) {
                result.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long value) {
                        System.out.println("key:" + result.getKey() +", value:" + value);
                    }
                });
            }
        });

运行结果如下:

key:0, value:0

key:1, value:1

key:2, value:2

key:0, value:3

key:1, value:4

key:2, value:5

key:0, value:6

key:1, value:7

key:2, value:8

key:0, value:9

map操作符

map操作符是把源Observable产生的结果,通过映射规则转换成另一个结果集,并提交给订阅者进行处理。

map操作符的流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                //对源Observable产生的结果,都统一乘以3处理
                return integer*3;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("next:" + integer);
            }
        });

运行结果如下:

next:3

next:6

next:9

next:12

next:15

next:18

cast操作符

cast操作符类似于map操作符,不同的地方在于map操作符可以通过自定义规则,把一个值A1变成另一个值A2,A1和A2的类型可以一样也可以不一样;而cast操作符主要是做类型转换的,传入参数为类型class,如果源Observable产生的结果不能转成指定的class,则会抛出ClassCastException运行时异常。

cast操作符的流程图如下:

调用例子如下:

 Observable.just(1,2,3,4,5,6).cast(Integer.class).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                System.out.println("next:"+value);
            }
        });

运行结果如下:

next:1

next:2

next:3

next:4

next:5

next:6

scan操作符

scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。

scan操作符的流程图如下:

调用例子如下:

Observable.just(1, 2, 3, 4, 5)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer sum, Integer item) {
            //参数sum就是上一次的计算结果
            return sum + item;
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 3

Next: 6

Next: 10

Next: 15

Sequence complete.

window操作符

window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

window操作符有很多个重载方法,这里只举一个简单的例子,其流程图如下:

调用例子如下:

Observable.interval(1, TimeUnit.SECONDS).take(12)
                .window(3, TimeUnit.SECONDS)
                .subscribe(new Action1<Observable<Long>>() {
                    @Override
                    public void call(Observable<Long> observable) {
                        System.out.println("subdivide begin......");
                        observable.subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long aLong) {
                                System.out.println("Next:" + aLong);
                            }
                        });
                    }
                });

运行结果如下:

subdivide begin……

Next:0

Next:1

subdivide begin……

Next:2

Next:3

Next:4

subdivide begin……

Next:5

Next:6

Next:7

subdivide begin……

Next:8

Next:9

Next:10

subdivide begin……

Next:11

Filtering Observables(Observable的过滤操作符)

debounce操作符

debounce操作符对源Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。

值得注意的是,如果源Observable产生的最后一个结果后在规定的时间间隔内调用了onCompleted,那么通过debounce操作符也会把这个结果提交给订阅者。

debounce操作符的流程图如下:

调用例子如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if(subscriber.isUnsubscribed()) return;
                try {
                    //产生结果的间隔时间分别为100、200、300...900毫秒
                    for (int i = 1; i < 10; i++) {
                        subscriber.onNext(i);
                        Thread.sleep(i * 100);
                    }
                    subscriber.onCompleted();
                }catch(Exception e){
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.newThread())
          .debounce(400, TimeUnit.MILLISECONDS)  //超时时间为400毫秒
          .subscribe(
                new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("Next:" + integer);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("Error:" + throwable.getMessage());
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        System.out.println("completed!");
                    }
                });

运行结果如下:

Next:4

Next:5

Next:6

Next:7

Next:8

Next:9

completed!

distinct操作符

distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者,非常类似于SQL里的distinct关键字。

distinct操作符的流程图如下:

调用例子如下:

Observable.just(1, 2, 1, 1, 2, 3)
          .distinct()
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 2

Next: 3

Sequence complete.

elementAt操作符

elementAt操作符在源Observable产生的结果中,仅仅把指定索引的结果提交给订阅者,索引是从0开始的。其流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6).elementAt(2)
          .subscribe(
                new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("Next:" + integer);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("Error:" + throwable.getMessage());
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        System.out.println("completed!");
                    }
                });

运行结果如下:

Next:3

completed!

filter操作符

filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者,其流程图如下:

调用例子如下:

Observable.just(1, 2, 3, 4, 5)
          .filter(new Func1<Integer, Boolean>() {
              @Override
              public Boolean call(Integer item) {
                return( item < 4 );
              }
          }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 2

Next: 3

Sequence complete.

ofType操作符

ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤,其流程图如下:

调用例子如下:

Observable.just(1, "hello world", true, 200L, 0.23f)
          .ofType(Float.class)
          .subscribe(new Subscriber<Object>() {
              @Override
              public void onNext(Object item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 0.23

Sequence complete.

first操作符

first操作符是把源Observable产生的结果的第一个提交给订阅者,first操作符可以使用elementAt(0)和take(1)替代。其流程图如下:

调用例子如下:

 Observable.just(1,2,3,4,5,6,7,8)
          .first()
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 1

Sequence complete.

single操作符

single操作符是对源Observable的结果进行判断,如果产生的结果满足指定条件的数量不为1,则抛出异常,否则把满足条件的结果提交给订阅者,其流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6,7,8)
          .single(new Func1<Integer, Boolean>() {
              @Override
              public Boolean call(Integer integer) {
                  //取大于10的第一个数字
                  return integer>10;
              }
          })
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Error: Sequence contains no elements

last操作符

last操作符把源Observable产生的结果的最后一个提交给订阅者,last操作符可以使用takeLast(1)替代。其流程图如下:

调用例子如下:

Observable.just(1, 2, 3)
          .last()
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 3

Sequence complete.

ignoreElements操作符

ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知。

ignoreElements操作符的流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Sequence complete.

sample操作符

sample操作符定期扫描源Observable产生的结果,在指定的时间间隔范围内对源Observable产生的结果进行采样,其流程图如下:

调用例子如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if(subscriber.isUnsubscribed()) return;
                try {
                    //前8个数字产生的时间间隔为1秒,后一个间隔为3秒
                    for (int i = 1; i < 9; i++) {
                        subscriber.onNext(i);
                        Thread.sleep(1000);
                    }
                    Thread.sleep(2000);
                    subscriber.onNext(9);
                    subscriber.onCompleted();
                } catch(Exception e){
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.newThread())
          .sample(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 3

Next: 5

Next: 7

Next: 8

Sequence complete.

skip操作符

skip操作符针对源Observable产生的结果,跳过前面n个不进行处理,而把后面的结果提交给订阅者处理,其流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6,7).skip(3)
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 4

Next: 5

Next: 6

Next: 7

Sequence complete.

skipLast操作符

skipLast操作符针对源Observable产生的结果,忽略Observable最后产生的n个结果,而把前面产生的结果提交给订阅者处理,

值得注意的是,skipLast操作符提交满足条件的结果给订阅者是存在延迟效果的,看以下流程图即可明白:

可以看到skipLast操作符把最后的天蓝色球、蓝色球、紫色球忽略掉了,但是前面的红色球等并不是源Observable一产生就直接提交给订阅者,这里有一个延迟的效果。

调用例子如下:

Observable.just(1,2,3,4,5,6,7).skipLast(3)
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 1

Next: 2

Next: 3

Next: 4

Sequence complete.

take操作符

take操作符是把源Observable产生的结果,提取前面的n个提交给订阅者,而忽略后面的结果,其流程图如下:

调用例子如下:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
          .take(4)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下:

Next: 1

Next: 2

Next: 3

Next: 4

Sequence complete.

takeFirst操作符

takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

takeFirst操作符的流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                //获取数值大于3的数据
                return integer>3;
            }
        })
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 4

Sequence complete.

takeLast操作符

takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时。其流程图如下:

调用例子如下:

Observable.just(1,2,3,4,5,6,7).takeLast(2)
          .subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }

              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }

              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });

运行结果如下:

Next: 6

Next: 7

Sequence complete.

不知不觉介绍了那么多操作符,篇幅有点长了,下回继续介绍其他的操作符,敬请期待!

转载:http://blog.csdn.net/job_hesc/article/details/46495281

时间: 2024-11-11 09:38:35

Android RxJava使用介绍(二) RxJava的操作符的相关文章

RxJava系列之二 变换类操作符具体解释1

1.回想 上一篇文章我们主要介绍了RxJava , RxJava 的Observables和 RxJava的just操作符.以及RxJava一些经常使用的操作. 没看过的抓紧点我去看吧. 事实上RxJava有非常多的操作符, 而我们学过的just仅仅是创建类操作符的当中一种. 以后我会陆续介绍其它的创建类操作符. 文章代码地址:https://github.com/jiang111/RxJavaDemo 2. 变换类操作符之map 開始本篇文章的解说: map操作符的详细使用方法. map是属于

【ALearning】第五章 Android相关组件介绍(二)Service

Service是Android中的四大组件之一,所以在Android开发过程中起到非常重要的作用.下面我们来看一下官方对Service的定义. A Service is an application component thatcan perform long-running operations in the background and does not provide auser interface. Another application component can start a se

Android RxJava使用介绍(三) RxJava的操作符

上一篇文章已经具体解说了RxJava的创建型操作符.本片文章将继续解说RxJava操作符.包括: Transforming Observables(Observable的转换操作符) Filtering Observables(Observable的过滤操作符) Transforming Observables(Observable的转换操作符) buffer操作符 buffer操作符周期性地收集源Observable产生的结果到列表中.并把这个列表提交给订阅者,订阅者处理后,清空buffer列

Android RxJava使用介绍(五) RxJava的操作符

本篇文章继续介绍以下类型的操作符 - Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 一.Concat Concat操作符将多个Observable结合成一个Observable并发射数据,并且严格按照先后顺序发射数据,前一个Observable的数据没有发射完,是不能发射后面Observable的数据的. 有两个操作符跟它类似,但是有区别,分别是 1.startWith:仅仅是在前面插上一个数据. 2.merge:其发射的数

Android RxJava使用介绍(四) RxJava的操作符

本篇文章继续介绍以下类型的操作符 Combining Observables(Observable的组合操作符) Error Handling Operators(Observable的错误处理操作符) Combining Observables(Observable的组合操作符) combineLatest操作符 combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable.这两个Observable中任意一个Observable产生

Android RxJava使用介绍(一)概念

RxJava到底是什么?使用RxJava到底有什么好处呢?其实RxJava是ReactiveX中使用Java语言实现的版本,目前ReactiveX已经实现的语言版本有: Java: RxJava JavaScript: RxJS C#: Rx.NET C#(Unity): UniRx Scala: RxScala Clojure: RxClojure C++: RxCpp Ruby: Rx.rb Python: RxPY Groovy: RxGroovy JRuby:RxJRuby Kotlin

Android RxJava使用介绍(一) Hello World

最近在做东西的时候,一直在使用RxJava框架,越是深入了解RxJava,就越觉得这个框架威力实在是太大了.好东西不能一个人独自享受,后面几篇文章我会由浅入深来介绍一下RxJava的使用方法,相信看完之后,你会跟我一样逐渐喜欢上这个"威力无比"的武器!那么,RxJava到底是什么?使用RxJava到底有什么好处呢?其实RxJava是ReactiveX中使用Java语言实现的版本,目前Reactive... http://bar.cnyes.com/html/100102-1/8D260

RxJava 和 RxAndroid 二

前言:对Rx不了解的朋友可以先看我的第一篇博文 RxJava 和 RxAndroid ,是对Rxjava的基本介绍 1.merge操作符,合并观察对象 1 package app.com.myapplication; 2 3 import android.support.v7.app.AppCompatActivity; 4 import android.os.Bundle; 5 6 import java.util.ArrayList; 7 import java.util.List; 8 9

Rxjava的介绍与使用

最近RxJava越来越流行了,在移动端也越来越多的项目开始使用这个框架了!唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编程语言.但是一旦你弄明白了,你就会发现RxJava真是太棒了. 这里仅仅是帮助你了解RxJava,整个系列共有四篇文章,希望你看完这四篇文章之后能够了解RxJava背后的思想,并且喜欢上RxJava. 基础 RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者).Observables发出一系列事件,Subscr