手把手带你入门神秘的RxJava

1.什么是RxJava
? Rx是Reactive Extensions的简写,翻译为响应的扩展。也就是通过由一方发出信息,另一方响应信息并作出处理的核心框架代码。
? 该框架由微软的架构师Erik Meijer领导的团队开发,并在2012年11月开源。
? Rx库支持.NET、JavaScript和C++等,现在已经支持几乎全部的流行编程语言了。
? Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
? RxJava作为一个流行的框架,其源码依托在GitHub,除了支持RxJava,针对安卓系统也除了一个支持框架RxAndroid
2.RxJava简化代码
一般我们在安卓项目中,如果想从后台获取数据并刷新界面,代码大概如下,下面我们来看一个例子:
new Thread() {br/>@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {br/>@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
上面的代码经过多层嵌套后 可读性太差了!如果你用了RxJava 可以这样写:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
br/>@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
br/>@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
br/>@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
br/>@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
这样写的好处就是减少层次嵌套 提高了代码的可读性,除了简化代码,RxJava还可以为每个方法提供特定的运行线程。

3.引入框架
目前RxJava已经升级为2.0版本,但为了能够更好的理解RxJava,我们可以从1.0版本开始学习。也为了让我们的安卓项目能够更好的使用RxJava,可以在项目中引入gradle脚本依赖:
compile ‘io.reactivex:rxandroid:1.2.1‘
compile ‘io.reactivex:rxjava:1.1.6‘
现在 我们的项目已经支持RxJava的功能了。
4.响应式的核心
所谓的响应式,无非就是存在这样的2个部分,一部分负责发送事件/消息,另一部分负责响应事件/消息。
以前如果我们想看新闻,一般需要通过看报纸。比如,你对某个报刊杂志比较感兴趣,那么你首先要做3件事:

  1. 提供你家的地址
  2. 找到对应的报社
  3. 去报社订阅整个月的报纸
    经过了上面的流程,以后每天只要有新的报刊资料出来了,报社都会将杂志发送到你家。

    将上面的例子进行代码抽象,步骤如下:
  4. 提供观察者(因为你是关心杂志内容的人 所以你是观察该事件的人)
  5. 提供被观察者(只要有新的杂志出来 就需要通知关心的人 所以报社是被观察的对象)
  6. 订阅(也就是 观察者&被观察者之间要相互关联 以便被观察的对象一变化 就会马上通知观察该事件的对象)

    上面示例的演示代码如下:
    //1.创建被观察者
    Observable<String> observable =
    Observable.create(new Observable.OnSubscribe<String>() {
    br/>@Override
    public void call(Subscriber<? super String> subscriber) {
    //4.开始发送事件
    //事件有3个类型 分别是onNext() onCompleted() onError()
    //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
    subscriber.onNext("Hello Android !");
    subscriber.onNext("Hello Java !");
    subscriber.onNext("Hello C !");
    subscriber.onCompleted();
    }
    });

    //2.创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
    br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };

    //3.订阅
    observable.subscribe(subscriber);
    输出如下:
    com.m520it.rxjava I/IT520: onNext: Hello Android !
    com.m520it.rxjava I/IT520: onNext: Hello Java !
    com.m520it.rxjava I/IT520: onNext: Hello C !
    com.m520it.rxjava I/IT520: onCompleted
    代码运行的原理
    ? 上面的代码中,当观察者subscriber订阅了被观察者observable之后,系统会自动回调observable对象内部的call()。
    ? 在observable的call()方法实体中,发送了如onNext/onCompleted/onError事件后。
    ? 接着subscriber就能回调到到对应的方法。
    5.被观察者变种
    普通的Observable发送需要三个方法onNext, onError, onCompleted,而Single作为Observable的变种,只需要两个方法:
    ? onSuccess - Single发射单个的值到这个方法
    ? onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法
    Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。
    final Single<String> single = Single.create(new Single.OnSubscribe<String>() {
    br/>@Override
    public void call(SingleSubscriber<? super String> singleSubscriber) {
    //先调用onNext() 最后调用onCompleted()
    //singleSubscriber.onSuccess("Hello Android !");
    //只调用onError();
    singleSubscriber.onError(new NullPointerException("mock Exception !"));
    }
    });

    Observer<String> observer = new Observer<String>() {br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };
    single.subscribe(observer);
    6.观察者变种
    Observer观察者对象,上面我们用Subscriber对象代替。因为该对象本身就是继承了Observer。
    该对象实现了onNext()&onCompleted()&onError()事件,我们如果对哪个事件比较关心,只需要实现对应的方法即可,代码如下:
    //创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
    br/>@Override
    public void onCompleted() {
    Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }
    
    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }

    };

    //订阅
    observable.subscribe(subscriber);
    上面的代码中,如果你只关心onNext()事件,但却不得不实现onCompleted()&onError()事件.这样的代码就显得很臃肿。鉴于这种需求,RxJava框架在订阅方面做了特定的调整,代码如下:
    //为指定的onNext事件创建独立的接口
    Action1<String> onNextAction = new Action1<String>() {
    br/>@Override
    public void call(String s) {
    Log.i(TAG, "call: "+s);
    }
    };

    //订阅
    observable.subscribe(onNextAction);

