RxJS入门(9)----调度(Bending Time with Schedulers)

如题,感觉题目没有翻译好,见英文知其义。

  • 我一知道RxJS,我们开始把它用到我的项目中了。在一段时间后,我想,我知道能如何有效的使用它了,但是这里有一个令人心烦的问题:我如何知道使用的操作符是异步的还是同步的?换句话说,什么时候利用操作符准确的发送通知?这看起来是正确使用RxJs的机器重要的部分,但是这让我感觉很模糊。
  • interval很明显是异步的,所以它必须在像setTimeout内部使用来发射值。但是如果使用range了?它也发射异步的?它是否阻止事件循环?from了?我曾经处处使用这些操作符,但我并不知道它们内部的并发模型。
  • 之后我就学习了Schedulers。
  • 在你的程序中Schedulers是一个强有力的机制来恰到好处地来处理并发性。通过允许你改变它们的并发模型,你可以很细腻的掌握一个Observable是如何发射通知的。在本章中,你将学习到怎样使用Schedulers和应用到普通的场景中。我们将会聚焦测试,这里Schedulers将会尤其有用,并且你将学会你自己的Schedulers。

Using Schedulers

  • 使用Scheduler是一种发生再将来的时间表的动作。Rxjs中的每个操作符内部可选择性地使用一个Scheduler,在最有可能的场景下提供最佳表现。
  • 看看如何在操作符里改变Scheduler,和这样做的结果。首先我们创建一个由1000个整数的数组:
var arr = [];
for (var i=0; i<1000; i++) {
arr.push(i);
}
  • 之后,我们根据arr创建一个Observable,并且通过订阅强迫它发射所有的通知。在下面的代码中,我们测试发射所有通知所耗费的时间:
var timeStart = Date.now();
Rx.Observable.from(arr).subscribe(
function onNext() {},
function onError() {},
function onCompleted() {
console.log(‘Total time: ‘ + (Date.now() - timeStart) + ‘ms‘);
});

》”Total time: 6ms”

  • 6毫秒–不错!from内部使用 Rx.Scheduler.currentThread,它调度任务指向在当前任务执行完成之后。一旦它开始了,它同步的处理所有的通知。
  • 现在让我们 Scheduler改为Rx.Scheduler.default:
var timeStart = Date.now();
Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe(
function onNext() {},
function onError() {},
function onCompleted() {
console.log(‘Total time: ‘ + (Date.now() - timeStart) + ‘ms‘);
});

》”Total time: 5337ms”

  • Wow,我们的代码运行比currentThread Scheduler慢了几千倍。这是由于default Scheduler异步的运行每个通知。我们可以通过在订阅之后添加一个简单的log输出来证明。
  • 使用currentThread Scheduler:
Rx.Observable.from(arr).subscribe( ... );
console.log(‘Hi there!’);

“Total time: 8ms”

“Hi there!”

  • 使用 default Scheduler:
Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... );
console.log(‘Hi there!’);

“Hi there!”

“Total time: 5423ms”

  • 因为observer使用 default Scheduler异步地发射它,我们的console.log statement(它是同步的)在Observable发射任何通知前执行。使用currentThread Scheduler所有的通知都是同步的,所以console.log statement仅当Observable发射了所有的通知后才执行。
  • 因此,Schedulers真正可以改变我们Observable工作。在这种情况下,性能受到了了异步处理的一个大的可用的数组的影响。但是我们可以使用Schedulers提高性能。例如,我们在做一些代价大的Observable的操作前,我们可以切换Scheduler来执行。
arr
.groupBy(function(value) {
return value % 2 === 0;
})
.map(function(value) {
? return value.observeOn(Rx.Scheduler.default);
})
.map(function(groupedObservable) {
return expensiveOperation(groupedObservable);
});
  • 在上面的代码中,我们把数组中的值归为两大类:even和uneven的值。groupBy返回一个Observable,它发射每个group创建的Observable。同时,这里是最酷的那部分:在运行每个分组Observable的items代价大的操作前,我们使用observeOn把Scheduler变为default。这样,那些代价大的操作将会是异步地,不会阻塞事件的循环。
  • observeOn and subscribeOn
  • 在先前的部分,我们在一些Observable中使用observeOn操作符来改变Scheduler,observeOn和subscribeOn是操作符实例,它返回Observable实例副本,但是使用我们传递的Scheduler作为一个参数。
  • observeOn需要一个Scheduler并返回一个新的Observable,它使用这个Scheduler。在新的Scheduler中它会导致每个onNext调用的执行。
  • subscribeOn强制一个Observable运作特定的Scheduler来订阅和取消订阅的任务(不是通知)。像observeOn一样,它接收一个Scheduler作为参数。subscribeOn很有用,例如当我们正在运行浏览器,并在subscribe调用中执行一些很重要的任务,但是我们不想终止UI线程。
  • Basic Rx Schedulers
  • 让我额你更深入地看下我们刚刚使用的Schedulers。在RxJS中使用最多的就是immediate,default和currentThread。
  • Immediate Scheduler
  • immediate Scheduler 从Observable中同步的发送通知,因此,无论何时在 immediate Scheduler中一个action被调度,它将会立即执行,阻塞这个线程。Rx.Observable.range是内部使用 immediate Scheduler的操作符中的一个:
