RxJava----操作符:cold&hot Observable

Observable 数据流有两种类型:hot 和 cold。

cold Observables

只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。

        try {
            Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS);
            Subscription firstSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("First: " + aLong);
                }
            });
            Thread.sleep(500);
            Subscription secondSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("Second: " + aLong);
                }
            });
            Thread.sleep(500);
            firstSubs.unsubscribe();
            secondSubs.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

First: 0
First: 1
First: 2
Second: 0
First: 3
Second: 1
First: 4

虽然这两个 Subscriber 订阅到同一个Observable 上,只是订阅的时间不同,他们都收到同样的数据流,但是同一时刻收到的数据是不同的。

我们之前见到的 Observable 都是 Cold Observable。 Observable.create 创建的也是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。

hot Observables

Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是 鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。

Publish

Cold Observable 和 Hot Observable 之间可以相互转化。使用 publish 操作函数可以把 Cold Observable 转化为 Hot Observable。

public final ConnectableObservable<T> publish()

publish 返回一个 ConnectableObservable 对象,这个对象是 Observable 的之类,多了三个函数:

public final Subscription connect()
public abstract void connect(Action1<? super Subscription> connection)
public Observable<T> refCount()

另外还有一个重载函数,可以在发射数据之前对数据做些处理:

public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)

之前介绍的所有对 Observable 的操作都可以在 selector 中使用。你可以通过 selector 参数创建一个 Subscription ,后来的订阅者都订阅到这一个 Subscription 上,这样可以确保所有的订阅者都在同一时刻收到同样的数据。

这个重载函数返回的是 Observable 而不是 ConnectableObservable, 所以下面讨论的操作函数无法在这个重载函数返回值上使用。

connect

ConnectableObservable 如果不调用 connect 函数则不会触发数据流的执行。当调用 connect 函数以后,会创建一个新的 subscription 并订阅到源 Observable (调用 publish 的那个 Observable)。这个 subscription 开始接收数据并把它接收到的数据转发给所有的订阅者。这样,所有的订阅者在同一时刻都可以收到同样的数据。

        try {
            ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
            cold.connect();
            Subscription firstSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("First: " + aLong);
                }
            });
            Thread.sleep(500);
            Subscription secondSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("Second: " + aLong);
                }
            });
            Thread.sleep(500);
            firstSubs.unsubscribe();
            secondSubs.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3

Hot observables–停止接收数据

停止接收数据有两种方式:

  • 断开长连接,再次connect,那么源Observable从新发射数据,unsubscribe_connect
  • 停止订阅,再定订阅的话,继续接收长连接返回的数据,unsubscribe_subscribe

unsubscribe_connect

  • connect 函数返回的是一个 Subscription,和 Observable.subscribe返回的结果一样。
  • 可以使用这个 Subscription 来取消订阅到 ConnectableObservable。
  • 如果调用 这个 Subscription 的 unsubscribe 函数,可以停止把数据转发给 Observer,但是这些 Observer 并没有从 ConnectableObservable 上取消注册,只是停止接收数据了。
  • 如果再次调用 connect , 则 ConnectableObservable 开始一个新的订阅,在 ConnectableObservable 上订阅的 Observer 会再次开始接收数据。
        try {
            ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
            Subscription connectSubs = connectable.connect();
            connectable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log(aLong.toString());
                }
            });
            Thread.sleep(500);
            log("Close connection");
            connectSubs.unsubscribe();//取消订阅,结束数据流
            Thread.sleep(500);
            log("Reconnecting");
            connectable.connect();//再次连接,数据流重新发射
            Subscription subs = connectable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log(aLong.toString());
                }
            });
            Thread.sleep(500);
            subs.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

0
1
Close connection
Reconnecting
0
1

通过调用 connect 来重新开始订阅,会创建一个新的订阅。如果源 Observable 为 Cold Observable 则数据流会重新执行一遍。

unsubscribe_subscribe

如果你不想结束数据流,只想从 publish 返回的 Hot Observable 上取消注册,则可以使用 subscribe 函数返回的 Subscription 对象。

        try {
            ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
            Subscription connectSubs = connectable.connect();
            Subscription firstSubs = connectable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log(aLong.toString());
                }
            });
            Thread.sleep(500);
            log("Close first Subscription");
            firstSubs.unsubscribe();//只是取消订阅,不结束数据流
            Thread.sleep(500);
            log("Start second Subscription");
            Subscription secondSubs = connectable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log(aLong.toString());
                }
            });
            Thread.sleep(500);
            secondSubs.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

0
1
Close first Subscription
Start second Subscription
5
6

refCount

  • refCount 返回一个特殊的 Observable, 这个 Observable 只要有订阅者就会继续发射数据。
  • 如果没有订阅者,就停止
  • 再次订阅,重新发射
        try {
            Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
            Subscription firstSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("First: " + aLong);
                }
            });
            Thread.sleep(500);
            Subscription secondSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("Second: " + aLong);
                }
            });
            Thread.sleep(500);
            log("Unsubscribe second");
            secondSubs.unsubscribe();
            Thread.sleep(500);
            log("Unsubscribe first");
            firstSubs.unsubscribe();

            log("First connection again");
            Thread.sleep(500);
            firstSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("First: " + aLong);
                }
            });
            Thread.sleep(500);
            firstSubs.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Unsubscribe second
