Get a better understanding of the RxJS Observable by implementing one that's similar from the ground up.
/**
 * Wraps a user-supplied observer and enforces the observer contract:
 *  - handlers are optional, so a partial observer is accepted;
 *  - nothing is delivered once the subscription has been closed;
 *  - `error` and `complete` are terminal and close the subscription.
 */
class SafeObserver {
  /**
   * @param {{next?: Function, error?: Function, complete?: Function}} destination
   *   The observer that notifications are forwarded to.
   */
  constructor(destination) {
    this.destination = destination;
    // Initialised explicitly so the flag is always a real boolean.
    this.isUnsubscribed = false;
  }

  /**
   * Forwards a value to the destination unless the subscription is closed.
   * @param {*} value
   */
  next(value) {
    const destination = this.destination;
    if (!this.isUnsubscribed && destination.next) {
      destination.next(value);
    }
  }

  /**
   * Forwards an error to the destination, then closes the subscription.
   * @param {*} err
   */
  error(err) {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      try {
        if (destination.error) {
          destination.error(err);
        }
      } finally {
        // Release the producer's resources even if the handler throws.
        this.unsubscribe();
      }
    }
  }

  /**
   * Signals completion to the destination, then closes the subscription.
   */
  complete() {
    const destination = this.destination;
    if (!this.isUnsubscribed) {
      try {
        if (destination.complete) {
          destination.complete();
        }
      } finally {
        // Release the producer's resources even if the handler throws.
        this.unsubscribe();
      }
    }
  }

  /**
   * Closes the subscription and runs the teardown function, if one has been
   * attached as `_unsubscribe`. Calls after the first one are no-ops, so the
   * teardown runs at most once.
   */
  unsubscribe() {
    if (this.isUnsubscribed) {
      return;
    }
    this.isUnsubscribed = true;
    if (this._unsubscribe) {
      this._unsubscribe();
    }
  }
}

/**
 * Minimal Observable: holds a subscribe function that is run once for every
 * call to `subscribe`.
 */
class Observable {
  /**
   * @param {function(SafeObserver): (Function|undefined)} _subscribe
   *   Receives the safe observer, starts producing values, and may return a
   *   teardown function that releases whatever it set up.
   */
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }

  /**
   * Starts the producer for the given observer.
   *
   * @param {{next?: Function, error?: Function, complete?: Function}} observer
   * @returns {Function} A function that cancels the subscription. The same
   *   function is also exposed as its own `unsubscribe` property, so both
   *   `sub()` and `sub.unsubscribe()` work.
   */
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    const teardown = this._subscribe(safeObserver);

    if (safeObserver.isUnsubscribed) {
      // The producer errored or completed synchronously, before the teardown
      // was available to the safe observer; run it now so nothing leaks.
      if (typeof teardown === 'function') {
        teardown();
      }
    } else {
      safeObserver._unsubscribe = teardown;
    }

    const unsubscribe = () => safeObserver.unsubscribe();
    unsubscribe.unsubscribe = unsubscribe;
    return unsubscribe;
  }
}

// Demo source: emits 0..9, one value every 100 ms, then completes.
const myObservable = new Observable((observer) => {
  let i = 0;
  const id = setInterval(() => {
    if (i < 10) {
      observer.next(i++);
    } else {
      observer.complete();
    }
  }, 100);

  // Teardown: stop the timer when the subscription is closed.
  return () => {
    console.log('unsubbed');
    clearInterval(id);
  };
});

const observer = {
  next(value) {
    console.log('next -> ' + value);
  },
  error(err) {
    // Report instead of silently swallowing the error.
    console.error(err);
  },
  complete() {
    console.log('complete');
  }
};

const foo = myObservable.subscribe(observer);
// Cancels immediately, so the only output of this demo is 'unsubbed'.
foo.unsubscribe();
时间: 2024-12-17 23:08:22