console.log(‘Before subscription‘);
Rx.Observable.range(1, 5)
.do(function(a) {
console.log(‘Processing value‘, a);
})
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log(‘Emitted‘, value); });
console.log(‘After subscription‘);

Before subscription

Processing value 1

Emitted 1

Processing value 2

Emitted 4

Processing value 3

Emitted 9

Processing value 4

Emitted 16

Processing value 5

Emitted 25

After subscription

  • 这个程序的输出如我们期望的一样。每个console.log statement运行在当前item的通知之前。
  • When to Use It
  • immediate Scheduler适用于那些那些Observable,它们的每个通知都是可预测的且操作地代价不大。同时,Observable最终必须调用onCompleted。
  • Default Scheduler
  • default Scheduler异步地执行actions。你可以粗鲁的认为是一个保持序列队形的零毫秒延迟的setTimeout,它运行在最有效的异步实现的平台上(例如,Node.js的 process.nextTick或者是浏览器里的setTimeout)。
  • 让我们看看之前例子中的range,并让他使用default Scheduler跑起来,我们将使用observeOn操作符:
console.log(‘Before subscription‘);
Rx.Observable.range(1, 5)
.do(function(value) {
console.log(‘Processing value‘, value);
})
.observeOn(Rx.Scheduler.default)
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log(‘Emitted‘, value); });
console.log(‘After subscription‘);

Before subscription

Processing value 1

Processing value 2

Processing value 3

Processing value 4

Processing value 5

After subscription

Emitted 1

Emitted 4

Emitted 9

Emitted 16

Emitted 25

  • 在输出中有很大的不同。我们的同步console.log

    statement 对每个值立即运行,但是使Observable运行在default Scheduler,它每个值的产生都是异步的。这意味着我们的log statements在do操作符里面是在平方之前处理完的。

  • When to Use It
  • default Scheduler从不阻塞事件的循环,因此,涉及到像异步的请求包含时间,这是很理想的处理方式。它可以被用在Observable用户不会完成,是因为在程序等待新通知(可能永远不会发生)的时间段内不会被阻塞。
  • Current Thread Scheduler
  • currentThread Scheduler和immediate Scheduler一样是同步的。但是,在我们的递归操作的情景中,它排队执行而不是立即执行。一个递归的操作符是一个它自己调度其他操作符的操作符。一个很好的例子就是repeat。repeat操作符,如果不给参数,一直重复之前的不定长的链中的Observable序列。
  • 你将会遇到麻烦,如果你调用repeat在一个另外一个操作符(它使用immediate Scheduler,如return)。让我们使用这个重复10,之后使用take来取重复中的第一个值,如下,代码将会打印10一次并退出:
// Be careful: the code below will freeze your environment!
Rx.Observable.return(10).repeat().take(1)
.subscribe(function(value) {
console.log(value);
});

》Error: Too much recursion

  • 砂锅面的代码导致了死循环。在订阅之上,return调用onNext(10)之后onCompleted,这导致repeat又重新订阅到return。由于return是运行immediate Scheduler,这个程序重复自己,引起了死循环并永远到不了take。
  • 但是如果我们使用 currentThread Scheduler作为第二个参数到return上,如下:
var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1)
.subscribe(function(value) {
console.log(value);
});

》10

  • 现在,当重复子订阅到return,那个新的onNext调用将会排队,是因为之前的onCompleted还在发生。repeat就返回了一个一次性的对象给take,它调用onCompleted并通过销毁repeat取消重复。最后从subscribe的调用returns。
  • 最为一个大概的规则,currentThread应该被使用在iterate大序列上,当使用例如repeat的递归操作符时。
  • When to Use It
  • currentThread Scheduler在像repeat这样涉及递归操作的操作符是很有用的,一般包含嵌套操作符的迭代。

