[RxJS] Combining streams in RxJS

Source: Link

We will look at some operators for combining streams in RxJS:

  • merge
  • combineLatest
  • withLatestFrom
  • concat
  • forkJoin
  • flatMap / switchMap

merge:

Observable.merge behaves like a "logical OR": the resulting stream emits whenever one source OR another emits, so a single stream can handle either interaction.

let btn1 = document.querySelector("#btn1");
let btn2 = document.querySelector("#btn2");

let btn1Click$ = Rx.Observable.fromEvent(btn1, "click");
let btn2Click$ = Rx.Observable.fromEvent(btn2, "click");

// Use do() for side effects; map() is meant for transforming values.
let btn1Log$ = btn1Click$.do( (ev) => {
  console.log("Button 1 clicked");
});
let btn2Log$ = btn2Click$.do( (ev) => {
  console.log("Button 2 clicked");
});
let clicks$ = Rx.Observable.merge(btn1Log$, btn2Log$);

clicks$.subscribe();
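
To see the "logical OR" behavior without a browser, here is a minimal sketch in plain JavaScript. The createSubject and merge helpers are hypothetical stand-ins written for this illustration, not the RxJS API; they only model the next path and ignore errors, completion, and unsubscription.

```javascript
// Hypothetical stand-in for a hot stream: NOT the RxJS Subject.
function createSubject() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach((fn) => fn(value)); }
  };
}

// Hypothetical merge: forwards every value from every source, in arrival order.
function merge(...sources) {
  const out = createSubject();
  sources.forEach((source) => source.subscribe((value) => out.next(value)));
  return out;
}

const btn1Click$ = createSubject();
const btn2Click$ = createSubject();

const mergedLog = [];
merge(btn1Click$, btn2Click$).subscribe((value) => mergedLog.push(value));

// Simulate clicks on either button.
btn1Click$.next("Button 1 clicked");
btn2Click$.next("Button 2 clicked");
btn1Click$.next("Button 1 clicked");

console.log(mergedLog);
```

The merged stream sees all three simulated clicks, in the order they happened, regardless of which button produced them.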

combineLatest:

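Often used to produce a side effect whenever the value of one of the streams changes. It emits the latest value from every stream each time any of them emits, but only after each stream has emitted at least once: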

var source1 = Rx.Observable.interval(1000)
  .map(function (i) { return 'First: ' + i; });

var source2 = Rx.Observable.interval(2000)
  .map(function (i) { return 'Second: ' + i; });

// Combine latest of source1 and source2 whenever either gives a value
var source = Rx.Observable.combineLatest(
    source1,
    source2
  ).take(4);

var subscription = source.subscribe(
  function (x) {
    console.log(x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

/*
["First: 0", "Second: 0"]
["First: 1", "Second: 0"]
["First: 2", "Second: 0"]
["First: 2", "Second: 1"]
"Completed"
*/

withLatestFrom: 

var source1 = Rx.Observable.interval(1000)
  .map(function (i) { return i; });

var btn = document.querySelector("#btn");
var source2 = Rx.Observable.fromEvent(btn, "click");

var source = source1
  .withLatestFrom(
    source2,
    (timer, click) => ({timer: timer, clicks: click.x})
  ).take(4);

var subscription = source.subscribe(
  function (x) {
    console.log(x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });

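Unlike combineLatest, withLatestFrom emits only when the source stream (here source1) emits, pairing that value with the latest value from the other stream. It emits nothing until the other stream (here the button click) has produced at least one value. Read more about the difference between combineLatest and withLatestFrom: Link.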
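
The difference between the two operators can be sketched in plain JavaScript. The createSubject, combineLatest, and withLatestFrom helpers below are hypothetical two-source stand-ins written for this illustration, not the RxJS implementations; they ignore errors, completion, and unsubscription.

```javascript
// Hypothetical stand-in for a hot stream: NOT the RxJS Subject.
function createSubject() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach((fn) => fn(value)); }
  };
}

// Hypothetical combineLatest for two sources: emits when EITHER source emits,
// once both have produced at least one value.
function combineLatest(a, b) {
  const out = createSubject();
  let latestA, latestB, hasA = false, hasB = false;
  const emit = () => { if (hasA && hasB) out.next([latestA, latestB]); };
  a.subscribe((value) => { latestA = value; hasA = true; emit(); });
  b.subscribe((value) => { latestB = value; hasB = true; emit(); });
  return out;
}

// Hypothetical withLatestFrom: emits only when the PRIMARY source emits,
// and only after the other source has produced at least one value.
function withLatestFrom(primary, other) {
  const out = createSubject();
  let latestOther, hasOther = false;
  other.subscribe((value) => { latestOther = value; hasOther = true; });
  primary.subscribe((value) => { if (hasOther) out.next([value, latestOther]); });
  return out;
}

const timer$ = createSubject();
const click$ = createSubject();

const combined = [];
const sampled = [];
combineLatest(timer$, click$).subscribe((pair) => combined.push(pair));
withLatestFrom(timer$, click$).subscribe((pair) => sampled.push(pair));

timer$.next(0);    // no click yet: neither operator emits
click$.next("a");  // combineLatest emits [0, "a"]; withLatestFrom stays silent
timer$.next(1);    // both emit [1, "a"]
click$.next("b");  // only combineLatest emits [1, "b"]

console.log(combined); // [[0, "a"], [1, "a"], [1, "b"]]
console.log(sampled);  // [[1, "a"]]
```

In this sketch the click stream never triggers an emission from withLatestFrom; it only supplies the value that gets attached to the next timer tick.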

concat:

Concat will combine two observables into a combined sequence, but the second observable will not start emitting until the first one has completed.

let first = Rx.Observable.interval(1000).take(3).do( (i) => { console.log("First: ", i);});

let second = Rx.Observable.interval(500).take(3).do( (i) => { console.log("Second: ", i);});

first.concat(second).subscribe();

/*
"First: "
0
"First: "
1
"First: "
2
"Second: "
0
"Second: "
1
"Second: "
2
*/
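
The "second does not start until the first completes" rule can also be sketched without RxJS. The createSource and concat helpers here are hypothetical, hand-rolled for this illustration; they model only next and complete, and each source supports a single observer.

```javascript
// Hypothetical controllable source: records when it is subscribed to.
// NOT the RxJS Observable; it supports one observer and no unsubscription.
function createSource(name, log) {
  let observer = null;
  return {
    subscribe(obs) { log.push(name + " subscribed"); observer = obs; },
    next(value) { if (observer) observer.next(value); },
    complete() { if (observer) observer.complete(); }
  };
}

// Hypothetical concat: subscribes to `second` only when `first` completes.
function concat(first, second) {
  return {
    subscribe(obs) {
      first.subscribe({
        next: (value) => obs.next(value),
        complete: () => second.subscribe(obs)
      });
    }
  };
}

const log = [];
const first = createSource("first", log);
const second = createSource("second", log);

concat(first, second).subscribe({
  next: (value) => log.push(value),
  complete: () => log.push("Completed")
});

second.next("Second: too early"); // dropped: concat has not subscribed to second yet
first.next("First: 0");
first.next("First: 1");
first.complete();                 // concat now subscribes to second
second.next("Second: 0");
second.complete();

console.log(log);
```

The log shows "second subscribed" only after the last value from the first source, which is why the value pushed to the second source too early never reaches the subscriber.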

forkJoin:

We use forkJoin to run observables in parallel. One common use case is making multiple HTTP requests in parallel. This sample forkJoins two very simple observables, but the key point is that the subscriber won't receive any values until both observables have completed; it then receives a single array holding the last value from each.

let first = Rx.Observable.interval(1000).take(6);

let second = Rx.Observable.interval(500).take(3);

Rx.Observable.forkJoin(first, second).subscribe(
  (res) => {
    console.log(res); // [5, 2]
  },
  (err) => {
    console.log(err);
  },
  () => {
    console.log("Completed");  // Completed
  }
);

flatMap / switchMap:

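flatMap and switchMap are basically the same.

The difference is that switchMap only cares about the latest inner Observable: when a new source value arrives, it unsubscribes from the previous inner Observable and ignores its remaining values. That makes it a good fit for HTTP requests.

The reason to use flatMap is that inside an Observable you might return another Observable, such as: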

var btn = document.querySelector("#btn");

var click$ = Rx.Observable.fromEvent(btn, "click");

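var xhrCall$ = click$.flatMap( () => {
    // Create the request inside the callback so that every click triggers a new one.
    // Assumes jQuery is loaded; jQuery.get returns a thenable. 'xxx' is a placeholder URL.
    return Rx.Observable.fromPromise( jQuery.get('xxx') );
});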

xhrCall$.subscribe( (res) => {
  console.log(res);
})

Returning another Observable inside an Observable creates a 2-d Observable (an Observable of Observables), just as returning an array inside map creates a 2-d array.

So we need to flatten it.
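
The array side of this analogy can be run directly in plain JavaScript. This is only an illustration of the flattening idea using the built-in Array methods, with made-up sample values; it does not involve RxJS.

```javascript
// Mapping each element to an array produces a nested (2-d) array.
const nested = [1, 2, 3].map((x) => [x, x * 10]);

// Array.prototype.flatMap maps and then flattens one level in a single step.
const flat = [1, 2, 3].flatMap((x) => [x, x * 10]);

console.log(nested); // [[1, 10], [2, 20], [3, 30]]
console.log(flat);   // [1, 10, 2, 20, 3, 30]
```

The flatMap operator on an Observable plays the same role: the callback returns an inner Observable, and the operator flattens the inner values into the outer stream.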

Time: 2024-10-07 16:58:26