rxjs系列 -- Observale与Observer

在RxJS中,一个数据流的完整流向至少需要包含Observable和Observer。Observable是被观察者,Observer是观察者,Observer订阅Observable,Observable向Observer推送数据以完成整个过程。

可以说一个完整的RxJS数据流就是Observable和Observer之间的互动游戏。

Observable实现了下面两种设计模式:

观察者模式
迭代器模式

由于不是写设计模式的文章,所以一笔带过,感兴趣的可以自己看下相关的书,需要一提的是,任何一种设计模式,指的都是解决某个特定类型问题的方法。问题复杂多变,往往不是靠单独一种设计模式能够解决的,更需要多种设计模式的组合,而RxJS的Observable就是观察者模式和迭代器模式的组合。

Observale与Observer的互动

订阅

首先我们先创建一个Observable,作为数据源

 const { Observable } = rxjs;    const sourceObservable$ = new Observable(observer => {
        observer.next(‘hello‘);
        observer.next(‘rxjs‘);
    });

然后再设置一个observer,作为订阅者

const Observer = {        next: res => console.log(res)
    };

最后Observer订阅Observable,数据开始流动

   sourceObservable$.subscribe(Observer);    // hello
    // rxjs

一个非常简单的数据流就完成了。

状态流转

在上面的例子中,只有next这一种状态,不断地往下游传递数据,但实际上,数据流在流动过程中有三种状态:

next,正常流转到下一个状态
error,捕获到异常,流动中止
complete,数据流已经完成,流动终止

接下来改一下上面的代码,补上另外两种状态

const sourceObservable$ = new Observable(observer => {
        observer.next(‘hello‘);
        observer.error(‘the data is wrong!‘);
        observer.next(‘rxjs‘);
        observer.complete();
    });    const Observer = {        next: res => console.log(res),        error: err => console.log(err),        complete: () => console.log(‘complete!‘)
    }

    sourceObservable$.subscribe(Observer);    // hello
    // the data is wrong!

现在再看Observer是不是有种似曾相识的感觉?没错,Observer其实就是一个迭代器了。

至此,不难发现,Observable与Observer的互动过程其实就是,Observer通过观察者模式向Observable注入了一个迭代器,通过next游标控制数据源Observable的数据流动,并根据实际流动过程中可能发生的情况将状态流转到error或者complete。

取消订阅

现在Observable与Observer已经通过subscribe建立了联系,但有时候我们需要把这种联系断开,比如组件销毁的时候。这时候就需要取消订阅了,看下面的例子

const sourceObservable$ = new Observable(observer => {        let number = 1;
        setInterval(() => {
            observer.next(number++);
        }, 1000);
    });    const Observer = {        next: res => console.log(res),        error: err => console.log(err),        complete: () => console.log(‘complete!‘)
    }    const subscription = sourceObservable$.subscribe(Observer);

    setTimeout(() => {        // 取消订阅
        subscription.unsubscribe();
    }, 4000);    // 1
    // 2
    // 3
    // 4

需要注意的是,在本例子中,虽然取消订阅了,但是作为数据源的sourceObservable$并没有终结,因为始终没有调用complete方法,只是Observer不再接收推送的数据了

为了便于观察其中的差异,我们将sourceObservable$改一下

const sourceObservable$ = new Observable(observer => {        let number = 1;
        setInterval(() => {            console.log(‘subscribe:‘ + number);
            observer.next(number++);
        }, 1000);
    });    // subscribe: 1
    // 1
    // subscribe: 2
    // 2
    // subscribe: 3
    // 3
    // subscribe: 4
    // 4
    // subscribe: 5
    // subscribe: 6
    // ...

Hot Observable和Cold Observable

假设我们有这样一个场景,一个Observable被两个ObserverA、ObserverB先后相隔N秒订阅了,那么ObserverB是否需要接收订阅之前的数据呢?

其实没有固定的答案,是否接收需要根据实际的业务场景来定,正是因为如此,所以便有了Hot Observable以及Cold Observable。

Hot Observable:热被观察对象,类似于直播,看到的内容是从你打开直播的那一刻开始的,之前的内容已经错过。只能接收订阅那一刻开始的数据。
Cold Observable:冷被观察对象,类似于录播,看到的内容是你打开的视频的第一秒种开始的。每次订阅都会从头开始接收数据

先看下Hot Observable,每次订阅只推送当前的数据,所以不会在每次订阅时重置数据推送,代码如下:

 // 先产生数据
    let number = 1;    const sourceObservale$ = new Observable(observer => {        let num = number;
        setInterval(() => {
            observer.next(num++);
            number = num;
        }, 1000);
    });    const ObserverA = ObserverB = {        next: item => console.log(item),        error: err => console.log(err),        complete: () => console.log(‘complete!‘)
    }

    sourceObservale$.subscribe(ObserverA);

    setTimeout(() => {
        sourceObservale$.subscribe(ObserverB);
    }, 2000);    // 1 => A
    // 2 => A
    // 3 => A
    // 3 => B
    // 4 => A
    // 4 => B
    // ..

而对于Cold Observable,每次订阅都是重头开始推送,所以每一次订阅都会重置数据推送,代码如下:

const sourceObservale$ = new Observable(observer => {        // 订阅时产生数据
        let number = 1;
        setInterval(() => {
            observer.next(number++);
        }, 1000);
    });    // 中间不变
    ...    // 1 => A
    // 2 => A
    // 3 => A
    // 1 => B
    // 4 => A
    // 2 => B
    // ..

