[RxJS] Implement RxJS `switchMap` by Canceling Inner Subscriptions as Values are Passed Through

switchMap is mergeMap that checks for an "inner" subscription. If the "inner" subscription exists, switchMap unsubscribes from that "inner" subscription which effectively "cancels" any pending pushes.

import { fromEvent, of, Subscriber } from "rxjs"
import {
  scan,
  delay,
  mergeMap,
  switchMap
} from "rxjs/operators"

class MySwitchMapSubscriber extends Subscriber {
  innerSubscription

  constructor(sub, fn) {
    super(sub)

    this.fn = fn
  }

  _next(value) {
    console.log(`outer`, value)
    const o$ = this.fn(value)

    if (this.innerSubscription) {
      this.innerSubscription.unsubscribe()
    }

    this.innerSubscription = o$.subscribe({
      next: value => {
        console.log(`  inner`, value)
        this.destination.next(value)
      }
    })
  }
}

const mySwitchMap = fn => source =>
  source.lift({
    call(sub, source) {
      source.subscribe(
        new MySwitchMapSubscriber(sub, fn)
      )
    }
  })

const observable$ = fromEvent(
  document,
  "click"
).pipe(
  scan(i => i + 1, 0),
  mySwitchMap(value => of(value).pipe(delay(500)))
)

const subscriber = {
  next: value => {
    console.log(value)
  },
  complete: () => {
    console.log("done")
  },
  error: value => {
    console.log(value)
  }
}

observable$.subscribe(subscriber)

原文地址:https://www.cnblogs.com/Answer1215/p/9715065.html

时间: 2024-10-17 19:42:20

[RxJS] Implement RxJS `switchMap` by Canceling Inner Subscriptions as Values are Passed Through的相关文章

[RxJS] Implement RxJS `mergeMap` through inner Observables to Subscribe and Pass Values Through

Understanding sources and subscribers makes it much easier to understand what's going on with mergeMap under the hood. Where a typical operator invokes destination.next directly, mergeMap wraps destination.next inside of a new source/subscriber combo

[RxJS] Implement pause and resume feature correctly through RxJS

Eventually you will feel the need for pausing the observation of an Observable and resuming it later. In this lesson we will learn about use cases where pausing is possible, and what to do when pausing is impossible. const resume$ = new Rx.Subject();

如何理解 RxJS?RxJS的中文API和使用教程

如何理解 RxJS? 我先附上RxJS的中文教程地址方便大家去了解和使用 中文使用教程:http://rxjs-china.org/_book/ 官方中文文档:https://buctwbzs.gitbooks.io/rxjs/content/ 好啦,我们开始讲一下如何理解它 RxJS 可能对很多人而言是一个从没听说过的新名词,那么 RxJS 到底是什么呢?本文中将予以简要介绍 在 Angular 2 中,我们遇到了一个新的概念 —— RxJS. 对很多人而言,这可能是一个比较难以理解的地方.所

[RxJS] What RxJS operators are

We have covered the basics of what is Observable.create, and other creation functions. Now lets finally dive into operators, which are the focus of this course. We will see how operators are simple pure functions attached to the Observable type. var

Angular2中的RxJS

RxJS库 RxJS(Reactive Extensions)是个Angular提供的第三方库,实现异步观察模式(asynchronous observable pattern). 启用RxJS操作 RxJS非常大,通常只要我们所需要的特性就好了.Angular在rxjs/Observable模块中提供了简版的Observable,但是它缺乏我们所需要的所有的操作,包括上面提到的map方法. 我们在应用启动时引入所有RxJS操作: import 'rxjs/Rx'; 首先我们观'rxjs/Obs

angular2 学习笔记 ( rxjs 流 )

RxJS 博大精深,看了好几篇文章都没有明白. 范围牵扯到了函数响应式开发去了... 我对函数式一知半解, 响应式更是第一次听到... 唉...不过日子还是得过...混着过先呗 我目前所理解的很浅, 大致上是这样的概念. 1.某些场景下它比promise好用, 它善于过滤掉不关心的东西. 2.它是观察者模式 + 迭代器模式组成的 3.跟时间,事件, 变量有密切关系 4.世界上有一种东西叫 "流" stream, 一个流能表示了一段时间里,一样东西发生的变化. 比如有一个值, 它在某段时

rxjs全局命名空间

通常我们学习某个js库的功能时,我们会直接在html中用script引入该js库来写一些demo.笔者在学习rxjs时,用script标签引入时,就掉到了坑里. 从6.0.0-alpha.4版本起,(目前版本为6.3.0)全局命名空间为rxjs,使用方法如下: <script src="https://cdn.bootcss.com/rxjs/6.0.0-alpha.4/rxjs.umd.js"></script> <script> /* * htt

rxjs经典场景之http请求

在实际开发中,相信小伙伴们都碰到过接口的串行调用以及并行调用,而并行调用又有两种,一种是只需最先返回的结果即可,这种比较少见,有时候接口变迁,为了平缓过渡,方便容错才会出现:另一种则是需要所有请求的接口数据才能进行下一步,这种比较常见,如账号可用余额是否可以购买商品等. 普通的Ajax请求 在前端远古时代,碰到以上情况,串行通过嵌套调用来处理,并行通过判断是否均完成请求的方式来处理.但是这样子处理实际上并不能说符合预期.首先,嵌套调用的方式,如果里面再涉及到其他回调的话,满满的回调地狱即视感.其

使用 RxJS 实现 JavaScript 的 Reactive 编程

简介 作为有经验的JavaScript开发者,我们会在代码中采用一定程度的异步代码.我们不断地处理用户的输入请求,也从远程获取数据,或者同时运行耗时的计算任务,所有这些都不能让浏览器崩溃.可以说,这些都不是琐碎的任务,它是确切的需求,我们学着去避开同步计算,让模型的时间和延时成为问题的关键.对于简单的应用程序,直接使用JavaScript的主事件系统,甚至使用jQuery库帮助也很常见.然而,还没有适当的模式来扩展的简单代码,解决这些异步问题,满足更丰富的应用特性,满足现代web用户的需求,这些