Second: 4
First: 5
First: 6
Unsubscribe first
First connection again
First: 0
First: 1

replay

在publish的基础上添加了缓存

public final ConnectableObservable<T> replay()

         try {
            ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("doNext:" + aLong.toString());
                }
            }).replay();
//            }).replay(2);
            cold
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("Subscribed");
                        }
                    })
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("Unsubscribed");
                        }
                    });
            Subscription connectSubs = cold.connect();
            log("Subscribe first" + "--time:" + System.currentTimeMillis());
            Subscription firstSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("First: " + aLong + "--time:" + System.currentTimeMillis());
                }
            });
            Thread.sleep(700);
            log("Subscribe second" + "--time:" + System.currentTimeMillis());
            Subscription secondSubs = cold.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("Second: " + aLong + "--time:" + System.currentTimeMillis());
                }
            });
            Thread.sleep(500);
            firstSubs.unsubscribe();
            secondSubs.unsubscribe();
            Thread.sleep(500);
            connectSubs.unsubscribe();//可以通过connectSubs断开长连接
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

emo D/pepe: Subscribe first--time:1462778519645
doNext:0
First: 0--time:1462778519845
doNext:1
First: 1--time:1462778520044
doNext:2
First: 2--time:1462778520245
Subscribe second--time:1462778520346
Second: 0--time:1462778520346
Second: 1--time:1462778520346
Second: 2--time:1462778520346
doNext:3
First: 3--time:1462778520444
Second: 3--time:1462778520444
doNext:4
First: 4--time:1462778520644
Second: 4--time:1462778520644
doNext:5
First: 5--time:1462778520843
Second: 5--time:1462778520843
doNext:6
doNext:7

通过上面的结果可以发现:

  • 第二个订阅者订阅后,会立即接收缓存的数据
  • 并且接收的缓存数据和最新发射的数据没有时间上的冲突,马上同步接收数据

replay 有 8个重载函数:

ConnectableObservable<T> replay()
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,int bufferSize)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,int bufferSize, long time, java.util.concurrent.TimeUnit unit)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(int bufferSize)
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnit unit)

有三个参数 bufferSize、 selector 和 time (以及指定时间单位的 unit)

  • bufferSize 用来指定缓存的最大数量。当新的 Observer 订阅的时候,最多只能收到 bufferSize 个之前缓存的数据。
  • time, unit 用来指定一个数据存货的时间,新订阅的 Observer 只能收到时间不超过这个参数的数据。
  • selector 和 publish(selector) 用来转换重复的 Observable。

cache

  • cache 操作函数和 replay 类似,但是隐藏了 ConnectableObservable ,并且不用管理 subscription 了。
  • 只要第一个订阅者订阅了,内部的 ConnectableObservable 就链接到源 Observable上了,并且不会取消订阅了。
  • 后来的订阅者会收到之前缓存的数据,但是并不会重新订阅到源 Observable 上。.
  • 即使所有订阅者都取消,内部的ConnectableObservable仍然也不会停止
  • 也就是一个长连接一直存在,因为在内部实现connect,没有返回connectSubs,也就无法调用connectSubs.unsubscribe();
public final Observable<T> cache()
public final Observable<T> cache(int capacity)

        try {
            Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
                    .take(8)
                    .doOnNext(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            log("doNext:" + aLong.toString());
                        }
                    })
                    .cache()
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("Subscribed");
                        }
                    })
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("Unsubscribed");
                        }
                    });
            Subscription subscription = obs.subscribe();
            Thread.sleep(500);
            subscription.unsubscribe();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

结果:

Subscribed
doNext:0
doNext:1
doNext:2
doNext:3
doNext:4
0Unsubscribed
doNext:5
doNext:6
doNext:7

上面的示例中,doOnNext 打印源 Observable 发射的每个数据。而 doOnSubscribe 和doOnUnsubscribe 打印缓存后的 Observable 的订阅和取消订阅事件。可以看到当订阅者订阅的时候,数据流开始发射,取消订阅数据流并不会停止。

项目源码 GitHub求赞,谢谢!

引用:

RxJava 教程第三部分:驯服数据流之 hot & cold Observable - 云在千峰

时间: 2024-11-14 22:13:50

RxJava----操作符:cold&hot Observable的相关文章

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse

RxJava操作符(04-过滤操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656494 本文出自:[openXu的博客] 目录: Debounce Distinct ElementAt Filter First Last IgnoreElements SampleThrottleFirst SkipSkipLast TakeTakeLast 源码下载 "过滤操作",顾名思义,就是过滤掉Observable发射的一些数据,不让他发射出去,也就是忽略丢弃掉

RxJava操作符(02-创建操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51645348 本文出自:[openXu的博客] 目录: Create Defer EmptyNeverThrow From Interval Just Range Repeat Timer 源码下载 ??在上一篇博客中我们初步体验了Rxjava的使用,领略了RxJava对于异步操作流编码的简洁风格,接下来的一系列博客,我们主要学习RxJava中的操作符,也就是RxJava的一些API,由于

RxJava操作符(09-算术/聚合操作&amp;连接操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51692493 本文出自:[openXu的博客] 目录: 算术聚合 Count Concat Reduce 连接操作 Publish Connect RefCount Replay 源码下载 算术&聚合 1. Count ??Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量. ?? 如果原始Observa

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,