[RxJS] Creating Observable From Scratch

Get a better understanding of the RxJS Observable by implementing one that‘s similar from the ground up.

class SafeObserver {
  constructor(destination) {
    this.destination = destination;
  }

  next(value) {
    const destination = this.destination;
    if (destination.next && !this.isUnsubscribed) {
      destination.next && destination.next(value);
    }
  }

  error(err) {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      if (destination.error) {
        destination.error(error);
      }
      this.unsubscribe();
    }
  }

  complete() {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      if (destination.complete) {
        destination.complete();
      }
      this.unsubscribe();
    }
  }

  unsubscribe() {
    this.isUnsubscribed = true;
    if (this._unsubscribe) {
      this._unsubscribe();
    }
  }
}

class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }

  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    safeObserver._unsubscribe = this._subscribe(safeObserver);
    return () => safeObserver.unsubscribe();
  }
}

const myObservable = new Observable((observer) => {
  let i = 0;
  const id = setInterval(() => {
    if (i < 10) {
      observer.next(i++);
    } else {
      observer.complete();
    }
  }, 100);

  return () => {
    console.log(‘unsubbed‘);
    clearInterval(id);
  };
});

const observer = {
  next(value) { console.log(‘next -> ‘ + value); },
  error(err) { },
  complete() { console.log(‘complete‘); }
};

const foo = myObservable.subscribe(observer);

foo.unsubscribe();
时间: 2024-12-17 23:08:22

[RxJS] Creating Observable From Scratch的相关文章

[rxjs] Creating An Observable with RxJS

Create an observable var Observable = Rx.Observable; var source = Observable.create(function(observe){ var person = { name: "Zhentian", message: "Hello World!" }; observe.onNext(person); observe.onCompleted(); }); var sub = source.subs

rxjs创建异步数据的Observable

interval和timer:定时产生数据 interval的参数是1000,在1秒的时刻吐出0,2s吐出1,3s吐出2,........ 这个数据流不会完结,因为interval不会主动调用下游的complete,要想停止这个数据的序列,必须要做退订的动作. import { Observable } from 'rxjs'; import 'rxjs/add/observable/interval'.... let source$ = Observable.interval(1000); s

rxjs 入门--环境配置

原文: https://codingthesmartway.com/getting-started-with-rxjs-part-1-setting-up-the-development-environment-creating-observables/ ---------------------------------------------------------------- Getting Started With RxJS – Part 1: Setting Up The Develo

rxjs简单入门

rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据.状态.事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作 rxjs适用于异步场景,即前端交互中接口请求.浏览器事件以及自定义事件.通过使用rxjs带给我们前所未有的开发体验. 统一异步编程的规范,不管是Promise.ajax还是

构建流式应用—RxJS详解

讲之前先说一点哈,插入的图片,不能正常显示,图片一半被遮盖了,如果想看,鼠标右击然后图片地址,图片另存为去看.如果有知道怎么解决的,可以在下面给我评论,我会及时的修改. 好啦,开始: 目录 常规方式实现搜索功能 RxJS · 流 Stream RxJS 实现原理简析 观察者模式 迭代器模式 RxJS 的观察者 + 迭代器模式 RxJS 基础实现 Observable Observer RxJS · Operators Operators ·入门 一系列的 Operators 操作 使用 RxJS

[Angular 2] Managing State in RxJS with StartWith and Scan

The scan operator in RxJS is the main key to managing values and states in your stream. Scan behaves just as a reduce function would, but scan is able to collect values from streams over time. This lesson covers using startWith to set the initial acc

[Angular 2] Rendering an Observable Date with the Async and Date Pipes

Instead of simply pushing numbers on a timer into the template, now we'll move on to pushing actual Dates. We'll still use the Async pipe, but we'll also add on the Date pipe with some formatting to display the Date just the way we want it. import {C

[Angular 2] Rendering an Observable with the Async Pipe

Angular 2 templates use a special Async pipe to be able to render out Observables. This lesson covers the syntax used to create an Observable in Angular 2 and then to render it out in the template. import {Component} from 'angular2/core'; import {boo

angular2 学习笔记 ( rxjs 流 )

RxJS 博大精深,看了好几篇文章都没有明白. 范围牵扯到了函数响应式开发去了... 我对函数式一知半解, 响应式更是第一次听到... 唉...不过日子还是得过...混着过先呗 我目前所理解的很浅, 大致上是这样的概念. 1.某些场景下它比promise好用, 它善于过滤掉不关心的东西. 2.它是观察者模式 + 迭代器模式组成的 3.跟时间,事件, 变量有密切关系 4.世界上有一种东西叫 "流" stream, 一个流能表示了一段时间里,一样东西发生的变化. 比如有一个值, 它在某段时