史上最浅显易懂的RxJava入门教程

RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕。我读源码时,确实有点似懂非懂的感觉。网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的。既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,留下核心代码带大家揭秘 RxJava 的实现原理。

什么是RxJava

将上面的例子进行代码抽象,步骤如下:

    1. 提供观察者(因为你是关心杂志内容的人 所以你是观察该事件的人)
    1. 提供被观察者(只要有新的杂志出来 就需要通知关心的人 所以报社是被观察的对象)
    1. 订阅(也就是 观察者&被观察者之间要相互关联 以便被观察的对象一变化 就会马上通知观察该事件的对象)


上面示例的演示代码如下:
//1.创建被观察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {
br/>@Override
public void call(Subscriber<? super String> subscriber) {
//4.开始发送事件
//事件有3个类型 分别是onNext() onCompleted() onError()
//onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});

//2.创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted ");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError: "+e.getLocalizedMessage());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: "+s);
    }
};

//3.订阅
observable.subscribe(subscriber);

输出如下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代码运行的原理

不知道大家注意到没有,subscribe()订阅的不再是观察者,而是特定的onNext接口对象。类似的函数如下,我们可以根据需要实现对应的订阅:

public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)

这里还有一个forEach函数有类似的功能:

public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)

##7.Subject变种

上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。

###AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。

以下贴出代码:
//创建被观察者final AsyncSubject<String> subject = AsyncSubject.create();//创建观察者
Subscriber<String> subscriber = new Subscriber<String>() {
br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(String s) {
    Log.i(TAG, "s:" + s);

}

};//订阅事件
subject.subscribe(subscriber);//被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
输出:
s:Hello Java onCompleted
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:

subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//因为发送了异常 所以onNext()无法被打印
subject.onError(null);
###BehaviorSubject

当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");

Subscriber subscriber = new Subscriber() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
    Log.i(TAG, "onError");
}

@Override
public void onNext(Object o) {
    Log.i(TAG, "onNext: " + o);
}

};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效//如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效
subject.subscribe(subscriber);

subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。

代码如下:
PublishSubject subject= PublishSubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

代码如下:

ReplaySubject subject= ReplaySubject.create();

Action1<String> onNextAction1 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction1 call: "+s);
}

};

Action1<String> onNextAction2 = new Action1<String>(){

@Override
public void call(String s) {
    Log.i(TAG, "onNextAction2 call: "+s);
}

};

subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");

输出如下:

onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject总结

AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()
BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()
PublishSubject只会打印订阅后的任何事件。
ReplaySubject无论订阅在何时都会调用发送的事件。

史上最浅显易懂的RxJava入门教程

原文地址:https://blog.51cto.com/13007966/2456668

时间: 2024-10-18 06:57:44

史上最浅显易懂的RxJava入门教程的相关文章

史上最全的OpenCV入门教程!这篇够你学习半个月了!万字长文入门

一.Python OpenCV 入门 欢迎阅读系列教程,内容涵盖 OpenCV,它是一个图像和视频处理库,包含 C ++,C,Python 和 Java 的绑定. OpenCV 用于各种图像和视频分析,如面部识别和检测,车牌阅读,照片编辑,高级机器人视觉,光学字符识别等等. 你将需要两个主要的库,第三个可选:python-OpenCV,Numpy 和 Matplotlib. Windows 用户: python-OpenCV:有其他的方法,但这是最简单的. 下载相应的 wheel(.whl)文件

史上最全的Websocket入门教程

websocket简介websocket是什么? 答: 它是一种网络通信协议,是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议.为什么需要websocket? 疑问? 我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处? 答: 因为 HTTP 协议有一个缺陷:通信只能由客户端发起     我们都知道轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开), 因此websocket应运而生. 简介 WebSocket 使得客户端和服务

【转】史上最浅显易懂的Git教程!