不知道大家注意到没有,subscribe()订阅的不再是观察者,而是特定的onNext接口对象。类似的函数如下,我们可以根据需要实现对应的订阅:

public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有类似的功能:

public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

##7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

###AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

以下贴出代码:
//创建被观察者final AsyncSubject<String> subject = AsyncSubject.create();//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(String s) {
    Log.i(TAG, "s:" + s);

}

};//订阅事件
subject.subscribe(subscriber);//被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
输出:
s:Hello Java onCompleted
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//因为发送了异常 所以onNext()无法被打印
subject.onError(null);
###BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(Object o) {
    Log.i(TAG, "onNext: " + o);
}

};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效//如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

代码如下:
PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

代码如下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject总结

AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()
BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()
PublishSubject只会打印订阅后的任何事件。
ReplaySubject无论订阅在何时都会调用发送的事件。

以上用代码演示了 RxJava 一些基础功能是如何实现的,希望能给大家带来不一样的启发。但这只是一个小小的分享,离真正能运用于工程的 Rx 框架还差太远。这也让我们明白到,一个健壮的框架,需要考虑太多东西,比如代码的可拓展性和可读性,性能优化,可测试性,兼容性,极端情况等等。但有时要想深入理解一个复杂框架的实现原理,就需要剥离这些细节代码,多关注主干的调用逻辑,化繁为简。

手把手带你入门神秘的RxJava

原文地址:https://blog.51cto.com/13007966/2456664

时间: 2024-10-10 16:22:10

手把手带你入门神秘的RxJava的相关文章

微服务开发实战(spring-cloud/spring-cloud-alibaba/dubbo),一个案例,手把手带你入门

平日里,都是看别人的文章,虽开公众号写了不少,但像样的不多.年末了,年终总结也没来得及写,为了输出点像样的东西,立刻就着手这个系列.一个键一个字母的敲,边敲边写,文章还在持续更新中,直至完整.相信通过这个系列的系统练习,能有一个大跨步的提升. 专栏简介(是什么?) 结合SpringCloud.SpringCloudAlibaba.Dubbo等开源套件,基于某商场停车业务需求,进行微服务开发实战,力争通过一个案例的实操,掌握微服务架构中常用的技能点,轻松入门. 为什么要写这个专栏(为什么?) 微服

Django:手把手带你入门

一.开发环境: Python:2.7:PyCharm 4 二.Django的安装: 打开Pycharm --> 新建项目--> 看图操作--> 自动安装Django环境 三.创建第一个网站: 1.①打开views.py ②导入 ③编写一个index函数(先写成硬编码的形式) 2.①打开urls.py ②导入上图编写的函数 ③在urlpatterns数组内新增一项 3.在右上角找到启动按钮,并单击 4.找到并点击下图网址,打开网页 打开后 现在我们输入新网址:http://127.0.0.

别怕,手把手带你撕、拉、扯下SpringMVC的外衣

