RxJS入门(3)----深入Sequence

在之前的系列中,我吧sequence翻译成了序列,题目我就不翻译了,感觉翻译过来的有点失去了点啥。其他好多地方都是用stream(流)来比喻和形容。

可视化Observable

在Rxjs编程中你已经学了一些关于使用最频繁的操作符了。讨论什么是sequence的操作符感觉有点抽象。为了帮助开发者更容易的理解操作符,我们使用marble diagrams(弹子图?翻译估计有问题)来标准可视化。他们很形象的代表了异步的数据流,并且你可以在其他的rxjs资料里面找到。

  • 让我们看下range操作符,它返回一个发射特定范围内整数的Observable:如下图所示:

    这个长箭头代表着这个Observable,x轴代表者时间,每一个圆圈代表着内部调用onNext()时Observable发射的值。在产生第三个值之后,range调用了上图中用垂直线表示的onCompleted函数。

  • 让我们看下包含若干Observable的例子。merge操作符操作两个不同的Observable,并且返回合并值的新的Observable。
  • interval操作符返回一个在规定的毫秒间隔时间内产生增加数的Observable。
  • 在下面的例子中,我们将会合并两个使用了interval的不同的Observable,这两个Observable都是在不同的时间间隔而产生值:
var a = Rx.Observable.interval(200).map(function(i) {
return ‘A‘ + i;
});
var b = Rx.Observable.interval(100).map(function(i) {
return ‘B‘ + i;
});
Rx.Observable.merge(a, b).subscribe(function(x) {
console.log(x);
});

结果如下:

》B0, A0, B1, B2, A1, B3, B4…

  • merge操作符的示意图如下:

    这里,沿着y轴虚线箭头指向了A和B序列中的每个元素变化之后的最终结果。C序列代表着作为结果的Observable,它包含了A和B序列合并后的元素。如果不同的序列的元素同时发射,合并序列的元素顺序是随机的。

基础的序列(sequence)操作符

  • 几乎在Rxjs转变Observable的数十个操作符里,也有在任何一门语言里使用最多的有收集-处理能力的如:map、filter、reduce。在JavaScrpit中,你可以在数组的实例中找到这些操作符(函数)。
  • Rxjs遵循着JavaScript的约定,你有机会可以找到几乎和数函数作一样的操作符,实际上,我们将会展示数组和Observable的真实使用,用以呈现这两者的API是有多么的相似。
  • map
  • map是用的最多的序列转化操作符。它需要一个Observable和一个函数,并且把函数应用于源Observable的每一个值。它返回一个新的转化后值的Observable。

    在上面两种情况,src都是不会改变的。

    这个代码,和下面的代码,使用logValue定义:

    var logValue = function(val){console.log(val)};

    他可能是我们传递给map去做一些异步计算改变那个值的函数。在某些状况下,map可能不会如期盼的那样工作,对于这些状况,更好的办法是使用flatMap操作符。

  • filter
  • filter需要一个Observable和一个函数,并且它会使用这个函数去测试Observable中的每一个元素。它将会返回一个序列,这个序列中所的元素都是那个函数返回true的值。

  • reduce
  • reduce(也做fold)需要一个Observable并且返回一个新的只包含一个单个item的Observable,这个单个的item是某个函数应用到每一元素的的结果。这个函数接受当前元素和这个函数上一次调用的结果。

    reduce是一个处理某个序列很强大的操作符。实际上它是一种被称为聚合操作符(aggregate operators)的一个完整子集的基础实现。

  • Aggregate Operators
  • 聚合操作符处理某个序列并且返回单个的值。例如:Rx.Observable.first需要一个Observable和一个可选的断言函数并且返回第一个满足断言函数的元素。
  • 计算某个序列的平均值也是一个聚合操作中。Rxjs提供了average操作的实例,但是有这个选着的缘故,我们使用reduce来实现下。每一个聚合操作都能被仅仅使用reduce来实现。
var avg = Rx.Observable.range(0, 5)
.reduce(function(prev, cur) {
return {
sum: prev.sum + cur,
count: prev.count + 1
};
}, { sum: 0, count: 0 })
.map(function(o) {
return o.sum / o.count;
});
var subscription = avg.subscribe(function(x) {
console.log(‘Average is: ‘, x);
});

》Average is: 2

  • 面代码的解释,我就不翻译了,直接看那就能看懂。设想下,如若现在我们要计算一个步行者的平均速度,用reduce很好实现,但是假设,在时间轴上,步行者永远走下去,那么像reduce一样的聚合操作符将永远不会调用它的观察者(obeserver)的onNext函数。
  • 很高兴的是,Rxjs团队已经想到这种情况了,并且给我们提供了一个scan操作符,它扮演着像reduce的角色,但是它会发送每一个中间的结果。代码如下:
