rxJava入门学习笔记

前言

现在rx系列似乎是火的不行了,一打开群就是各种rxJava、rxAndroid。

最近正恶补各种新技术来充实自己,所以前些天写完两篇介绍JuheNews项目的文章之后,马上又开始加入了学习rxJava的阵营当中。

欢迎来到rxJava

刚开始看rxJava的系列文章的时候,一万头草泥马在心里来回奔腾:这tm跟屎一样的东西写的都是个啥?现在开始找到了一点感觉了。

网上很多大神都把rxJava看成设计模式中的观察者模式,刚开始接触它的时候,它的链式调用我反而感觉有点像建造者模式(比如AlertDialog),不过有区别的是建造者模式中方法调用的顺序并不影响最后的建造结果,而在rxJava的链式调用中,各种Operators的顺序会影响后面的结果。这一点我们可以在后面验证。

在我看来,rxJava核心的部分有3个:

  • Observables 被观察者,数据源。
  • Operators 操作符,对Observables的各种加工的操作。
  • Subscribers 观察者,负责对整个流程的把控,包括接收Observables被Operators加工后的结果、处理加工时的异常、监听处理加工完成时的事件等。

所以rxJava风格的代码一般按下面四个步骤来写:

1、创建数据源Observables

2、对Observables添加各种你想要的加工操作(这一步可以没有,视具体业务而定)

3、创建Subscribers

4、绑定Observables和Subscribers(不绑定怎么接收Observables传过来的数据呢?)

初学时可能看的一头雾水,但是可以不用过于纠结里面的细节,先熟悉这种写法就行。举个栗子,我们现在要用rxJava的风格来输出一句hello rx,该怎么写呢?

Hello RxJava

要使用rxJava我们需要在项目中添加依赖库,即在build.gradle文件中添加如下代码:

compile ‘io.reactivex:rxjava:1.1.5‘

配置完成后,我们先来看一段最简单的rxJava风格的代码。

    private void hello() {
        //创建数据源
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                //添加数据
                subscriber.onNext("hello rx");
                //数据添加完成
                subscriber.onCompleted();
            }
        });
        //创建观察者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                //接受数据源添加过来的数据
                v(s);//这只是一句打印log的普通操作,我自己写的方法
            }
        };
        //绑定数据源和观察者
        observable.subscribe(subscriber);
    }

上面创建数据源时用的Observable.create()静态方法,传入一个Observable.OnSubscribe对象,并且重写了它的call方法。如果你不理解,暂时把它当成这样一个模板即可,总之,这里只是为了创建一个observable对象,不必深究细节。

需要注意的是call()方法的参数subscriber是哪里来的呢?看代码第33行,就是绑定Observable和Subscriber时传进来的,所以代码第8行调用完成之后,就会跳到29行。

是不是看的一头雾水,没关系,rxJava的使用模板就在这里了,不管rxJava有多牛逼多强大,再多复杂的操作都是按这种套路写的。

Simple Hello

如果你觉得上面的代码有点多,看看下面的写法:

    private void simpleHello() {

        //创建Observable和绑定subscribe过程写在一句话里面了
        Observable.just("hello rx").subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                v(s);
            }
        });
    }

和上面那个Observable.create()方法一样,Observable.just()也是Observable的一个静态方法,并且它返回一个Observable对象。上面的写法就等同于下面:

    private void simpleHello() {

        //创建数据源
        Observable<String> observable = Observable.just("hello rx");
        //创建观察者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                //接受数据源添加过来的数据
                v(s);//这只是一句打印log的普通操作,我自己写的方法
            }
        };
        //绑定数据源和观察者
        observable.subscribe(subscriber);
    }

还是觉得有点多,不过这里只是为了举例说明rxJava的用法。

operate

操作符是rxJava最核心、最牛逼的功能之一,说这句话没有人会打我吧。。

我理解的操作符,其实是rxJava中封装好的一系列方法,然后开发者可以很简单方便的api调用来实现以往那些需要复杂逻辑的操作。

比如要过滤一些数据,可以通过filter操作符,其实就是调用filter()方法;

比如要遍历一个集合,可以通过from操作符,其实就是调用from()方法;

比如要将数据源转换成另一个observable,可以通过map操作符,其实就是调用map()方法;

还有很多···

举个栗子,我们要实现一个功能:传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来。

    private void operate() {

        //创建监听者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer value) {

                v(value + "");
            }
        };
        //创建被监听者、数据源
        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {

                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onCompleted();
            }
        });
        //绑定数据源和观察者
        observable.subscribe(subscriber);
    }

为了避免思维混乱,上面的代码暂时没有加入过滤、乘以2的操作,如果你现在能看懂这些代码,说明现在你对rxJava已经了解一二了。

