Observable 数据流有两种类型:hot 和 cold。
cold Observables
只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。
try {
Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS);
Subscription firstSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("First: " + aLong);
}
});
Thread.sleep(500);
Subscription secondSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Second: " + aLong);
}
});
Thread.sleep(500);
firstSubs.unsubscribe();
secondSubs.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
First: 0
First: 1
First: 2
Second: 0
First: 3
Second: 1
First: 4
虽然这两个 Subscriber 订阅到同一个Observable 上,只是订阅的时间不同,他们都收到同样的数据流,但是同一时刻收到的数据是不同的。
我们之前见到的 Observable 都是 Cold Observable。 Observable.create 创建的也是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。
hot Observables
Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是 鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。
Publish
Cold Observable 和 Hot Observable 之间可以相互转化。使用 publish 操作函数可以把 Cold Observable 转化为 Hot Observable。
public final ConnectableObservable<T> publish()
publish 返回一个 ConnectableObservable 对象,这个对象是 Observable 的之类,多了三个函数:
public final Subscription connect()
public abstract void connect(Action1<? super Subscription> connection)
public Observable<T> refCount()
另外还有一个重载函数,可以在发射数据之前对数据做些处理:
public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector)
之前介绍的所有对 Observable 的操作都可以在 selector 中使用。你可以通过 selector 参数创建一个 Subscription ,后来的订阅者都订阅到这一个 Subscription 上,这样可以确保所有的订阅者都在同一时刻收到同样的数据。
这个重载函数返回的是 Observable 而不是 ConnectableObservable, 所以下面讨论的操作函数无法在这个重载函数返回值上使用。
connect
ConnectableObservable 如果不调用 connect 函数则不会触发数据流的执行。当调用 connect 函数以后,会创建一个新的 subscription 并订阅到源 Observable (调用 publish 的那个 Observable)。这个 subscription 开始接收数据并把它接收到的数据转发给所有的订阅者。这样,所有的订阅者在同一时刻都可以收到同样的数据。
try {
ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
cold.connect();
Subscription firstSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("First: " + aLong);
}
});
Thread.sleep(500);
Subscription secondSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Second: " + aLong);
}
});
Thread.sleep(500);
firstSubs.unsubscribe();
secondSubs.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
Hot observables–停止接收数据
停止接收数据有两种方式:
- 断开长连接,再次connect,那么源Observable从新发射数据,unsubscribe_connect
- 停止订阅,再定订阅的话,继续接收长连接返回的数据,unsubscribe_subscribe
unsubscribe_connect
- connect 函数返回的是一个 Subscription,和 Observable.subscribe返回的结果一样。
- 可以使用这个 Subscription 来取消订阅到 ConnectableObservable。
- 如果调用 这个 Subscription 的 unsubscribe 函数,可以停止把数据转发给 Observer,但是这些 Observer 并没有从 ConnectableObservable 上取消注册,只是停止接收数据了。
- 如果再次调用 connect , 则 ConnectableObservable 开始一个新的订阅,在 ConnectableObservable 上订阅的 Observer 会再次开始接收数据。
try {
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription connectSubs = connectable.connect();
connectable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong.toString());
}
});
Thread.sleep(500);
log("Close connection");
connectSubs.unsubscribe();//取消订阅,结束数据流
Thread.sleep(500);
log("Reconnecting");
connectable.connect();//再次连接,数据流重新发射
Subscription subs = connectable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong.toString());
}
});
Thread.sleep(500);
subs.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
0
1
Close connection
Reconnecting
0
1
通过调用 connect 来重新开始订阅,会创建一个新的订阅。如果源 Observable 为 Cold Observable 则数据流会重新执行一遍。
unsubscribe_subscribe
如果你不想结束数据流,只想从 publish 返回的 Hot Observable 上取消注册,则可以使用 subscribe 函数返回的 Subscription 对象。
try {
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription connectSubs = connectable.connect();
Subscription firstSubs = connectable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong.toString());
}
});
Thread.sleep(500);
log("Close first Subscription");
firstSubs.unsubscribe();//只是取消订阅,不结束数据流
Thread.sleep(500);
log("Start second Subscription");
Subscription secondSubs = connectable.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log(aLong.toString());
}
});
Thread.sleep(500);
secondSubs.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
0
1
Close first Subscription
Start second Subscription
5
6
refCount
- refCount 返回一个特殊的 Observable, 这个 Observable 只要有订阅者就会继续发射数据。
- 如果没有订阅者,就停止
- 再次订阅,重新发射
try {
Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount();
Subscription firstSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("First: " + aLong);
}
});
Thread.sleep(500);
Subscription secondSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Second: " + aLong);
}
});
Thread.sleep(500);
log("Unsubscribe second");
secondSubs.unsubscribe();
Thread.sleep(500);
log("Unsubscribe first");
firstSubs.unsubscribe();
log("First connection again");
Thread.sleep(500);
firstSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("First: " + aLong);
}
});
Thread.sleep(500);
firstSubs.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
First: 0
First: 1
First: 2
Second: 2
First: 3
Second: 3
First: 4
Unsubscribe second
Second: 4
First: 5
First: 6
Unsubscribe first
First connection again
First: 0
First: 1
replay
在publish的基础上添加了缓存
public final ConnectableObservable<T> replay()
try {
ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("doNext:" + aLong.toString());
}
}).replay();
// }).replay(2);
cold
.doOnSubscribe(new Action0() {
@Override
public void call() {
log("Subscribed");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
});
Subscription connectSubs = cold.connect();
log("Subscribe first" + "--time:" + System.currentTimeMillis());
Subscription firstSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("First: " + aLong + "--time:" + System.currentTimeMillis());
}
});
Thread.sleep(700);
log("Subscribe second" + "--time:" + System.currentTimeMillis());
Subscription secondSubs = cold.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("Second: " + aLong + "--time:" + System.currentTimeMillis());
}
});
Thread.sleep(500);
firstSubs.unsubscribe();
secondSubs.unsubscribe();
Thread.sleep(500);
connectSubs.unsubscribe();//可以通过connectSubs断开长连接
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
emo D/pepe: Subscribe first--time:1462778519645
doNext:0
First: 0--time:1462778519845
doNext:1
First: 1--time:1462778520044
doNext:2
First: 2--time:1462778520245
Subscribe second--time:1462778520346
Second: 0--time:1462778520346
Second: 1--time:1462778520346
Second: 2--time:1462778520346
doNext:3
First: 3--time:1462778520444
Second: 3--time:1462778520444
doNext:4
First: 4--time:1462778520644
Second: 4--time:1462778520644
doNext:5
First: 5--time:1462778520843
Second: 5--time:1462778520843
doNext:6
doNext:7
通过上面的结果可以发现:
- 第二个订阅者订阅后,会立即接收缓存的数据
- 并且接收的缓存数据和最新发射的数据没有时间上的冲突,马上同步接收数据
replay 有 8个重载函数:
ConnectableObservable<T> replay()
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,int bufferSize)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,int bufferSize, long time, java.util.concurrent.TimeUnit unit)
<R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector,long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(int bufferSize)
ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnit unit)
ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnit unit)
有三个参数 bufferSize、 selector 和 time (以及指定时间单位的 unit)
- bufferSize 用来指定缓存的最大数量。当新的 Observer 订阅的时候,最多只能收到 bufferSize 个之前缓存的数据。
- time, unit 用来指定一个数据存货的时间,新订阅的 Observer 只能收到时间不超过这个参数的数据。
- selector 和 publish(selector) 用来转换重复的 Observable。
cache
- cache 操作函数和 replay 类似,但是隐藏了 ConnectableObservable ,并且不用管理 subscription 了。
- 只要第一个订阅者订阅了,内部的 ConnectableObservable 就链接到源 Observable上了,并且不会取消订阅了。
- 后来的订阅者会收到之前缓存的数据,但是并不会重新订阅到源 Observable 上。.
- 即使所有订阅者都取消,内部的ConnectableObservable仍然也不会停止
- 也就是一个长连接一直存在,因为在内部实现connect,没有返回connectSubs,也就无法调用connectSubs.unsubscribe();
public final Observable<T> cache()
public final Observable<T> cache(int capacity)
try {
Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS)
.take(8)
.doOnNext(new Action1<Long>() {
@Override
public void call(Long aLong) {
log("doNext:" + aLong.toString());
}
})
.cache()
.doOnSubscribe(new Action0() {
@Override
public void call() {
log("Subscribed");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
log("Unsubscribed");
}
});
Subscription subscription = obs.subscribe();
Thread.sleep(500);
subscription.unsubscribe();
} catch (InterruptedException e) {
e.printStackTrace();
}
结果:
Subscribed
doNext:0
doNext:1
doNext:2
doNext:3
doNext:4
0Unsubscribed
doNext:5
doNext:6
doNext:7
上面的示例中,doOnNext 打印源 Observable 发射的每个数据。而 doOnSubscribe 和doOnUnsubscribe 打印缓存后的 Observable 的订阅和取消订阅事件。可以看到当订阅者订阅的时候,数据流开始发射,取消订阅数据流并不会停止。
项目源码 GitHub求赞,谢谢!
引用:
RxJava 教程第三部分:驯服数据流之 hot & cold Observable - 云在千峰