Scheduling for Animations

  • 对于快速的例如canvas或DOM动画的视觉更新,我们可以使用interval在一个很小的毫秒值或者是使用Scheduler,它使用一个类似setTimeout的内部函数来调度通知。
  • 但是两种方法都是不理想的。这两种方法中,在浏览器上我们会丢掉某些没有足够快处理的更新值。这些的发生是由于浏览器正在尝试渲染某个框架,它可能收到了渲染下一个的指令,所以它丢掉了当前的框架来保持速度。这导致了起伏的动画。在web上有好多这种情况。
  • 浏览器有一个本地化的方式来处理动画,他们提供了一个API,使用它动用requestAnimationFrame。requestAnimationFrame允许浏览器在最合适的动画时间队列优化性能,并帮助我们获取平滑的动画。
  • There’s a Scheduler for That
  • 这个RxDOM library有些补充的Schedulers,其中的一个便是requestAnimationFrame。
  • 对的,你猜到了。使用这个Scheduler我们可以改善我们的飞船游戏。在这个游戏里面,我们粗鲁的建立了一个40毫秒的刷新——每秒25次,通过interval Observable在这个速度上,之后使用combineLatest去更新所有的游戏场景在一个通过interval的给定速度上(因为它是一个更快的更新Observable)……但是谁知道使用这个技术浏览器丢弃的次数是多少!通过使用requestAnimationFrame我们可以获取更好的性能。
  • 让我们创建一个Observable,它使用 Rx.Scheduler.requestAnimationFrame作为它的Scheduler。注意到它与interval操作符工作的方式很相似:
function animationLoop(scheduler) {
return Rx.Observable.generate(
0,
function() { return true; }, // Keep generating forever
function(x) { return x + 1; }, // Increment internal value
function(x) { return x; }, // Value to return on each notification
Rx.Scheduler.requestAnimationFrame); // Schedule to requestAnimationFrame
}

现在,无论哪儿我们使用interval去动画的图片没秒25次,我们可以仅使用我们的animationLoop函数。所以我们Observable开始画星,它之前是:

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
return {
x: parseInt(Math.random() * canvas.width),
y: parseInt(Math.random() * canvas.height),
size: Math.random() * 3 + 1
};
})
.toArray()
.flatMap(function(arr) {
return Rx.Observable.interval(SPEED).map(function() {
return arr.map(function(star) {
if (star.y >= canvas.height) {
star.y = 0;
}
star.y += 3;
return star;
});
});
});

现在变成了:

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
return {
x: parseInt(Math.random() * canvas.width),
y: parseInt(Math.random() * canvas.height),
size: Math.random() * 3 + 1
};
})
.toArray()
.flatMap(function(arr) {
? return animationLoop().map(function() {
return arr.map(function(star) {
if (star.y >= canvas.height) {
star.y = 0;
}
star.y += 3;
return star;
});
});
});
  • 这给了我们更加平滑的动画,作为红利,这个代码就更加的清晰了!

Testing with Schedulers

  • 测试有可能使我们使用Schedulers最多的场景。这本书到目前为止我们一直在编码,没有对结果过多地考虑。但是在真正的软件项目中,我们需要编写测试以确保我们的代码向我们期望的那样运行。
  • 测试异步代码很困难。我们通常碰到一下问题:
  • 模拟异步事件很复杂且容易出错。测试的所有点就是为了避免bugs和错误,但是如果你测试本身有错误,他们将起不到帮助。
  • 如果我们想基于时间的准确的功能性测试,自动化测试变得真的很慢。例如,如果我们需要精确测试一个错误,它是获取远程文件4秒后引起的,每个测试都将花费如此多的时间来运行。如果我们一直运行测试,这将会影响我们的开发时间。
  • The TestScheduler
  • Rxjs提供了TestScheduler,一个为给测试提供帮助而设计的Scheduler。TestScheduler允许我们在合适模仿时间段并创建精确的测试,这里这些测试是100%可重复的。包含这些,它允许我们把相当数量的时间压缩到一瞬间来执行,与此同时维持测试的精确性。
  • TestScheduler是VirtualTimeScheduler的一个特例。VirtualTimeScheduler在虚拟的时间执行action而不真实的时间。Scheduled action在队列中并分配到虚拟时间的某个时刻。当它的时钟前进时,Scheduler按顺序执行action。由于是虚拟的时间,每件事都可以立即被执行,而不用等待指定的时间。看看如下代码:
var onNext = Rx.ReactiveTest.onNext;
QUnit.test("Test value order", function(assert) {
var scheduler = new Rx.TestScheduler();
var subject = scheduler.createColdObservable(
onNext(100, ‘first‘),
onNext(200, ‘second‘),
onNext(300, ‘third‘)
);
var result = ‘‘;
subject.subscribe(function(value) { result = value });
scheduler.advanceBy(100);
assert.equal(result, ‘first‘);
scheduler.advanceBy(100);
assert.equal(result, ‘second‘);
scheduler.advanceBy(100);
assert.equal(result, ‘third‘);
});
  • 在上面的代码中,我们测试了cold Observable按照正确顺序到达的某些值。为此,我们使用了TestScheduler中的createColdObservable的辅助方法来创建Observable,它在放回我们作为onNext参数传递的通知。在每个通知里面我们指定了通知将要发射的时间值。在这之后,我们订阅这个Observable,在createColdObservable中手工的让虚拟时间前进,检查它确定发射了那个期望的值。如果这个例子在正常的时间里运行,它将需要300ms,但是由于我们使用了TestScheduler来运行Observable,它将会立即执行,并遵循顺序。
  • Writing a Real-World Test

    -比起写一个真实世界的时间有关的任务的测试, 没有更好的办法来领悟如何使用虚拟时间来bend时间了。让我们回顾下地震例子中Buffering values的Observable:

quakes
.pluck(‘properties‘)
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
var fragment = document.createDocumentFragment();
rows.forEach(function(row) {
fragment.appendChild(row);
});
return fragment;
})
.subscribe(function(fragment) {
table.appendChild(fragment);
});
  • 为了使这段代码更加可测试,让我们用需要一个Scheduler参数的函数来封装这个Observable,在其中将使用bufferWithTime操作符。这是一个很好的方法来确定将要测试的Observables的Schedulers的参数。
function quakeBatches(scheduler) {
return quakes.pluck(‘properties‘)
.bufferWithTime(500, null, scheduler || null)
.filter(function(rows) {
return rows.length > 0;
});
}
  • 让我保留本质但是简单的按步分解下下面的代码。这些代码需要一个json对象的Observable,它包含一个properties属性,缓存并每500毫秒的批次释放,并过滤过来的空批次。
  • 我们想验证代码的有效性,每次运行测试确保我们的缓存工作如期望的一样,但我们不想都等若干秒。这正是虚拟时间和TestScheduler将会帮助我们的地方:
? var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
? var scheduler = new Rx.TestScheduler();
? var quakes = scheduler.createHotObservable(
onNext(100, { properties: 1 }),
onNext(300, { properties: 2 }),
onNext(550, { properties: 3 }),
onNext(750, { properties: 4 }),
onNext(1000, { properties: 5 }),
onCompleted(1100)
);
? QUnit.test("Test quake buffering", function(assert) {
? var results = scheduler.startScheduler(function() {
return quakeBatches(scheduler)
}, {
created: 0,
subscribed: 0,
disposed: 1200
});
? var messages = results.messages;
console.log(results.scheduler === scheduler);
? assert.equal(
messages[0].toString(),
onNext(501, [1, 2]).toString()
);
assert.equal(
messages[1].toString(),
onNext(1001, [3, 4, 5]).toString()
);
assert.equal(
messages[2].toString(),
onCompleted(1100).toString()
);
});
  • 让我们按步骤分析下:
  • 1:我们从ReactiveTest中获取一些帮助函数开始。这是一些在虚拟时间里的注册事件:onNext,onCompleted,subscribe。
  • 2:我们创建了一个新的TestScheduler,它将驱动所有的测试。
  • 3:使用TestScheduler的createHotObservable方法,我们创建了一个假的Observable,它将会在虚拟时间的特定时刻模拟通知。尤其是,它将会在第一个秒内发射5个通知,1100毫秒完成。每次它发射一个特定properties属性的对象。
  • 4:我们使用任何的测试框架来测试。例如,我选择了QUnit。
  • 5:我们使用startScheduler方法创建一个Observable,它使用了一个 test Scheduler。第一个参数是一个函数,它创建了运行了我们SchedulerObservable。这种情况下,我们简单返回我们的quakeBatches函数,我们传递TestScheduler给它。第二个参数是一个包含了不同虚拟时间的对象,这是我们想创建Observable,订阅到它,并销毁它。对我们的例子来说,我们订阅从虚拟0时间开始并且我们销毁这个Observable在虚拟的1200毫秒之后。
  • 6:startScheduler返回一个scheduler和message属性的对象。在message中我们可以Observable在虚拟时间发射的所有通知。
  • 7:501毫秒(仅仅在第一个buffer time限制后)后第一个明确测试Observable产生了值1和2
  • 我们的第二个明确的测试在1001毫秒后,Observable蚕丝了持续的值3,4,5。最后,我们的第三个明确的测试检查了,当我们指定了热Observable quakes,1100毫秒后这个序列是完全完成了。
  • 上面的代码是一个可靠的方法,它高效的测试了我们的异步Observable,没有必须跳转到模仿异步条件的大圈子。我们简单地只当了时间,这是我们希望代码在虚拟时间里的响应,同时,我们使用一个 test Scheduler来运行所有的操作。