对于过滤奇数的操作,需要调用filter操作符,如下:

observable.filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {

                return integer % 2 == 0;
            }
        })

上面call()方法中返回false的数据将会被丢弃,所以我们只需写integer % 2 == 0即可把所有奇数过滤掉了。

对于乘以2的操作,需要调用map操作符来完成,如下:

observable.map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer * 2;
            }
        })

call()方法的返回值类型可以自定义,如果你想把过滤后的偶数除以2,返回double类型,则代码如下:

observable.map(new Func1<Integer, Double>() {
            @Override
            public Double call(Integer integer) {
                return integer / 2;
            }
        })

结合上面两步,实现传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来的这个功能最后的代码如下:

private void operate() {

        //创建监听者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer value) {

                v(value + "");
            }
        };
        //创建被监听者、数据源
        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {

                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onNext(4);
                subscriber.onNext(5);
                subscriber.onCompleted();
            }
        });
        //操作数据源
        observable.filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {

                return integer % 2 == 0;
            }
        }).map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer * 2;
            }
        })
                //必须将这些操作及时绑定到监听者,否则这些操作将丢失
                .subscribe(subscriber);
    }

一开始的时候,我们说过rxJava中的监听者模式跟建造者模式不一样,其中有一点就是rxJava中操作符的顺序会影响最后的结果,而建造者模式中不会。

如果上面传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来的这个需求中,改成传入一些整数,将这些数乘以2,然后过滤掉其中的奇数,最后打印出来,两种需求最后打印的结果是一样吗?(你们觉得呢?)

传入一些整数,过滤掉其中的奇数,然后将这些数乘以2,最后打印出来的结果如下:

传入一些整数,将这些数乘以2,然后过滤掉其中的奇数,最后打印出来的结果如下:

结果一目了然,大家可以把代码中的filtermap操作符的顺序调换一下看看是不是上面的结果。

map操作符类似的还有flatMap,不过flatMap返回的是个 Observable 对象。

假设有个Student对象,现在要打印学生的所有课程Course的名字。它们的类结构如下:

public static class Student {
        String name;
        List<Course> courseList;

        public static Student create() {

            Student student = new Student();
            student.name = "";
            student.courseList = new ArrayList<>(1);
            for (int i = 0; i < 10; i++) {
                student.courseList.add(new Course(i + ""));
            }

            return student;
        }

        public static class Course {
            String name;//课程名
            double score;//学分

            public Course(String n) {
                name = n;
                score = 0;
            }
        }
    }

如果按照上面的介绍,你是不是已经想到手动取出所有课程,然后用from和map取出课程名?

no no no ,那样太low了,不符合rxJava的风格。看看flatMap是怎么大展身手的吧:

private void flatMap() {

        Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {

                Student student = Student.create();
                subscriber.onNext(student);
            }
        }).flatMap(new Func1<Student, Observable<Student.Course>>() {
            @Override
            public Observable<Student.Course> call(Student student) {

                return Observable.from(student.courseList);
            }
        }).subscribe(new Subscriber<Student.Course>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Student.Course course) {

                v(course.name);
            }
        });
    }

吓尿了没?flatMap操作符的具体原理可以去看扔物线大大的那篇文章。

Schedulers

还有两个特别的操作符需要单独拿出来,那就是subscribeOnobserveOn

先来看看扔物线大大给 Android 开发者的 RxJava 详解文章中的描述:

subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

说起来有点绕,具体什么意思呢?那就是切换线程。

还拿之前的hello rxJava说事,只是比之前加了切换线程的操作,如下:

private void schedulers() {

        //打印线程名
        v("schedulers(): " + Thread.currentThread().getName());

        //创建数据源
        Observable<String> observable = Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {

                        //打印线程名
                        v("create Observable: " + Thread.currentThread().getName());

                        //添加数据
                        subscriber.onNext("hello rx");
                        //数据添加完成
                        subscriber.onCompleted();
                    }
                })
                //** 在work线程订阅事件
                .subscribeOn(Schedulers.io())
                //** 在main线程观察操作
                .observeOn(AndroidSchedulers.mainThread());

        //创建观察者
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

                //打印线程名
                v("subscriber onCompleted():" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                //打印线程名
                v("subscriber onNext():" + Thread.currentThread().getName());
            }
        };
        //绑定数据源和观察者
        observable.subscribe(subscriber);
    }