之前一直在找git的学习教程,网上搜到很多,但是大多数写的都非常简单或者混乱,你知道技术男的思维就是以为他抛一个专业术语出来,以为你都懂……或者简单写两句,插个图,他觉得他懂了,你也能懂,事实上初学者看得非常痛苦.现在终于下面这个教程,简直写得“包教包会”的程度,按照作者的体系一课时一课时地学下来,掌握基本的git操作没问题,不要以为光看看就会用了,一定要自己敲!感谢作者廖雪峰同志. 传送门:史上最浅显易懂的Git教程!

.Net魔法堂:史上最全的ActiveX开发教程——ActiveX与JS间交互篇

一.前言 经过上几篇的学习,现在我们已经掌握了ActiveX的整个开发过程,但要发挥ActiveX的真正威力,必须依靠JS.下面一起来学习吧! 二.JS调用ActiveX方法 只需在UserControl子类中(即自定义的ActiveX控件中),编写公共方法即可. C# [Guid("0203DABD-51B8-4E8E-A1EB-156950EE1668")] public partial class Uploader : UserControl, IObjectSafety { p

史上最简单的 Spring MVC 教程(十)

1 前言 在史上最简单的 Spring MVC 教程(九)中,咱们已经实现了图片的上传及显示功能,那么接下来,在本篇博文中,咱们更进一步,以实体类(Person)中的字段"name"和控制器(PersonController)中的方法 updatePersonList 为例,实现参数的校验功能. 2 注解示例 - 参数校验 老规矩,首先给出项目结构图: 在给出代码之前, 咱们先明确参数校验的步骤: 导入参数校验的 jar 包: 在实体上配置需要校验的属性: 在控制器的方法中用注解 @V

.Net魔法堂:史上最全的ActiveX开发教程——部署篇

一.前言 接<.Net魔法堂:史上最全的ActiveX开发教程——发布篇>,后我们继续来部署吧! 二. 挽起衣袖来部署   ActiveX的部署其实就是客户端安装ActiveX组件,对未签名和已签名的ActiveX,分别有对应的部署方式. 1. 部署未签名的ActiveX 未签名的ActiveX控件不受浏览器端信任,默认是不被允许安装的 1. 将网站加入 **可信站点** 2. 在“可信站点”和“Internet”下的 **自定义级别** 中确认“对未标记为可安全执行脚本的ActiveX控件初

史上最全面的SignalR系列教程-目录汇总

1.引言 最遗憾的不是把理想丢在路上,而是理想从未上路. 每一个将想法变成现实的人,都值得称赞和学习. 致正在奔跑的您! 2.SignalR介绍 SignalR实现服务器与客户端的实时通信 ,她是一个面向 ASP.NET 开发人员的库,可简化将实时 web 功能添加到应用程序的过程. 实时 web 功能是让服务器代码将内容推送到连接的客户端立即可用,而不是让服务器等待客户端请求新数据的能力. 3.百度百科给它的定义 实现实时通信. 什么是实时通信的Web呢?就是让客户端(Web页面)和服务器端可

史上最无私的MTK入门晋级资料

史上最全 最无私的MTK入门晋级资料 hello 大家好,我十年做机经验,这些天整理乱七八糟的文档一大堆,文档这东西,对有用的人来说是宝,没用的人来说是垃圾,对菜鸟来说是宝,对高手来说是垃圾,不求赚钱,拿来共享共享,希望大家喜欢,希望做机的健健康康,越做越好,希望准备入行的"机油"快速上手.废话不说,直接上档! 下载地址:链接 MTK是一个多线程的开发环境.什么叫多线程?你完全不必要去网上搜索相关的信息.你只需要知道MTK程序运行起来后,可能会有很多个程序在同时运行,比如你在打电话的时

史上最全的Python电子书教程资源下载(转)

网上搜集的,点击即可下载,希望提供给有需要的人^_^   O'Reilly.Python.And.XML.pdf 2.02 MB   OReilly - Programming Python 2nd.pdf 6.98 MB   Orielly.Learning.Python.pdf 3.17 MB   Dive into Python-中文版(python研究,很好的书).chm 573.92 KB   Foundations of Python Network Programming, Sec