提到框架,就不得不提一下看源码,我们平时总是想求大神带我们飞,然而看源码就是一个向大神学习的最直接的一种方式,然而我们每次鼓起勇气看源码前是这样的但是一点开源码,顿时代码如洪流涌入,你的内心可能是这样的所以我在之前别怕看源码,一张图搞定Mybatis的Mapper原理的时候也提到过, Mybatis的源码相对其他框架而言比较简单, 比较适合刚开始克服恐惧心理看源码实战, 由于Struts2前不久又传出安全性问题, 所以Java开发中, 表现层框架基本都是SpringMVC,那么我们就来撕.拉.扯

前端深入之css篇丨初探【transform】,手把手带你实现1024程序员节动画

原文:前端深入之css篇丨初探[transform],手把手带你实现1024程序员节动画 写在前面 马上就2020年了,不知道小伙伴们今年学习了css3动画了吗? 说起来css动画是一个很尬的事,一方面因为公司用css动画比较少,另一方面大部分开发者习惯了用JavaScript来做动画,所以就导致了许多程序员比较排斥来学习css动画(至少我是),但是一个不懂css动画的前端工程师不能称之为掌握css3,其实当你真正学习css动画之后,你会被它的魅力所吸引的,它可以减少代码量.提高性能. 值此10

手把手带你自制Linux系统之二 简易Linux制作

手把手带你自制Linux系统之二 简易Linux制作 本文利用CentOS5.5自带内核制作一个Mini Linux. 打开准备工作中创建的CentOS,为另一个虚拟机MiniLinux添加一个最小Linux所需要的文件. 1. 创建分区 为准备好的磁盘创建两个主分区,大小分别为20M和512M. 使用fdisk命令创建分区详细过程: fdisk /dev/hda 创建第一个20M分区依次输入: n --> p --> 1 --> <Enter> --> +20M 这几

手把手带你搭建开发环境

俗话说得好,工欲善其事,必先利其器,开着记事本就想去开发 Android 程序显然不是明智之举,选择一个好的 IDE可以极大幅度地提高你的开发效率,因此本节我就将手把手带着你把开发环境搭建起来. 需要java环境jdk    jdk环境搭建教程 需要的软件 1.Android SDK Android SDK 是谷歌提供的 Android 开发工具包,在开发 Android 程序时,我们需要通过引入该工具包,来使用 Android 相关的 API 2. Eclipse     相信所有 Java

可能是史上最强大的js图表库——ECharts带你入门

PS:之前的那篇博客Highcharts——让你的网页上图表画的飞起 ,评论中,花儿笑弯了腰 和 StanZhai 两位仁兄让我试试 ECharts ,去主页看到<Why ECharts ?>简单了解了一下之后,ECharts很快吸引了我.里面引自马云的那句话“互联网还没有搞清楚的时候,移动互联网来了,移动互联没有搞清楚的时候,大数据来了”我是第一次听到,实在震撼了我啊(孤陋寡闻...). 本来没打算写什么的.可是作为一个后端开发者,看了半天文档也迷迷糊糊,查了一堆资料也没搞懂Echarts那

手把手带你做一个超炫酷loading成功动画view Android自定义view

写在前面: 本篇可能是手把手自定义view系列最后一篇了,实际上我也是一周前才开始真正接触自定义view,通过这一周的练习,基本上已经熟练自定义view,能够应对一般的view需要,那么就以本篇来结尾告一段落,搞完毕设的开题报告后去学习新的内容. 有人对我说类似的效果网上已经有了呀,直接拿来就可以用,为什么还要写.我个人的观点是:第三方控件多数不能完全满足UI的要求,如果需要修改,那么必须理解他的实现,所以很有必要自己去写一款出来,成为程序的创造者,而不单单是使用者.所以,写一写已经实现的效果,

我在爱板网写的-- 【望月追忆】带你入门STM32F0系列文章

[望月追忆]带你入门STM32F0之前传:STM32F0资料 [望月追忆]带你入门STM32F0之环境搭建 [望月追忆]带你入门STM32F0之一:STM32F0概述 [望月追忆]带你入门STM32F0之二:SysTick时钟介绍 [望月追忆]带你入门STM32F0之二:点亮你的小灯 [望月追忆]带你入门STM32F0之三:按键----查询方式 [望月追忆]带你入门STM32F0之四:按键----外部中断 [望月追忆]带你入门STM32F0之四:串口 [望月追忆]带你入门STM32F0之五:小项