[RxJS] Aggregating Streams With Reduce And Scan using RxJS

What is the RxJS equivalent of Array reduce? What if I want to emit my reduced or aggregated value at each event? This brief tutorial covers Observable operators reduce() and scan(), their differences and gotchas.

In ES5, the Array's reduce function works like this:

var ary = [0,1,2,3,4];

var res = ary.reduce(function(preVal, item){
  return preVal+item ;
}, 0);

console.log(res); //10

RxJS also has a reduce function, and it gives the same result:

var source = Rx.Observable.fromArray([0,1,2,3,4]);

source.reduce(function(preVal, item){
  return preVal+item ;
}, 0).subscribe(function(res){
  console.clear();
  console.log(res);
});

Let's make it asynchronous:

With the source below, we wait 2.5 seconds before the result 10 is logged. This shows that reduce() in RxJS does not emit anything until the observable has completed.

var source = Rx.Observable.interval(500).take(5);

source.reduce(function(preVal, item){
  return preVal+item ;
}, 0).subscribe(function(res){
  console.clear();
  console.log(res);
});

So if we just write:

var source = Rx.Observable.interval(500);

and never complete it, reduce() will never give us a result.
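The reason is easier to see in a stripped-down model. What follows is a minimal sketch in plain JavaScript, not the actual RxJS implementation: reduceStream and its push/complete callbacks are hypothetical names made up for illustration. The accumulated value is handed to the subscriber only inside complete, so a source that never completes never produces a result.

```javascript
// Hypothetical, simplified model of reduce(); this is NOT the RxJS implementation.
// It returns the two callbacks a source would call: push(value) and complete().
function reduceStream(accumulator, seed, onResult) {
  var acc = seed;
  return {
    push: function (item) {
      acc = accumulator(acc, item); // accumulate silently, emit nothing
    },
    complete: function () {
      onResult(acc); // the single result goes out only on completion
    }
  };
}

var results = [];
var sink = reduceStream(function (preVal, item) {
  return preVal + item;
}, 0, function (res) {
  results.push(res);
});

[0, 1, 2, 3, 4].forEach(function (item) {
  sink.push(item);
});
console.log(results.length); // 0, because the source has not completed yet

sink.complete();
console.log(results[0]); // 10
```

If complete() is never called, as with an unbounded interval, the results array simply stays empty.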

Use the scan() function instead of reduce() to get a result each time a value arrives:

var source = Rx.Observable.interval(500).take(5);

// RxJS 4 and 5 expect scan(accumulator, seed); some older releases took the seed first.
source.scan(function(preVal, item){
  return preVal+item ;
}, 0).subscribe(function(res){
  console.log(res);
});

/*
0
1
3
6
10
*/

When you run this, each tick logs the next accumulated value. The nice difference with scan is that it doesn't wait for completion, so it keeps giving you a result on every tick even if the observable never completes, for example interval(500) without take(5).
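To make the contrast with reduce concrete, here is a minimal sketch of the same idea in plain JavaScript. It is not the actual RxJS implementation: scanStream and its push/complete callbacks are hypothetical names made up for illustration. The only point is that the running total is emitted inside push, so completion is not required.

```javascript
// Hypothetical, simplified model of scan(); this is NOT the RxJS implementation.
function scanStream(accumulator, seed, onValue) {
  var acc = seed;
  return {
    push: function (item) {
      acc = accumulator(acc, item);
      onValue(acc); // emit the running total for every incoming item
    },
    complete: function () {
      // nothing left to emit: every intermediate value has already gone out
    }
  };
}

var totals = [];
var scanSink = scanStream(function (preVal, item) {
  return preVal + item;
}, 0, function (res) {
  totals.push(res);
});

[0, 1, 2, 3, 4].forEach(function (item) {
  scanSink.push(item);
});

// complete() was never called, yet all running totals are already there
console.log(totals.join(",")); // 0,1,3,6,10
```

The last value emitted by this model is the value a reduce over the same items would produce, which is why scan is often described as a reduce that shows its intermediate steps.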

Date: 2024-08-25 17:29:31