var avg = Rx.Observable.interval(1000)
.scan(function (prev, cur) {
return {
sum: prev.sum + cur,
count: prev.count + 1
};
}, { sum: 0, count: 0 })
.map(function(o) {
return o.sum / o.count;
});
var subscription = avg.subscribe( function (x) {
console.log(x);
});

使用如上方式,我们就可以聚合某个消耗时间长或者没有时间限制的序列了。在上面的代码中,我们每秒产生了一个增量的整数,并且用scan取代了先前的reduce。我们就以每秒为间隔取得了到目前为止所有的平均值。

- flatMap

- 如果你又一个Observable它的结果是许多嵌套的Observable将则怎么做?大部分的时间,你想到的就是统一这些嵌套的Observable的元素到一个单个的序列中。这也是flatMap所要明确干的事。

- flatMap操作符需要一个Observable参数,这个参数的元素也是Observable,并且返回只有一个孩子Observable的平坦值的Observable。

我们可以看到到每一在A(A1,A2,A3)中的元素也是Observable序列,一旦我们对A应用flatMap转化功能,我们将获得一个Observable,它包含所A所有不同孩子的所有元素。

  • flatMap是一个强大的操作符,但是它比起我们目前学到的其他操作符都要难理解些,把它认为是这些Observable的concatAll()函数。
  • contcatAll是需要一个数组的数组函数,并且返回一个平坦的单一的数组,这个数组包含所有子数组的值,而不是这些子数组本身。我们可以使用reduce去建一个这样的函数:
function concatAll(source) {
return source.reduce(function(a, b) {
return a.concat(b);
});
}

我们可以这样使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]
  • flatMap做同样的事,但不是平坦的arrays而是observable……
  • Canceling Sequences
  • 在Rxjs中,我们可以取消正在运行的Observable。相比起回调函数或者promise(有部分promise实现支持取消)这些异步交互的方式,这就是优势。
  • 有隐式或者是明确的这两者主要的取消Observable的方式。

    1)明确的取消:dispose

    Observable本身并没有取消方法。当我们订阅到一个Observable的时候我么就是获得了一个Disposable对象,这个dispose对象代表这个特殊的订阅。这样我们就可以在这个对象中调用dispose方法了,之后这个特殊的订阅就会停止从Observable中接受通知。

    在接下来的例子中,我们订阅了两个counter的Observable,counter每秒发射一个自增的整数。两秒后我们取消第二个订阅(subscription),之后我们看到它的输出停止了,但是第一个订阅者的输出还是在进行的。

var counter = Rx.Observable.interval(1000);
var subscription1 = counter.subscribe(function(i) {
console.log(‘Subscription 1:‘, i);
});
var subscription2 = counter.subscribe(function(i) {
console.log(‘Subscription 2:‘, i);
});
setTimeout(function() {
console.log(‘Canceling subscription2!‘);
subscription2.dispose();
}, 2000);

结果将会如下:

》Subscription 1: 0

Subscription 2: 0

Subscription 1: 1

Subscription 2: 1

Canceling subscription2!

Subscription 1: 2

Subscription 1: 3

Subscription 1: 4

2)通过Operator的隐式取消

大部分的时候,操作符将会为你自动取消subscription。像range或者take,当序列结束或者满足某个操作的条件时,他们将会取消订阅。更高级的如withLastestFrom或者flatMapLastest等操作符,当它们动态创建Observable时,将会在需要订阅的时候内部自动地创建和销毁。

当我们使用Observable包装外部没有提供取消功能的API接口时,当取消Observable时,Observable将会停止发送通知,但是里面的API将不会被注销。例如你使用一个包装着promise的Observable,当取消时Observable会停止发射,但是里面的promise不会停止。

  • 如下代码,我们试图取消关联到某个包装了promise Observable的订阅,同时,我们也把promise按照传统的方式了进行一个操作。这个promise将会在5秒后被执行,但是取消订阅会立马被执行。
var p = new Promise(function(resolve, reject) {
window.setTimeout(resolve, 5000);
});
p.then(function() {
console.log(‘Potential side effect!‘);
});
var subscription = Rx.Observable.fromPromise(p).subscribe(function(msg) {
console.log(‘Observable resolved!‘);
});
subscription.dispose();

5秒之后我们将会看到:

》Potential side effect!

  • 如果我们取消到Observable的订阅,它将会很有效的停止重Observable接受通知。但是这个promise的then方法还会继续执行,这展示给我们:取消Observable并不会取消它内部的promise。
  • 因此,在Observable内部使用外部的API接口必须知道里面的细节是很重要的。你可以想象:你已经取消了一个序列,但是它内部的一些api任然在运行,并且给你的程序带来一些副作用,这些错误真的很难被捕获。