从这里也可以看出,Observable是具有惰性求值的,只有在被订阅的时候才会执行内部逻辑,而Cold Observable则更进一步,在没有被订阅的时候,连数据都不会产生。

睿江云官网链接:http://www.eflycloud.com/#register?salesID=6DGNUTUAV

原文地址:https://blog.51cto.com/13475644/2424606

时间: 2024-11-01 21:52:18

rxjs系列 -- Observale与Observer的相关文章

构建流式应用—RxJS详解

讲之前先说一点哈,插入的图片,不能正常显示,图片一半被遮盖了,如果想看,鼠标右击然后图片地址,图片另存为去看.如果有知道怎么解决的,可以在下面给我评论,我会及时的修改. 好啦,开始: 目录 常规方式实现搜索功能 RxJS · 流 Stream RxJS 实现原理简析 观察者模式 迭代器模式 RxJS 的观察者 + 迭代器模式 RxJS 基础实现 Observable Observer RxJS · Operators Operators ·入门 一系列的 Operators 操作 使用 RxJS

设计模式之泛化系列「观察者模式」(Observer)

Observer设计模式是一个泛化(泛化是把特殊代码转换成通用目的代码的过程)系列的设计模式之一.它解决了主题对象和观察者对象会紧紧地耦合在一起的问题. 简单地说,观察者模式定义了一个一对多的信赖关系,让一个或多个观察者对象监察一个主题对象.这样一个主题对象在状态上的变化能够通知所有的信赖于此对象的那些观察者对象,使这些观察者对象能够自动更新.类图如下: 抽象主题(Subject)角色:主题角色把所有观察者对象的引用保存在一个列表里.每个主题都可以有任何数量的观察者.主题提供了可以添加或删除观察

Java 设计模式系列(十六)观察者模式(Observer)

Java 设计模式系列(十六)观察者模式(Observer) 观察者模式是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式.模型-视图(Model/View)模式.源-监听器(Source/Listener)模式或从属者(Dependents)模式. 观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象.这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己. 一.观察者模式的结构 Subject:目标对象,通常具有如下功能:

Vue双向绑定的实现原理系列(三):监听器Observer和订阅者Watcher

监听器Observer和订阅者Watcher 实现简单版Vue的过程,主要实现{{}}.v-model和事件指令的功能 主要分为三个部分 github源码 1.数据监听器Observer,能够对数据对象的所有属性进行监听; 实现数据的双向绑定,首先要对数据进行劫持监听,所以我们需要设置一个监听器Observer,用来监听所有属性 2.Watcher将数据监听器和指令解析器连接起来,数据的属性变动时,执行指令绑定的相应回调函数, 1.如果属性发上变化了,就需要告诉订阅者Watcher看是否需要更新

[RxJS] Subject: an Observable and Observer hybrid

This lesson teaches you how a Subject is simply a hybrid of Observable and Observer which can act as a bridge between the source Observable and multiple observers, effectively making it possible for multiple observers to share the same Observable exe

Android Data Binding 系列(二) -- Binding与Observer实现

写在前面 上篇文章 Android Data Binding 系列(一) – 详细介绍与使用 介绍了 Data Binding 的基础及其用法,本文接上篇,结合DataBindingDemo 来学习下 Data Binding 的实现. 绑定实现 Activity在inflate layout时,通过DataBindingUtil来生成绑定,从代码看,是遍历contentView得到View数组对象,然后通过数据绑定library生成对应的Binding类,含Views.变量.listeners

vue系列---响应式原理实现及Observer源码解析(一)

_ 阅读目录 一. 什么是响应式? 二:如何侦测数据的变化? 2.1 Object.defineProperty() 侦测对象属性值变化 2.2 如何侦测数组的索引值的变化 2.3 如何监听数组内容的增加或减少? 2.4 使用Proxy来实现数据监听 三. Observer源码解析 回到顶部 一. 什么是响应式? 我们可以这样理解,当一个数据状态发生改变的时候,那么与这个数据状态相关的事务也会发生改变.用我们的前端专业术语来讲,当我们JS中的对象数据发生改变的时候,与JS中对象数据相关联的DOM

使用 RxJS 实现 JavaScript 的 Reactive 编程

简介 作为有经验的JavaScript开发者,我们会在代码中采用一定程度的异步代码.我们不断地处理用户的输入请求,也从远程获取数据,或者同时运行耗时的计算任务,所有这些都不能让浏览器崩溃.可以说,这些都不是琐碎的任务,它是确切的需求,我们学着去避开同步计算,让模型的时间和延时成为问题的关键.对于简单的应用程序,直接使用JavaScript的主事件系统,甚至使用jQuery库帮助也很常见.然而,还没有适当的模式来扩展的简单代码,解决这些异步问题,满足更丰富的应用特性,满足现代web用户的需求,这些

rxjs5.X系列 —— transform api 笔记

欢迎指导与讨论:) 前言 本文是笔者翻译 RxJS 5.X 官网各类operation操作系列的的第一篇 -- transform转换.如有错漏,希望大家指出提醒O(∩_∩)O.更详细的资料尽在rxjs官网 [http://reactivex.io/rxjs/manual/overview.htm]与带有demo例子的网站[http://xgrommx.github.io/rx-book/content/observable]. 本文有关于transform操作的内容:buffer.buffer