RXJS Observable的冷,热和Subject

一、Observable的冷和热

Observable 热:直播。所有的观察者,无论进来的早还是晚,看到的是同样内容的同样进度,订阅的时候得到的都是最新时刻发送的值。

Observable 冷:点播。 新的订阅者每次从头开始。

冷的Observable例子:

一开始有个订阅者,

两秒后又有个订阅者,这两个序列按照自己的节奏走的,不同步。每个流进行都会从interval的0开始。

console.log(‘RxJS included?‘, !!Rx);

const count$ = Rx.Observable.interval(1000).take(5);
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

热的Observable例子

第二个订阅者直接从2开始起,跟第一个订阅者看到的内容是一样的。

const count$ = Rx.Observable.interval(1000).take(5).share();
const sub1 = count$.subscribe((val)=>{
  console.log(val);
});

setTimeout(function(){
  const sub2 = count$.subscribe((val)=>{
  console.log(val);
});
},2000);

二、Subject

Subject即是观察者Observer,也是被观察对象Observable,同时实现了这两个接口。

意味着

  • 一方面它可以作为流的组成的一方,输出的一方。
  • 另一方面,它可以作为流的观察一方,接收一方。

Subject分为ReplaySubject和BehaviorSubject。

ReplaySubject:这种Subject会保留最新的n个值

BehaviorSubject:是ReplaySubject的特殊形式。 保留最新的一个值

1、subscribe的等价写法

subscribe 后面写的一个函数,相当于语法糖,快捷方式,临时创建冷一个observer对象。

默认情况应该是传入一个observer对象

console.log(‘RxJS included?‘, !!Rx);

const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log(‘1: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 1:‘+ err);},
  complete: ()=>{console.log(‘1 is complete‘);}
}

const observer2 = {
  next: (val)=>{console.log(‘2: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 2:‘+ err);},
  complete: ()=>{console.log(‘2 is complete‘);}
}

//等价写法
counter$.subscribe(val =>{console.log(val);});
counter$.subscribe(observer2); 

2、两个observer ,两次subscribe

console.log(‘RxJS included?‘, !!Rx);

const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log(‘1: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 1:‘+ err);},
  complete: ()=>{console.log(‘1 is complete‘);}
}

const observer2 = {
  next: (val)=>{console.log(‘2: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 2:‘+ err);},
  complete: ()=>{console.log(‘2 is complete‘);}
}

counter$.subscribe(observer1);

setTimeout(function(){
  counter$.subscribe(observer2);
},2000);

问题:需要在两处执行subscribe,很多情况下是这样的,定义好这些序列应该在什么时候被触发,我执行执行一句subscribe(),两个序列都会这么执行。这种情况下就需要用subject()。

3、subject

subject即使observable,因为它可以subscribe observer。

也是observer,因为它可以被observable subscribe。

console.log(‘RxJS included?‘, !!Rx);

const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.Subject();

const observer1 = {
  next: (val)=>{console.log(‘1: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 1:‘+ err);},
  complete: ()=>{console.log(‘1 is complete‘);}
}

const observer2 = {
  next: (val)=>{console.log(‘2: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 2:‘+ err);},
  complete: ()=>{console.log(‘2 is complete‘);}
}

//不再用counter$去subscribe,用subject去subscribe,
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定义好两边后,用counter$去subscribe
counter$.subscribe(subject);

用一句执行counter$.subscribe(subject),把定义好的序列,包括等待2秒的序列全部完成了。

4,subject是一个hot observable

往流里推送新值

第二个拿不到新值,因为第二个流订阅的时候,两个新值已经过去了。

5,ReplaySubject

replay把过去发生的事件进行重播。

ReplaySubject(2)把过去的2个事件进行重播。这样observer1 subscribe的时候就可以看到10和11。

6、BehaviorSubject只记住最新的值

总有一个最新值,总记住上一次的最新值

console.log(‘RxJS included?‘, !!Rx);

const counter$ = Rx.Observable.interval(1000).take(5);

const subject = new Rx.BehaviorSubject();

subject.next(10);
subject.next(11);
const observer1 = {
  next: (val)=>{console.log(‘1: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 1:‘+ err);},
  complete: ()=>{console.log(‘1 is complete‘);}
}