时间: 2024-10-09 00:56:44

RxJS入门(9)----调度(Bending Time with Schedulers)的相关文章

Java入门——定时调度

Java入门——定时调度 Timer类 Timer类是一种线程设施,可以用来实现在某一个时间或者某一段时间后安排某一个任务执行一次或者定时重复执行,该功能要与TimeTask配合使用. 每一个Timer对象对应的是一个线程,因此计时器所执行的任务应该迅速完成,否者可能会延迟后续任务的执行. 序号 方法 类型 描述 1 Timer() 构造 创建一个计时器,并且启动这个计时器 2 cancle() 普通 终止该计时器,并放弃所有已经安排的任务 3 purge() 普通 将已经取消的任务移除,一般用

RxJS入门2之Rxjs的安装

RxJS V6.0+ 安装 RxJS 的 import 路径有以下 5 种: 1.创建 Observable 的方法.types.schedulers 和一些工具方法 import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs'; 2.操作符 operators import { map

RxJS入门(3)----深入Sequence

在之前的系列中,我吧sequence翻译成了序列,题目我就不翻译了,感觉翻译过来的有点失去了点啥.其他好多地方都是用stream(流)来比喻和形容. 可视化Observable 在Rxjs编程中你已经学了一些关于使用最频繁的操作符了.讨论什么是sequence的操作符感觉有点抽象.为了帮助开发者更容易的理解操作符,我们使用marble diagrams(弹子图?翻译估计有问题)来标准可视化.他们很形象的代表了异步的数据流,并且你可以在其他的rxjs资料里面找到. 让我们看下range操作符,它返

RxJS入门(7)----创建一个完整的web application

在本章中,使用Rxjs,我们将创建一个典型的web application.我们将转化Dom Object Model(DOM)并且使用node.js中的websocket做c/s的交互. 用户界面部分,我们将使用RxJs-Domlibrary,这同样是Rxjs团队做的库,提供了方便的操作符来处理DOM和浏览器相关的使我们的编码更容易.服务器端:我们将是使用两个建立很好的node库,并用Observable封装他们的一些API到我们的application中. 在这一章之后,你将能使用RxJs创

RxJS入门(8)----创建一个完整的web application

上接(7) Getting Real-Time Updates from Twitter 我们计划的的第二部分是做一个实时的仪表给地震,添加从Twitter相关的地球上正在发生的不同地震报告和信息.为了实现这个,我们需要创建一个小的Node.js程序,它获取tweets相关的地震的流. Setting Up Our Node.js Environment 配置我们的Node.js程序.包括RxJS,我们将会使用两个比较重要的第三方modules使我们的编程会更容易:ws和twit.其他任何相似的

RxJS入门之函数响应式编程

一.函数式编程 1.声明式(Declarativ) 和声明式相对应的编程?式叫做命令式编程(ImperativeProgramming),命令式编程也是最常见的?种编程?式. //命令式编程: function double(arr) { const results = [] for (let i = 0; i < arr.length; i++){ results.push(arr[i] * 2) } return results } function addOne(arr){ const r

rxjs 入门--环境配置

原文: https://codingthesmartway.com/getting-started-with-rxjs-part-1-setting-up-the-development-environment-creating-observables/ ---------------------------------------------------------------- Getting Started With RxJS – Part 1: Setting Up The Develo

从零开始入门 K8s | 调度器的调度流程和算法介绍

导读:Kubernetes 作为当下最流行的容器自动化运维平台,以声明式实现了灵活的容器编排,本文以 v1.16 版本为基础详细介绍了 K8s 的基本调度框架.流程,以及主要的过滤器.Score 算法实现等,并介绍了两种方式用于实现自定义调度能力. 调度流程 调度流程概览 Kubernetes 作为当下最主流的容器自动化运维平台,作为 K8s 的容器编排的核心组件 kube-scheduler 将是我今天介绍的主角,如下介绍的版本都是以 release-1.16 为基础,下图是 kube-sch

RxJava(11-线程调度Scheduler)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51821940 本文出自:[openXu的博客] 目录: 使用示例 subscribeOn原理 多次subscribeOn的情况 observeOn原理 调度器的种类 各种操作符的默认调度器 源码下载 ??RxJava中 使用observeOn和subscribeOn操作符,你可以让Observable在一个特定的调度器上执行,observeOn指示一个Observable在一个特定的调度器