Handling Errors

  • 在回调函数中,我们之所以可以使用传统的try/catch机制,是因为它是同步的。由于它运行在任何异步代码之前,所以它将捕获不到任何错误。
  • 在回调函数中的解决方式是:传递这个错误作为回调函数的一个参数,这样虽然可以起到作用,但是会使代码变得相当脆弱。
  • 下面让我们看看Observable是如和捕获错误:
  • The onError Handler
  • 当我们说起observer时候,必须记住三个函数:onNext、onCompleted、onError。onError是Observable中有效处理错误的关键。
  • 为了展示它如何工作,如下将会有一个简单的函数,它需要json串的数组并且返回一个Observable,这个Observable发送用JSON.parse转化那些json串后的对象。
function getJSON(arr) {
return Rx.Observable.from(arr).map(function(str) {
var parsedJSON = JSON.parse(str);
return parsedJSON;
});
}
  • 通过getJSON我们将会传递三个json串,第二个串会包含一个语法错误,因此JSON.parse无法解析它。接着我们通过提供onNext和onError处理器来订阅那个结果:
getJSON([
‘{"1": 1, "2": 2}‘,
‘{"success: true}‘, // Invalid JSON string
‘{"enabled": true}‘
]).subscribe(
function(json) {
console.log(‘Parsed JSON: ‘, json);
},
function(err) {
console.log(err.message);
}
);

结果如下:

》Parsed JSON: { 1: 1, 2: 2 }

JSON.parse: unterminated string at line 1 column 8 of the JSON data

  • 针对数组的第一个结果 Observable发射了一个JSON转译对象,但是第二个会抛出一个异常,onError处理器将会捕获这个异常,并打印它。默认的行为是,无论什么时候,只要异常一发生,Observable将会停止发射,并且onCompleted将不会被调用。
  • 捕获异常
  • 到目前为止我们已经知道如何侦测一个异常,并且去做些什么。但是我们还没没能响应我们接着要做的事。Observable实例提供了一个catch操作符,它允许我们对一个Observable的错误进行响应之后而继续进行其他Observable。
  • catch操作符需要一个以异常为入参的Observable或者函数,它返回另外一个Observable。在我们的例子中,由于在原始的Observable中有错误,我们想Observable发射一个包含异常属性的JSON对象。
function getJSON(arr) {
return Rx.Observable.from(arr).map(function(str) {
var parsedJSON = JSON.parse(str);
return parsedJSON;
});
}
var caught = getJSON([‘{"1": 1, "2": 2}‘, ‘{"1: 1}‘]).catch(
Rx.Observable.return({
error: ‘There was an error parsing JSON‘
})
);
caught.subscribe(
function(json) {
console.log(‘Parsed JSON: ‘, json);
},
// Because we catch errors now, `onError` will not be executed
function(e) {
console.log(‘ERROR‘, e.message);
}
);
  • 上述代码中,我们创建了一个叫做caught的新的Observable,它使用catch操作符捕获初始Observable里的异常。如果有异常,它将继续这个序列通过使用仅仅发射一个异常属性item的Observable来描述这个错误,结果如下:

    》Parsed JSON: Object { 1: 1, 2: 2 }

    Parsed JSON: Object { error: “There was an error parsing JSON” }

  • 下面就是catch操作符的marble diagram(弹子图):

  • 注意到“X”在序列上的,它代表着一个异常(错误)。不同的Observable值的形状:三角形代表着这些值来自另外一个Observable,在这里,那个Observable是我们异常情况下返回的Observable。
  • catch对序列中的异常的交互来说非常有用,并且它的好多行为跟传统的try-catch块是很相似的。在好多情况下,忽略序列中某个项发生的异常并且让这个序列继续下去是很方便的。在这些情况下,我们可以使用retry操作符。
  • Retrying Sequences
  • 有些时候仅仅是发生错误而不需要我们去做些什么。例如,一个由于用户零星Internet链接或者远程服务器宕机而导致的远程数据请求的超时,在这种情况下,这将会是一个很好的办法,如果我们一直请求我们需要的数据直到成功为止。那个retry做了如下操作:
//This will try to retrieve the remote URL up to 5 times.
Rx.DOM.get(‘/products‘).retry(5)
.subscribe(
function(xhr) { console.log(xhr); },
function(err) { console.error(‘ERROR: ‘, err); }
);
  • 在上面的代码中,我们创建了一个函数,它返回了一个使用XMLHttpRequest向一个url重复获取内容的Observable。由于我们的链接有可能会不靠谱,我们在订阅之前增加了retry(5),确保在异常的情况下,在Observable挂起和报错之前会尝试5次。
  • 当我们使用retry的时候,有两件事很重要:
  • 1)如果我们不传任何参数,它将会无限次尝试,直到Observable结束并且没有异常。这对程序来说是非常危险的,一旦Observable一直报错的话。如果我们使用同步的多个Observable,它将会有同样无限循环的结果。
  • 2)retry将会重新retry整个的Observable,即使某些项没有异常。因为每次retry,它都会重新运行,当我们处理的某些项的时候,将会导致意外的结果,这一点也很重要。
时间: 2025-01-09 15:41:29

RxJS入门(3)----深入Sequence的相关文章

RxJS入门(7)----创建一个完整的web application

在本章中,使用Rxjs,我们将创建一个典型的web application.我们将转化Dom Object Model(DOM)并且使用node.js中的websocket做c/s的交互. 用户界面部分,我们将使用RxJs-Domlibrary,这同样是Rxjs团队做的库,提供了方便的操作符来处理DOM和浏览器相关的使我们的编码更容易.服务器端:我们将是使用两个建立很好的node库,并用Observable封装他们的一些API到我们的application中. 在这一章之后,你将能使用RxJs创

RxJS入门(9)----调度(Bending Time with Schedulers)

如题,感觉题目没有翻译好,见英文知其义. 我一知道RxJS,我们开始把它用到我的项目中了.在一段时间后,我想,我知道能如何有效的使用它了,但是这里有一个令人心烦的问题:我如何知道使用的操作符是异步的还是同步的?换句话说,什么时候利用操作符准确的发送通知?这看起来是正确使用RxJs的机器重要的部分,但是这让我感觉很模糊. interval很明显是异步的,所以它必须在像setTimeout内部使用来发射值.但是如果使用range了?它也发射异步的?它是否阻止事件循环?from了?我曾经处处使用这些操

RxJS入门(8)----创建一个完整的web application

上接(7) Getting Real-Time Updates from Twitter 我们计划的的第二部分是做一个实时的仪表给地震,添加从Twitter相关的地球上正在发生的不同地震报告和信息.为了实现这个,我们需要创建一个小的Node.js程序,它获取tweets相关的地震的流. Setting Up Our Node.js Environment 配置我们的Node.js程序.包括RxJS,我们将会使用两个比较重要的第三方modules使我们的编程会更容易:ws和twit.其他任何相似的

RxJS入门之函数响应式编程

一.函数式编程 1.声明式(Declarativ) 和声明式相对应的编程?式叫做命令式编程(ImperativeProgramming),命令式编程也是最常见的?种编程?式. //命令式编程: function double(arr) { const results = [] for (let i = 0; i < arr.length; i++){ results.push(arr[i] * 2) } return results } function addOne(arr){ const r

RxJS入门2之Rxjs的安装

RxJS V6.0+ 安装 RxJS 的 import 路径有以下 5 种: 1.创建 Observable 的方法.types.schedulers 和一些工具方法 import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs'; 2.操作符 operators import { map

rxjs 入门--环境配置

原文: https://codingthesmartway.com/getting-started-with-rxjs-part-1-setting-up-the-development-environment-creating-observables/ ---------------------------------------------------------------- Getting Started With RxJS – Part 1: Setting Up The Develo

构建流式应用—RxJS详解

讲之前先说一点哈,插入的图片,不能正常显示,图片一半被遮盖了,如果想看,鼠标右击然后图片地址,图片另存为去看.如果有知道怎么解决的,可以在下面给我评论,我会及时的修改. 好啦,开始: 目录 常规方式实现搜索功能 RxJS · 流 Stream RxJS 实现原理简析 观察者模式 迭代器模式 RxJS 的观察者 + 迭代器模式 RxJS 基础实现 Observable Observer RxJS · Operators Operators ·入门 一系列的 Operators 操作 使用 RxJS

5.Source

!!!1.Avro Source 监听AVRO端口来接受来自外部AVRO客户端的事件流. 利用Avro Source可以实现多级流动.扇出流.扇入流等效果. 另外也可以接受通过flume提供的Avro客户端发送的日志信息. 支持的属性: !channels – !type – 类型名称,"AVRO" !bind – 需要监听的主机名或IP !port – 要监听的端口 threads – 工作线程最大线程数 selector.type selector.* interceptors –

rxjs简单入门

rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据.状态.事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作 rxjs适用于异步场景,即前端交互中接口请求.浏览器事件以及自定义事件.通过使用rxjs带给我们前所未有的开发体验. 统一异步编程的规范,不管是Promise.ajax还是