const observer2 = {
  next: (val)=>{console.log(‘2: ‘ +val);},
  error: (err)=>{console.log(‘ERROR>> 2:‘+ err);},
  complete: ()=>{console.log(‘2 is complete‘);}
}

//不再用counter$去subscribe,用subject去subscribe,
subject.subscribe(observer1);

setTimeout(function(){
  subject.subscribe(observer2);
},2000);

//定义好两边后,用counter$去subscribe
counter$.subscribe(subject);

取值的时候,会取得到最新的data,尽管在取值的时候也就是subscribre的时候值已经发射完了,尽管时机已经错失了还是能够得到它上一次发射之后的最新的一个值。

三、Angular中对Rx的支持

大量内置Observable支持:如Http,ReactiveForms,Route等。

Async Pipe是什么?有什么用?

Observable需要subscribe 一下,成员数组变量等于Observable得到的值。

使用Async Pipe可以直接使用Observable,还不用去取消订阅。

memberResults$: Observable<User[]>;

本文作者starof,因知识本身在变化,作者也在不断学习成长,文章内容也不定时更新,为避免误导读者,方便追根溯源,请诸位转载注明出处:https://www.cnblogs.com/starof/p/10505617.html 有问题欢迎与我讨论,共同进步。

原文地址:https://www.cnblogs.com/starof/p/10505617.html

时间: 2024-08-28 21:51:48

RXJS Observable的冷,热和Subject的相关文章

[RxJS] Split an RxJS observable conditionally with windowToggle

There are variants of the window operator that allow you to split RxJS observables in different ways. In this lesson we will explore the windowToggle variant and see one of its use cases in user interfaces. Let's say we want to build a new functional

[RxJS] Split an RxJS Observable into groups with groupBy

groupBy() is another RxJS operator to create higher order observables. In this lesson we will learn how groupBy works for routing source values into different groups according to a calculated key. const numbersObservable = Rx.Observable.interval(500)

Angular 4+ Http

HTTP: 使应用能够对远端服务器发起相应的Http调用: 你要知道: HttpModule并不是Angular的核心模块,它是Angualr用来进行Web访问的一种可选方式,并位于一个名叫@angual/http的独立附属模块中:也就是说:使用http之前要引入此模块; 1.基本使用: import { BrowserModule } from '@angular/platform-browser'; import { NgModule } from '@angular/core'; impo

[RxJS] Add debug method to Observable in TypeScript

Observable.prototype.debug = function(message: any) { return this.do( (next) => { if(!environment.production) { console.log(message, next); } }, (err) => { if(!environment.production) { console.error(message, err) } }, () => { if(!environment.pro

[RxJS] Creating Observable From Scratch

Get a better understanding of the RxJS Observable by implementing one that's similar from the ground up. class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { const destination = this.destination; if (destina

关于rxjs subject订阅分发实现Angular的全局数据管理与同步更新

自定义实现angular中数据的状态管理,如有不妥请指正 一.先介绍一下rxjs中subject: Import {subject}from’rxjs’ Subject 数据的订阅与分发,结合报刊的发布与订阅进行功能的模拟,subject即是observeable对象也是observer对象,subject对于后期没有数据更新时所添加的订阅者是不怎么友好的,因为不跟新数据时订阅者就不在收到返回的数值     const interval$ = interval(1000).pipe(take(1

[RxJS] Connection operator: multicast and connect

We have seen how Subjects are useful for sharing an execution of an RxJS observable to multiple observers. However, this technique requires some laborious setting up. In this lesson we will learn about the multicast() operator which helps solve the s

Angular2中的RxJS

RxJS库 RxJS(Reactive Extensions)是个Angular提供的第三方库,实现异步观察模式(asynchronous observable pattern). 启用RxJS操作 RxJS非常大,通常只要我们所需要的特性就好了.Angular在rxjs/Observable模块中提供了简版的Observable,但是它缺乏我们所需要的所有的操作,包括上面提到的map方法. 我们在应用启动时引入所有RxJS操作: import 'rxjs/Rx'; 首先我们观'rxjs/Obs

rxjs简单入门

rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据.状态.事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作 rxjs适用于异步场景,即前端交互中接口请求.浏览器事件以及自定义事件.通过使用rxjs带给我们前所未有的开发体验. 统一异步编程的规范,不管是Promise.ajax还是