(注意:要使用AndroidSchedulers.mainThread(),必须在build.gradle中添加rxAndroid的依赖库哦:compile ‘io.reactivex:rxandroid:1.2.0‘

下面是控制台的打印结果:

可以看到:

  • 由于指定了subscribeOn(Schedulers.io()),所以Observable.create()创建Observable对象时,Observable的call()方法在rx的IO线程执行;
  • 由于指定了observeOn(AndroidSchedulers.mainThread()),所以subscriber接收数据以及回调完成事件时,均在主线程执行。

我们把两个线程换一下,再来看打印结果。代码如下:

                //** 在main线程订阅事件
                .subscribeOn(AndroidSchedulers.mainThread())
                //** 在work线程观察操作
                .observeOn(Schedulers.io());

日志的第一句schedulers(): main是由于整段代码都是在Activity发起的,当然是main线程,所以不用管,只看后面3句。

可以看到,它们的线程变成了我们指定的线程。rxJava中切换线程两句话就搞定,妈妈再也不用担心我阻塞UI了~

结合我们的实际开发经验,如果我们要在sd卡中Pictures目录中取出最多10张.png格式的图片,然后设置到ImageView中,代码就可以这么写:

    private void from() {

        //得到所有图片的路径
        String[] dirArr = getExternalFilesDir(Environment.DIRECTORY_PICTURES).list();
        Observable
                //from操作符,遍历dirArr数组
                .from(dirArr)
                //filter得到上面传下来的单个元素,然后过滤非.png格式的图片
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.endsWith(".png");
                    }
                })
                //经过上面两步得到合适的图片之后,将图片路径转成bitmap
                .map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String s) {

                        //外部方法:根据图片路径获取bitmap
                        return readBitmapFromPath(s);
                    }
                })
                //最多取10张图片
                .take(10)
                //防止阻塞ui,指定 subscribe() 发生在 IO 线程
                .subscribeOn(Schedulers.io())
                //最后指定 Subscriber 的回调发生在主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Bitmap>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Bitmap bitmap) {

                        //主线程
                        setBitmap2Imageview(bitmap);
                    }
                });
    }

    private void setBitmap2Imageview(Bitmap bitmap) {
        //根据实际业务修改逻辑
    }

    private Bitmap readBitmapFromPath(String path) {
        //根据实际业务修改逻辑

        return Bitmap.createBitmap(10, 10, Bitmap.Config.ALPHA_8);
    }

除了Schedulers.io()、AndroidSchedulers.mainThread(),还有其他几个调度器,如下:

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

AndroidSchedulers.mainThread():Android专有,它指定的操作将在 Android 主线程运行。

链式调用的这种行云流水的感觉有没有吸引到你呢?逻辑清晰,代码简洁,最重要的是很多事情已经不需要你自己去考虑了:省心。

error

如果上面的这些还吸引不了你,我想你胃口有点大了,那就再给你来点点心吧。

不知道平时开发中大家是怎么处理crash的,很多时候我们都是到处try-catch,但是现在你已经不需要担心这些问题了,rxJava调用的过程中,所有的异常都会跑到onError()的回调当中。

不信?

    private void error() {
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("");
            }
        }).filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.charAt(2) > ‘a‘;
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

                v(e.toString());
            }

            @Override
            public void onNext(String s) {

                s.substring(3);
            }
        });
    }

代码第11行和第28行中,如果放在平时是不是已经数组越界然后程序就crash了,但是在rxJava中会直接进入onError中,程序依然可以正常运行。

大大降低了我们程序crash的概率啊有木有!!再也不会动不动就被产品经理骂的狗血淋头了有木有!!!T^T

当然并不是教大家投机取巧,只是感慨rxJava中这种优雅的处理crash的方式让人觉得很舒服,难道你不心动吗?

最后

不知不觉就啰嗦了这么多,刚开始接触rxJava的时候,心里就感觉:这屎一样的东西都特么写的是啥?后来渐渐了解之后,又开始觉得这屎味道还不错啊。。

祝大家的进阶之路一切顺利。

参考

给 Android 开发者的 RxJava 详解

深入浅出RxJava(一:基础篇)

深入浅出RxJava( 二:操作符 )

深入浅出RxJava( 三:响应式的好处 )

深入浅出RxJava( 四:在Android中使用响应式编程 )

时间: 2024-10-01 07:36:10

rxJava入门学习笔记的相关文章

Hadoop入门学习笔记---part4

紧接着<Hadoop入门学习笔记---part3>中的继续了解如何用java在程序中操作HDFS. 众所周知,对文件的操作无非是创建,查看,下载,删除.下面我们就开始应用java程序进行操作,前提是按照<Hadoop入门学习笔记---part2>中的已经在虚拟机中搭建好了Hadoop伪分布环境:并且确定现在linux操作系统中hadoop的几个进程已经完全启动了. 好了,废话不多说!实际的例子走起. 在myeclipse中新建一个java工程: 在项目工程中新建一个lib包用于存放

Hadoop入门学习笔记---part1

