RxJava操作符的简单使用

一、准备工作在app的build.gradle文件下的dependencies中添加依赖:

    compile ‘io.reactivex:rxjava:1.3.0‘
    compile ‘io.reactivex:rxandroid:1.2.1‘

二、RxJava的操作符用法:

1、create

//create  创建Observable
        //Observable 被观察者
        //Subscribers观察者

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.i(TAG,"call.....");
                if(!subscriber.isUnsubscribed()) {
                    subscriber.onNext("我被执行了1"); //只有完成了下面的订阅方法才能执行onNext等方法
                    subscriber.onNext("我被执行了2");
                    subscriber.onNext("我被执行了3");
                    subscriber.onNext("我被执行了4");
                    subscriber.onNext("我被执行了5");
                    subscriber.onNext("我被执行了6");

                    subscriber.onCompleted();
                }
            }
        });

        //完成订阅
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.i(TAG,"onCompleted") ;
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError:"+e.getMessage());
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG,"onNext:"+s);
            }
        }) ;

2、from

//from 也可以创建Observable  讲int类型的数组01234转化为Observable,通过订阅的onNext可以一个个的取到数组的值
        Integer[] arr = {0,1,2,3,4};
        //完成创建Observable
        Observable<Integer> from = Observable.from(arr);
        //完成订阅
        from.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG,"onCompleted") ;
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError:"+e.getMessage());
            }

            @Override
            public void onNext(Integer i) {
                Log.i(TAG,"onNext:"+i);
            }
        }) ;

3、just

//just  也可以创建Observable  参数是可变参数
        Observable.just(0, 1, 2, 3, 4, 5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.i(TAG,"call:"+integer);
                    }
                }) ;

4、map

//map 变换,在该实例中,将Integer转换成String
//在实际开发中,需要通过String类型的url获取到图片

        String[] urls = {};
        Observable.from(urls)
                .map(new Func1<String, Bitmap>() { //第一个参数为传入的类型,第二个参数为返回的类型
                    @Override
                    public Bitmap call(String s) {
                        //s 代表图片url,网络请求通过url 获取到图片
                        return null;
                    }
                }).subscribe(new Action1<Bitmap>() { // 获取的的Bitmap类型再进行订阅处理
            @Override
            public void call(Bitmap bitmap) {
                //iv.setBackage()
            }
        });

        /*Observable.just(0,1,2,3)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return integer+"转换了";
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG,"call:"+s);//0转换了  1转换了。。。
            }
        });*/

5、flatMap

//flatmap 转换  将多个Observable转换成一个Observalbe然后发送
        //获取城市的天气

        /**
         *
         * map 是将一种类型转换成另一种类型(可以是任意类型)
         *
         * flatmap 是将一种类转换成Observable(泛型可以是任意的)
         *
         */

        String[] citys = {"北京","上海","杭州"};
        Observable.from(citys)
                .flatMap(new Func1<String, Observable<WeathData>>() {
                    @Override
                    public Observable<WeathData> call(String s) {
                        return getCityWeathData(s);
                    }
                }).subscribe(new Action1<WeathData>() {
            @Override
            public void call(WeathData weathData) {
                Log.i(TAG,weathData.city+weathData.state);
            }
        });

此处用到的getCityWeathData方法返回出来一个泛型为WeathData的Observable:

/**
     * 获取一个城市的天气数据
     * @param city
     * @return
     */
    private Observable<WeathData> getCityWeathData(final String city){
        return Observable.just(city)
                .map(new Func1<String, WeathData>() {
                    @Override
                    public WeathData call(String s) {
                        //通过网络请求获取城市的天气数据
                        WeathData weathData = new WeathData();
                        weathData.city = city ;
                        weathData.state = "晴天" ;
                        return weathData ;
                    }
                });
    }

6、zip

//zip  将两个Observable按照规则严格的合成一个Observable
        Observable<Integer> observable1 = Observable.just(10, 20, 30,40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3,4);

        Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
            @Override
            public String call(Integer integer, Integer integer2) {
                //定义合并规则
                return integer + integer2 + "abc";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String string) {
                Log.i(TAG,"call:"+string) ;
            }
        });

7、zipWith

//zipwith 将本身与其他的Observable按照规则严格的合并成一个Observable
        Observable.just(10,20,30,40)
                .zipWith(Observable.just("a", "b", "c"), new Func2<Integer, String, String>() {
                    @Override
                    public String call(Integer integer, String s) {
                        //合并规则
                        return integer + s ;
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG,"call:"+s) ;
            }
        });

8、retry

//retry 在出错的时候重试(异常的时候重新执行)
        //用处:网络连接异常的时候
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    for (int i = 0; i < 5; i++) {
                        if (i == 3) {
                            throw new Exception("出错了");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }catch (Exception e){
                    subscriber.onError(e);
                }
            }
        }).retry(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError:"+e.getMessage()) ;
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"onNext:"+integer) ;
            }
        });

9、retryWhen

//retrywhen 异常的时候执行
        //网络请求框架中,一般使用retryWhen  要执行操作是连接网络,连接出异常的时候,
        // 1、我们可以直接重复执行连接网络,retry
        // 2、同时我们也可以判断连接异常的类型,再做决定是否重连 retyrWhen
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.i(TAG,"总出错");
                subscriber.onError(new Throwable("出错了"));
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                    @Override
                    public Integer call(Throwable throwable, Integer integer) {
                        return integer;
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer integer) {
                        //timer 延迟执行的操作符
                        Log.i(TAG,"延迟"+integer+"秒");
                        return Observable.timer(integer, TimeUnit.SECONDS);
                    }
                });
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError:"+e.getMessage()) ;
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"onNext:"+integer) ;
            }
        }) ;

10、filter

//filter 按照规则过滤
        Observable.just(0,1,2,3,4,5)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 2;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG,"call:"+integer) ;
            }
        });

。。。

原文地址:https://www.cnblogs.com/happy-warmth/p/10506638.html

时间: 2024-10-21 13:42:01

RxJava操作符的简单使用的相关文章

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符repeatWhen()和retryWhen()

第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了.不得不说,它们绝对是"最令人困惑弹珠图"的有力角逐者. 然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable.我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们). Repeat与Retry的对比 首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重

RxJava操作符(02-创建操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51645348 本文出自:[openXu的博客] 目录: Create Defer EmptyNeverThrow From Interval Just Range Repeat Timer 源码下载 ??在上一篇博客中我们初步体验了Rxjava的使用,领略了RxJava对于异步操作流编码的简洁风格,接下来的一系列博客,我们主要学习RxJava中的操作符,也就是RxJava的一些API,由于

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse