欢迎指错与讨论 : )
当前RxJS版本:5.0.0-beta.10。更详细的内容尽在RxJS官网http://reactivex.io/rxjs/manual/overview.html。文章比较长,可以通过快捷键 command+f 或者 alt+f 搜索主要内容。
- 前言
RxJS在ng2、react( mobx ) 中扮演一个重要角色,因此笔者想学好RxJS,提前做好准备。本文95%非原创,而是笔者对RxJS官网基础篇的翻译,如有错漏请指出。本文主要内容:简介和六大概念(Observable、Observer、Subscription、Subject、Operators、Scheduler)。
- 安装
import {Observable} from ‘rxjs/Observable‘ 按需打包,减轻bundle.js大小
- 一些概念
- Observable( 可被观察的 ) : 是一个包含来自未来、可以被使用的值( value )或事件( event )的集合
- Observe( 观察者 ):是一个知道如何监听、处理来自Obervable的值的函数集合
- Subscription( 订阅 ):代表着Observable的执行动作,我们可以使用它来停止Obervable继续执行
- Operators( 操作 ):一系列可以操作集合的pure function,像是过滤( filter )、转换( map )等等
- Subject( ):相当于一个事件发射器,是唯一能够向多个Observer广播值( value )的唯一手段
- Schedulers( 调度 ):是一个中央调度员,帮助我们控制并发,协调计算( setTimeout、requestAnimationFrame等 )
- 基础
- 创建流: Observable.create、of、from、fromEvent( target,eventType )、fromPromise、bindCallback( 把callback写法转换为链式写法 )
let exists = Rx.Observable.bindCallback(fs.exists); exists(‘file.txt‘).subscribe( exist=>console.log(`exist? : ${exist} `) )
- 事件流: filter、delay( 延时 )、throttleTime( 时间间隔 )、debounceTime( 事件暂停x毫秒后 )、take( 执行x次后停止 )、takeUtil( 取消订阅 )
let inputStream = Rx.Observable.fromEvent(document.querySelector(‘input‘), ‘keyup‘) let stopStream = Rx.Observable.fromEvent(document.querySelector(‘button‘), ‘click‘) inputStream.throttleTime(200) // 每隔200毫秒才释放一次 .takeUtil(stopStream) // 当触发时,停止订阅 .subscribe(($event)=> {console.log(`${$event.target.value}`)}) // 事件对象
- 转换流、过滤流:map、distinct( 过滤重复 )、distince( 过滤连续重复 )
// 输入hello world input.pluck(‘target‘, ‘value‘).distinctUntilChanged() .subscribe(value => console.log(value)); // "helo wrd" input.pluck(‘target‘, ‘value‘).distinct() .subscribe(value => console.log(value)); // "helo world"
- 创建一个应用
- RxJS的工具函数都是pure、无状态的一类函数。但我们的应用中往往需要记录大量的state,那在Rx如何记录状态呢?类似于redux,rx中的scan函数能够使用reduce函数,将流的结果合并。我们需要做的仅仅是保存这个状态( state )值
// 基础模式 var button = document.querySelector(‘button‘); Rx.Observable.fromEvent(button, ‘click‘) // 内部的reduce函数能够积累(此处是相加,若是对象可以是Object.assign),并相应变化 .scan(count => count + 1, 0) // Set the count on an element each time it changes .subscribe(count => document.querySelector(‘#count‘).innerHTML = count);
- 一个更好的模式
var button = document.querySelector(‘button‘); Rx.Observable.fromEvent(button, ‘click‘) // 返回了一个, 类似redux中 经过switch(action.type)的reducer 与函数 .map(() => state => Object.assign({}, state, {count: state.count + 1})) // 直接把函数作用于初始化state,scan同时能保存这个更新后的state .scan((state, changeFn) => changeFn(state), {count: 0}) .subscribe(state => document.querySelector(‘#count‘).innerHTML = state.count);
- 对于某一个组件还可以
var state = Rx.Observable.merge( increase, decrease, input ).scan((state, changeFn) => changeFn(state), { count: 0, inputValue: ‘‘ });
- Observable( 被观察者 )
通常而言,Observable都会延迟产生值 ,比如当我们subscribe一个observable的时候它才会向我们发送这些值 let observable = Rx.Observable.range(1,3)
- pull( 拉取 ) 每个函数都是一个数据的生产者,每个调用函数的那个‘人‘,都会希望从这个函数中能够获得(pull) 唯一的返回值
- push( 推送 ) 在数据生产者中( 如函数 ),会在特定时候把数据推送至消费者,消费者在获得数据之前啥也不会做
- Observable与函数、promsise的对比:函数是当调用才同步计算,并最终只返回一个值的;promise是会或者不会返回一个值;Observable是当调用才同步或者异步地计算,并可能产生0到无穷多个值的。Observable就像一个没有参数的函数,并不断生成一些值供我们使用,因此它也像是一个事件发射机( EventEmitters )。在Observable中subscribe就像call一个函数,你订阅它,它才会被‘启动‘。同一个Observable对于不同的subscribe,是不会共享结果的( 通常情况下这样子的,但可以通过调用api来共享 )。
- Observable四大核心:创建 、订阅 、 执行 、销毁 。
订阅( subscribe )。当对一个Observable调用多个subscribe函数并创建多个observe时,observe之间不会共享任何东西,因为在Observable.create内部是对observe列表调用各自的回调的 Observable.create(function subscribe(observe){...})
执行( Executing )。Next函数能够将数据传递给Observer,同时在执行期间,能在Observable内部调用多个Next( )函数。同时建议在Observabl内部使用try/catch语法。
销毁Observe
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // Later: subscription.unsubscribe();
销毁Observable
var observable = Rx.Observable.create(function subscribe(observer) { var intervalID = setInterval(() => { ... }, 1000); return function unsubscribe() { clearInterval(intervalID); }; });
- Observer( 观察者 )
什么是观察者?观察者其实是数据的消费者,把来自Observble的数据拿过来使用。同时,Observer的本质是一系列的回调函数,是来自于Observable传递数据后的回调函数。我们可以直接通过subscribe函数创建观察者
observable.subscribe( x => console.log(‘Observer got a next value: ‘ + x), err => console.error(‘Observer got an error: ‘ + err), () => console.log(‘Observer got a complete notification‘) );
- Subscription( 订阅 )
什么是Subscription?它其实是代表着Observable的‘执行‘的对象,我们可以通过它的 unsubscribe 方法销毁Observable的执行。同时我们能使用 add 方法,一次销毁多个
var subscription = observable1.subscribe(x => console.log(‘first: ‘ + x)); var childSubscription = observable2.subscribe(x => console.log(‘second: ‘ + x)); subscription.add(childSubscription); setTimeout(() => { subscription.unsubscribe(); }, 1000);
- Subject( )
什么是Subject?它是在Rx中一种比较特殊的Observable( 同时它也是Observer ),它能够让值( value )同时向多个Observer传播( 广播 )。而一般的Observable都是‘ 单播 ‘形式,即:每一个订阅了同一个Observable的observer,实际上是拥有不同的、独立的Observable的执行( 原文:each subscribed Observer owns an independent execution of the Observable ),而Subject是多播的。
// Observable对比Subject let source = Rx.Observable.create((observer)=>{ observer.next(‘1‘); observer.next(‘2‘); }); source.subscribe((x)=>{console.log(x);}); source.subscribe((x)=>{console.log(x);}); // 输出 1 2 1 2 let subject = new Rx.Subject(); subject.subscribe((x)=>{console.log(`${x}`);}); subject.subscribe((x)=>{console.log(`${x}`);}); subject.next(1); subject.next(2); // 输出 1 1 2 2
// Subject可以在‘多播‘情景下对Observable进行优化 // 明显看到在subject下,Observable只执行了一次 var source = Rx.Observable.create((observer)=>{ console.log(`source was called`); observer.next(1);observer.next(2);observer.next(3); }); source.subscribe({next: (v) => console.log(‘observerA: ‘ + v)}); source.subscribe({next: (v) => console.log(‘observerB: ‘ + v)}); // 输出为 ‘source was called‘ observerA: 1 observerA: 2 // ‘source was called‘ observerB: 1 observerB: 2 var source = Rx.Observable.create((o)=>{ console.log(`source was called`); o.next(1);o.next(2); }); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 原理是`subject.subscribe({...})`:返回的multicasted是一个connectableObservable multicasted.subscribe({next: (v) => console.log(‘observerA: ‘ + v)}); multicasted.subscribe({next: (v) => console.log(‘observerB: ‘ + v)}); // 原理是 `source.subscribe(subject)`: multicasted.connect(); // 输出 ‘source was called’ observerA: 1 observerB: 1 // observerA: 2 observerB: 2
可以单个取消订阅状态 observer.subscribe( ) ,也可以直接销毁Subject subscriptionConnect.unsubscribe( )
- refCount( ) 该api能监听当前connectableObservable的‘连接状态‘,当有大于0个subscribe挂在它上面它会自动执行 connect ,无subscribtion的时候会自动执行 unsubscribe( )
var refCounted = source.multicast(subject).refCount();
- BehaviorSubject() 它能储存上一次从Observable发过来的值,每当有新的observer的时,会把改值立即发送给它
var subject = new Rx.BehaviorSubject(0)// 初始值
- ReplaySubject( 重现 ) 是BehaviorSubject的加强版,它能储存从Observable发送过来的一系列值,当有新的observer连接时,会把这些值发送给它。
var subject = new Rx.ReplaySubject(3); // 缓存空间为3
- AsyncSubject(异步)仅将从Observable发送过来的最后一个值发送给Observe
var subject = new Rx.AsyncSubject();
- Operators
什么是Operators?Operators是Rx中最有用的一系列函数,它们建立在Observable之上,并能优雅地以同步代码的写法,将不同的异步流代码链式组合到一起。同时,Operator都是pure的,因为这些函数不会直接修改传入进来的Observable,而是经过修饰之后返回一个新Observable。
在链式操作中,下一个operator会根据上一个operator修改后的Observable继续工作。
- 具体请参考API
http://reactivex.io/rxjs/manual/overview.html#creation-operators
- Scheduler
Scheduler有咩用?Scheduler能控制一个订阅的开始、数据的传递。它由三部分组成:
- scheduler是一种数据结构,它知道如何基于优先级或者其他标准对任务队列进行储存
- scheduler是一个执行环境,它指何时何地地执行任务
- scheduler有一个‘ 时钟 ‘的概念,它能让我们自己定义,当Observable向Observer时会处于什么环境下( context )
// observeOn( RX.Scheduler.async ) var observable = Rx.Observable.create(function (observer) { observer.next(1);observer.next(2); }) .observeOn(Rx.Scheduler.async); console.log(‘just before subscribe‘); observable.subscribe({next: x => console.log(‘got value ‘ + x)}); console.log(‘just after subscribe‘); // 输出 // just before subscribe just after subscribe // got value 1 got value 2
- Scheduler类型
- Scheduler.queue 基于当前的事件框架在一个队列上工作,当我们需要遍历操作时可以使用它
- Scheduler.asap
- Scheduler.async 在内部,Schedules会使用setInterval,当我们需要以时间为基线时就使用它