随着毕业设计的进行,大学四年正式进入尾声.任你玩四年的大学的最后一次作业最后在激烈的选题中尘埃落定.无论选择了怎样的选题,无论最后的结果是怎样的,对于大学里面的这最后一份作业,也希望自己能够尽心尽力,好好做.正是因为选题和hadoop有关,现在正式开始学习hadoop.将笔记整理于此,希望与志同道合的朋友共同交流. 作者:itRed 邮箱:[email protected] 个人博客链接:http://www.cnblogs.com/itred 好了,废话不多说.进入正题!开始hadoop的学习

汇编入门学习笔记 (六)—— si、di,双重循环

疯狂的暑假学习之  汇编入门学习笔记 (六)-- si.di,双重循环 参考: <汇编语言> 王爽 第7章 1. and和or指令,与[bx+idata] and和or,就不多说了. [bx+idata] 这样写是可以的,某些情况下,比较方便. [bx+idata] 也可以写成 idata[bx] 直接见例子: 把'ABcde' 跟 'fGHig' 都改成大写(ASCII中大写字母与小写字母二进制中,只有第五位不同,大写字母是0,小写字母是1) assume cs:code,ds:data d

汇编入门学习笔记 (八)—— 转移指令

疯狂的暑假学习之  汇编入门学习笔记 (八)--  转移指令 參考: <汇编语言> 王爽 第9章 能够改动ip或者同一时候改动cs和ip的指令统称为转移指令. 8086CPU转移行为分为: 段内转移:仅仅改动ip 段间转移:同一时候改动cs和ip 段内转移按ip改动的范围可分为: 短转移:ip改动范围 -128~127 近转移:ip改动范围 -32768~32767 转移指令分为: 无条件转移指令.如 jmp 条件转移指令 循环指令.如 loop 过程. 中断. 1. offset,nop指令

Hadoop入门学习笔记---part3

2015年元旦,好好学习,天天向上.良好的开端是成功的一半,任何学习都不能中断,只有坚持才会出结果.继续学习Hadoop.冰冻三尺,非一日之寒! 经过Hadoop的伪分布集群环境的搭建,基本对Hadoop有了一个基础的了解.但是还是有一些理论性的东西需要重复理解,这样才能彻底的记住它们.个人认为重复是记忆之母.精简一下: NameNode:管理集群,并且记录DataNode文件信息: SecondaryNameNode:可以做冷备份,对一定范围内的数据作快照性备份: DataNode:存储数据:

汇编入门学习笔记 (三) —— 第一个程序

疯狂的暑假学习之  汇编入门学习笔记 (三)-- 第一个程序 参考:<汇编语言> 王爽  第四章 1.一个源程序从写到执行的过程 第一步:编写汇编源程序 第二步:对源程序进行编译连接 第三步:在操作系统中执行 2.源程序 代码: assume cs:first first segment start: mov ax,2 add ax,ax add ax,ax mov ax,4C00H int 21H first ends end start 代码解释: assume .segment.ends

Python 实现 CNKI批量下载 和FireFox Extension 入门学习笔记

?                                 Python 实现 CNKI批量下载 和FireFox Extension 入门学习笔记? 由于需要也是为了督促自己学习新的东西,我原本想要尝试着写一个爬虫程序,能够在cnki上自动得将论文进行批量下载,学习过程中遇到了诸多情况,cnki也真是专业,不得不佩服cnki的强大. 下面进入正题: 学习.实验环境:ubuntu 14.04 工具:Eclipse ,  FireFox, FireBug,HttpFox 编程语言:pyth

汇编入门学习笔记 (十四)—— 直接定址表

疯狂的暑假学习之  汇编入门学习笔记 (十四)-- 直接定址表 参考: <汇编语言> 王爽 第16章 1. 描述单元长度的标号 普通的标号:a,b assume cs:code code segment a:db 1,2,3,4,5,6,7,8 b:dw 0 start: mov si,offset a mov di,offset b mov ah,0 mov cx,8 s: mov al,cs:[si] add cs:[di],ax inc si loop s mov ax,4c00h in

汇编入门学习笔记 (四)—— [BX] 和 loop指令

疯狂的暑假学习之  汇编入门学习笔记 (四)-- [BX]  和 loop指令 参考:<汇编语言> 王爽 第5章 1.[BX] mov al,[1] 在debug中,会把bs:1 中数据赋给al,但在在masm中不会把bs:1 中数据赋给al,而是把 [1] 认为是 1 赋给al. 如果要实现在debug中的mov al,[1],在masm中就需要[bx] 如: mov bx,1 mov al,[bx] 还可以用 bs:[1] 的方式 如: mov al,bs:[1] 2.loop 循环 要使