简介
作为有经验的JavaScript开发者,我们会在代码中采用一定程度的异步代码。我们不断地处理用户的输入请求,也从远程获取数据,或者同时运行耗时的计算任务,所有这些都不能让浏览器崩溃。可以说,这些都不是琐碎的任务,它是确切的需求,我们学着去避开同步计算,让模型的时间和延时成为问题的关键。对于简单的应用程序,直接使用JavaScript的主事件系统,甚至使用jQuery库帮助也很常见。然而,还没有适当的模式来扩展的简单代码,解决这些异步问题,满足更丰富的应用特性,满足现代web用户的需求,这些仍然是困难的。我们越来越发现我们的应用代码正变得复杂,难以维护,难以测试。问题的本质是异步计算本身就是难以管理的,而RxJS可以解决这个问题。
RxJS解决的问题
任何应用最重要的一个目标之一就是在所有时刻保持响应。这意味着对于一个应用来说当它在处理用户输入或者凭借AJAX从服务器接受一些额外的数据时停止是一件不可接受的事情。通常来说,主要的问题是IO(输入/输出)运行(从磁盘或者网络读取)比CPU执行指令慢太多。这同时实用于客户端和服务器端。让我门来看看客户端。在JavaScript中,解决问题的方案始终是充分利用浏览器的多重连接并且用回调函数来大量产生一个独立的用来照顾一些长期运行的处理。这是一种反转控制控制形式,因为程序的控制不是被你操纵的(因为你不能预知某一处理什么时候会完成),而是在运行时间的责任下交还给你的。虽然对于小应用程序非常有用,但回调的使用使内容丰富的大型应用变得凌乱,它需要同时处理的数据来自用户以及远程HTTP的调用。我们都有过这样的经历:一旦你需要多块数据时你就陷入了流行的”末日金字塔“或者回调地狱。
01 |
makeHttpCall( ‘/items‘ , |
02 |
items => { |
03 |
for (itemId of items) { |
04 |
makeHttpCall(`/items/${itemId}/info`, |
05 |
itemInfo => { |
06 |
makeHttpCall(`/items/${itemInfo.pic}`, |
07 |
img => { |
08 |
showImg(img); |
09 |
}); |
10 |
}); |
11 |
} |
12 |
}); |
13 |
14 |
beginUiRendering(); |
这段代码有很多的问题。 其中之一就是风格。当你在这些嵌套的回调函数中添加越来越多的逻辑,这段代码就会变得很复杂很难理解。因为循环还产生了一个更加细微的问题。for循环是同步的控制流语句,这并不能很好的配合异步调用,因为会有延迟,这可能会产生很奇怪的bug。
这个问题一直都是JavaScript开发者的大麻烦,所以JavaScript在SE6中引入了Promises。 Promises帮助开发者解决这些类似的问题,它提供了一个非常流畅的接口来捕获时间并且提供一个回调方法then()。上面的代码就变成了:
1 |
makeHttpCall( ‘/items‘ ) |
2 |
.then(itemId => makeHttpCall(`/items/${itemId}/info`)) |
3 |
.then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`)) |
4 |
.then(showImg); |
这毫无疑问是一个进步。理解这段代码的难度显著下降。然而,尽管Promises在处理这种单值(或单个错误)时非常高效,它有也一些局限性。Promimses在处理用户连续输入的数据流时效率怎么样呢? 这时Promises处理起来也并不高效,因为它没有事件的删除、分配、重试等等的语法定义。接下来开始讲解RxJS。
RxJS 初探
RxJS是一个解决异步问题的JS开发库.它起源于 Reactive Extensions 项目,它带来了观察者模式和函数式编程的相结合的最佳实践。 观察者模式是一个被实践证明的模式,基于生产者(事件的创建者)和消费者(事件的监听者)的逻辑分离关系.
况且函数式编程方式的引入,如说明性编程,不可变数据结构,链式方法调用会使你极大的简化代码量。(和回调代码方式说再见吧)。
若想仔细了解函数式编程,请访问这里(the Functional Programming in JavaScript RefCard )。
如果你熟悉了函数式编程,请把RxJS理解为异步化的Underscore.js。
RxJS 引入了一个重要的数据类型——流(stream)。
理解流( Streams)
流(Streams)无非是随时间流逝的一系列事件。流(Streams)可以用来处理任何类型的事件,如:鼠标点击,键盘按下,网络位数据,等等。你可以把流作为变量,它有能力从数据角度对发生的改变做出反应。
变量和流都是动态的,但表现有些不同;为了理解它,让我们看一个简单的例子。考虑以下简单的算术运算:
1 |
var a = 2; |
2 |
var b = 4; |
3 |
var c = a + b; |
4 |
console.log(c); //-> 6 |
5 |
6 |
a = 10; // reassign a |
7 |
console.log(c); //-> still 6 |
尽管变量a变为了10,但这是一种通过设计方式保证所依赖的变量不变。这就是最大的不同。事件引发的改变总是从事件源(生产者)
向下传递到所有事件监听方(消费者)。假如说把变量看成流,下面就会这样:
1 |
var A$ = 2; |
2 |
var B$ = 4; |
3 |
var C$ = A$ + B$; |
4 |
console.log(C$); //-> 6 |
5 |
6 |
A$ = 10; |
7 |
console.log(C$); //-> 16 |
这样你看到了,流的方式重新定义的变量值的动态行为。(作为习惯,我喜欢使用$符号在流变量命名里)
换句话说,C$是把两个流变量A$和B$进行合并操作。当一个新值被推进了A$,C$立刻响应式的变更为16。当前这只是一个
牵强的例子,距离真正的语法还很远。这个例子解释了变量在事件流中的变化过程。
现在让我们开始学习RxJS。
可观察数据类型
或许RxJS库最重要的部分是可观察数据类型的定义。这种类型被用于包装一个数据片段(按钮事件,键盘事件,鼠标事件,数字,字符串或者队列),这样它就有了流式数据类型的优点。最简单的观察对象是这种单变量形式,例如:
1 |
var streamA$ = Rx.Observable.of(2); |
我们重新使用上面的例子,这次是真正的RxJS语法。这回使用了新的API,我要详细的讲一下:
1 |
const streamA$ = Rx.Observable.of(2); |
2 |
const streamB$ = Rx.Observable.of(4); |
3 |
const streamC$ = Rx.Observable.concat(streamA$, streamB$) |
4 |
.reduce((x, y) => x + y); |
5 |
6 |
streamC$.subscribe(console.log); //prints 6 |
运行这个例子输出值为6。不像之前的伪代码,在变量被定义后实际上不能对流对象重新赋值。如何必须那样做的话就要重新创建一个新的流变量,因为流变量是不可变的数据类型。既然是不可变的,通常我们可以安全的使用ES6规范里的不可变关键字const使代码更清晰明确。
为了给streamA$推一系列新值,你必须改变streamA$定义的方式:
1 |
const streamA$ = Rx.Observable.of(2, 10) |
2 |
... |
3 |
streamC$.subscribe(console.log); //prints 16 |
现在订阅streamC$将会得到值16。就像我之前提到的,流只是一个在时间轴上的事件传输序列。以下是可视化图例。
创建可观察序列对象
很多不同的方法都可以创建可观察序列对象。这里是一些普通使用的例子:
方法 | 说明 |
---|---|
of(arg) | 把参数转换成可观察序列对象 |
from(iterable) | 把可迭代的队列参数转换成可观察序列对象 |
fromPromise(promise) | 把promise对象参数转换成可观察序列对象 |
fromEvent(element, eventName) | 通过增加一个事件监听器用于监听匹配的Dom元素,jQuery元素,Zepto元素,Angular元素,Ember.js元素或者EventEmitter等,
来创建可观察序列对象 |
流式编程的另一个不同点是触发机制。可观察序列类型对象是后触发的(lazy data types),就是说当有订阅者订阅的时候什么也不执行(这种方式不会有事件发出来)。它的订阅机制是被观察者(Observer)触发的。
观察者(Observer)
观察者代表模型的消费者一端。它负责被可观察序列对象发送过来的值进行处理和反馈。观察者的API简单,基于迭代者模式它定义了
next方法。当事件执行结果推向到可观察对象的时候,这个方法就会被调用。之前streamC$.subscribe(console.log) 这种简写的方式,其实就是背后创建了观察者对象(Observer)。创建的过程如下:
01 |
const observer = Rx.Observer.create( |
02 |
function next(val) { |
03 |
console.log(val); |
04 |
}, |
05 |
06 |
function error(err) { |
07 |
; // 事件异常情况执行 |
08 |
}, |
09 |
10 |
function complete() { |
11 |
; // |
12 |
} |
13 |
14 |
); |
观察者也定义了处理异常的API,意味着会发出执行过程中的异常信号通知。所有的观察者对象里的方法都是可选择的,其实你只需要
订阅一下就可以(调用subscribe方法)。最普遍的方式是需要提供一下对应的业务方法映射到next方法里。这个方法里需要一个具体的业务逻辑,例如写文件,屏幕打印日志,追加到DOM里,不管怎么说需要你来完成的。
订阅
只要订阅一个Observable对象就会返回一个订阅对象,另外当完成后你可以使用unsubscribe方法释放掉对象流。这种释放机制真的很优美,它解决了原生JS在事件处理完成后正确释放资源的缺点。原生JS在这一块之前总是会出问题。
为了说明这一点,我创建一个Observable对象来监听所有的点击事件:
很明显这是一个无限触发的点击事件流(事件完成方法永远不会调用)。如果我要停止监听事件,我只需要简单的调用unsubscribe方法。这个方法也会清理和释放事件的句柄资源或者临时对象资源。
现在你知道了如何创建和销毁事件流,那么再看看怎么使用它来解决具体的问题吧。在我的模型里采用数字来说明这些API的使用,当然你可以把他们应用到任何的业务逻辑。
流式序列化
RxJS框架的核心思想是提供一个统一的数据处理的编程模型,不考虑数据类型,以及是否同步异步(远程HTTP调用)等因素。RxJS使用简单又熟悉的API。通过函数式方法扩展了原生JavaScript的数组集合(被认为是扩展的组数),这些方法是map,filter和reduce等。
方法 | 说明 |
---|---|
map(fn) | 重新构建observable序列对象为新的形式 |
filter(predicate) | 通过给定的predicate函数,过滤掉匹配的observable序列对象 |
reduce(accumulator, [seed]) | 通过收集器函数accumulator返回一个单一结果。seed是收集器的初始化数据。 |
使用从数字1到5的数组,我将要过滤掉奇数,计算它们的平方和总和。按照传统的方式至少会使用到循环和条件判断。使用函数式方法就会这样:
其实这是很小的改变,就是通过Observable对象实例方法来进行数据处理。就像数组一样,被处理的对象来自于前一个方法的返回结果。目的在当前输入对象的范畴内执行特定的操作方法(map,filter,reduce),处理所需的业务逻辑。
下面的图展示了这个场景下的执行过程:
数组是一种可预测的流,因为执行期间所用数据都放在内存中。无论数据是以怎样的形式,假如这些数据是通过HTTP请求方式获得后被执行(或者封装成Promise对象),同样的代码仍然有效:
当然,我可以创建独立的无副作用的函数,并注入到可观测的序列中。这可以让你的代码看起来更直观:
这是RxJS的美丽之处:单个编程模型可以支持所有的情况。此外,可观测能让你序列化你的串行操作,使它们在一起运作良好,并让抽象化延迟远离你的代码。还需要注意的是,这种写代码的方式消除了循环和条件语句的复杂性,这是一种更高层的函数。
处理时间
了解RxJS 如何有效处理时间以及通过基于时间的延迟可以做运算符。这里有一个简短的列表用于随着时间推移创建最常见的状态:
名称 | 描述 |
---|---|
Rx.Observable.interval(period) | 在每一个时期返回一个可观测序列产生值 |
Rx.Observable.timer(dueTime) | 在dueTime运行后产生一个值,然后在每一个时期返回一个可观测序列 |
这些都是很好的模拟时间事件:
每半秒钟间隔(500)将排放值。因为这是一个无限流,我只有前5个,将可观测到一个有限的流,发送完成一个信号。
处理用户输入
你也可以使用Observable对象与DOM事件进行交互。使用Rx.Observable.fromEvent,我能监听到任何DOM事件信息。这有一个小例子:
在这个事例中,我可以处理点击事件,并能够在观察者(Observer)里执行任何操作。这里的map方法是转换接收到的点击事件,从点击事件里提取底层元素的href属性。
处理异步调用
处理用户输入不只是异步调用的唯一方式。RxJS框架也优雅地整合了ES6 Promise API,这样就可以获取远程数据。假如要从Github上获取用户并且提取用户名。RxJS框架的强大,让我们处理这些逻辑只用5行代码就够了。
这段代码引入了一对新方法,这里要解释一下。首先我把Github的用户列表REST API包装成Observable对象。在flatMap方法里通过makeHttpCall函数来对URL地址进行请求,去获得一个Promise化的AJAX返回对象。这时RxJS要等待返回结果以及Promise化处理。一旦完成,Github的响应数据就会在函数里被包装成数组存放进Observable对象里。接下来就是调用Observable对象的方法来处理数据。最后我构造了一个简单的函数来从数组集合里提取用户名属性。
Map与FlatMap对比
正如前面所说,Observable对象的map方法是对自身对象的数据值进行映射处理,返回是一个包含映射结果的新Observale对象。调用的方法能返回任意类型对象,甚至另外的Observable类型对象。上面的例子是我把带有URL信息的lambda表达式作为参数输入,返回结果是把promise类型对象又包装成Observalbe对象输出:
映射方法把Observalbe对象里数据结果进行映射产生一个新Observable对象。(在函数式编程里这种方式很普遍)我们要做的就是处理映射关系并把结果存放到一个Observalbe对象里,这有点像给洋葱剥皮。这正是flatMap方法的优点。上面的方法返回结果是Rx.Observable.fromPromise(...),所以需要处理promise对象。从一般经验来看,当你从其他类型对象中构建Observalbe对象时,需要使用flatMap方法。下面看一个例子就好理解了:
这段代码是以0.5秒间隔生成5个连续数字的界面。首先输出数字1到5,然后2到6,3到7等等。如下图所示:
释放可观察序列
前面提到了,RxJS框架的好处之一是对JavaScript事件进行了统一抽象,这样可以更好的释放和销毁事件。这就是基于观察者(Observer)里提供了这个功能去执行自己的清理方法。订阅Observable对象就会获得一个Subscription实例,通过这个实例就能得到观察者(Observer)对象实例。
这段代码创建一个简单的Observable对象。但这次不是对事件数据源或AJAX数据源进行包装,而是创建一个自定义的事件,会不间断的每隔一秒产生数字。因为是自定义事件所以需要通过订阅返回的函数调用来创建自己的资源释放程序。RxJS框架通过Subscription.unsubcribe()的调用执行去释放资源。这个例子中我的清理动作只是时间间隔函数。7秒之后,我释放了Observable对象所以引起输出数字暂停而不是无限制的打印数字。
合并流
你似乎认为这些Observalbe对象很重,实际上他们创建和释放是很容易的。就像变量一样,它们可以被合并在一起。下面看看如何进行合并操作。
合并多个流
merge方法负责执行合并操作,合成多个Observalble序列变成一个Observalble对象。这个操作方法就是把多个事件流合并成一个,在时间的维度里顺序一致。例如在一个HTML组件里有三个按钮,分别执行以下方式进行计数:上,下和清除。
另一种流的合并方式是可以通过concat()和concatAll()方法进行。
一个流与另一个合并
withLatestFrom 这个操作者是非常有用的,因为它允许您将一个observable序列合并到另一个正在使用选择器方法的序列中,除非原始的observable序列产生一个元素。为了展示这一点,假设我想在每一秒钟打印出GitHub的用户列表。直观上,我需要将一个基于时间的流与一个HTTP的流合并。
缓存
正如更早之前提到的,流是一种无状态的数据结构,这就意味着其状态从未滞留其中而是立即从生产者流向消费者。然而有时重要的是它能够临时存储一些数据,并且还可以再次基础上做出决定。想到的一个例子是关于跟踪双击一个元素。你是如何在不存储第一次点击的情况下监测到第二次点击?为此,我们可以使用缓存。有多种情况:
缓冲一段时间
你可以暂时保存一定数量的数据到内部数组,一旦满足计数阈值时,它将作为一个整体得到释放.
1 |
Rx.Observable.range(1, 9) .bufferCount(3) |
2 |
.subscribe(console.log); |
3 |
//-> prints [1, 2, 3] |
4 |
// [4, 5, 6] |
5 |
// [7, 8, 9] |
基于时间的缓冲数据
你也可以为一个预定义的时间缓冲. 我将创建一个简单的函数来模拟每秒钟从一组可用的邮箱地址发送邮件来显示. 如果我每秒钟发一封邮件, 和缓冲, 说, 五秒钟,一旦缓冲时间运行缓冲将发出一组邮件:
错误处理
上面的例子中,我们已经学习了在处理DOM事件或者获取远程数据时,几种关于 streams (流) 的异步操作方式。但是,这些例子里面并没有告诉我们,如果处理 stream 的过程中出现了错误或者异常,我们应该怎样处理。DOM事件handler里面比较简单,因为他们实际上不抛出错误。但是AJAX不在此列。如果你不是处理简单的数组,那么 stream 有很大的不可预测性,你必须考虑到会有错误或者异常发生。
对于错误,如果你传递了自己的 observer(观察器),你需要在内部使用try...catch 并调用 observer.onError(error)。这将允许你捕捉和处理相关错误,并最终 dispose。
当然,你也可以使用 .onErrorResumeNext。
捕捉异常(Catch )
所幸你可以像以前一样使用 catch 语句 (现在是一个操作符 operator) 。为了演示,我将在stream到达值5的时候手动创建一个错误。
当条件满足时,异常将被抛出并一路传播到 stream 事先注册的 Observer。你可能想优雅地捕获异常并显示一个比较友好的消息。
catch操作符允许你处理指定的错误并防止该错误传播到其他已注册的下游observer。这个操作符可以搭配另一个 Observable 将相应的错误传递出去,所以你可以通过这个方式在出错时建议某些默认值。注意:此时关联到那个 observer 的错误处理函数将不会被触发。
现在,如果我们想要标记一个不可恢复的情况,你可以 catch 且 throw 错误。在 catch 块内,这段代码将会导致异常解除。我注意到,其抛出异常时的其副作用将会波及其他期待处理的地方。在真正关键的部分这应该少用。
另一个选择是尝试重试。
重试
在可监控的范围内,你可以重试上一行操作确定的次数,之前的失败将会被解除。
最后,作为JavaScript 开发者,我们整天都跟事件或者异步计算打交道。我们在实现复杂UI或者复杂状态机时,为了保证在失败时能够保持响应,这些代码将变得越来越复杂。RxJS 真正实现了 Reactive
Manifesto 的两个最重要的基本原则,也就是Responsive (响应式) 和 Resilient(自适应性).
此外,RxJS 将这些计算指令提升为语言的第一等级特性,由此组成了JavaScript里面最先进的事件处理系统。这一切组成了一个统一的计算模型来处理这些异步计算指令。该模型包含了可读性高,易于组合的API,并剥离了一些零碎的细节(如延迟和等待时间等)。
更多参考资料
- http://rxmarbles.com/
- http://reactivex.io/
- http://callbackhell.com/
- http://xgrommx.github.io/rx-book/
- https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
- https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
- https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines