[RxJS] Creating Observable From Scratch

Get a better understanding of the RxJS Observable by implementing one that‘s similar from the ground up.

class SafeObserver {
  constructor(destination) {
    this.destination = destination;

  next(value) {
    const destination = this.destination;
    if (destination.next && !this.isUnsubscribed) {
      destination.next && destination.next(value);

  error(err) {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      if (destination.error) {

  complete() {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      if (destination.complete) {

  unsubscribe() {
    this.isUnsubscribed = true;
    if (this._unsubscribe) {

class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;

  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    safeObserver._unsubscribe = this._subscribe(safeObserver);
    return () => safeObserver.unsubscribe();

const myObservable = new Observable((observer) => {
  let i = 0;
  const id = setInterval(() => {
    if (i < 10) {
    } else {
  }, 100);

  return () => {

const observer = {
  next(value) { console.log(‘next -> ‘ + value); },
  error(err) { },
  complete() { console.log(‘complete‘); }

const foo = myObservable.subscribe(observer);

