Demo分析
响应式编程的概念现在火的一塌糊涂,各种RxXXX库层出不穷,虽然这些库的实现语言各不相同,但是原理都是一样的。我的理解是这些库主要都包含三个东西:Observable, OnSubscribe, Subscriber。阅读本文的读者必须懂的这些概念,初学者建议看下RxJava专题 上的文章再来看本文。我们就从源码层级来分析一下这中间的事件流,线程切换是怎么个原理。这里交代下本文分析的RxJava的版本是1.1.0
先来看个简单的Demo实例
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
上面的demo就是简单的创建一个Observable,然后触发事件流。ok, 我们一步一步来分析。我们的Demo先是调用Observabale的create方法,而create方法其实内部就是调用Observale的构造函数来实例化一个Observable对象。再然后就是调用subscrible方法
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// 此处省略一堆验证参数的代码
// new Subscriber so onStart it
subscriber.onStart();
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
throw r;
}
return Subscriptions.unsubscribed();
}
}
创建完Observable对象后,就会调用subscribe方法触发事件流。subscribe方法会先做一些参数验证,然后调用subscriber.onStart()方法,再然后调用我们在创建Observable的时候传递进去的OnSubscribe对象的call方法来触发Subscriber对象的onNext, onComplete 或者onError方法。用个图来表示。
这里为了方便都采用了简写
* Observable ———> Ob
* OnSubscribe ———> OnSub
* Subscriber———> Sub
图中的箭头表示调用关系,上图中的箭头表示OnSub的call方法的调用,而call方法中又会调用Sub对象的方法onNext, onError或者onComplete方法。
map/lift方法分析
分析map方法和分析lift方法是一样的,弄懂了一个,另外一个自然也就懂了。ok, 先看下Observable的map方法的代码
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
可以看到map方法实质上就是调用lift方法实现的,这里我们先要知道Func1<? super T, ? extends R> 和OperatorMap<T, R>对象充当什么样的角色。先看下Func1的代码
public interface Func1<T, R> extends Function {
R call(T t);
}
Func1是一个接口,它只有一个方法, 传入一个T类型的对象,返回一个R类型的对象,所以它是一个转换器。调用它的call方法可以将第一个泛型类型的对象T转换成第二个泛型类型的对象R。
那OperatorMap呢?看下代码
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
看这段代码我们知道OperatorMap<R, T> 实现了Operator< T, R>接口,这里要注意两个泛型 T 和 R 在OperatorMap和Operator里面的顺序是对换的。先来看下Operator接口的代码
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
在这里大家不要去记住泛型T,泛型R啊什么的, 我们按顺序来区分它们,就记住第一个泛型对象,第二个泛型对象。上面我们说道Func1是个转换器, 那Operator也是个转换器咯,不过它不是把第一个泛型的对象转换成第二个泛型的对象, 而是将Susbcriber<第一个泛型>对象转换成Sbscriber<第二个泛型>对象。再根据OperatorMap<T, R> implements Operator<R, T>
OperatorMap<R, T> 和Operator< T, R>的泛型顺序是相反的可知,OperatorMap<T, R >转换器的功能是将Subscriber<第二个泛型>对象转换成Subscriber<第一个泛型>对象。 再来看下lift方法
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// 此处省略无关代码
st.onError(e);
}
} catch (Throwable e) {
// 此处省略无关代码
o.onError(e);
}
}
});
}
lift方法就是创建了一个Observable<R>对象并返回。每个Observable<R>对象必须有一个OnSubscribe<R>对象,所以lift方法中也实例化了一个OnSubscribe<R>对象并传递给Observable<R>。该OnSubscribe对象的call方法又会调用OperatorMap的call方法创建一个Subscriber,最后组成了一个Observable对象返回。那么一个简单的demo加上一个map方法的图就是这样的。
上面的Ob<T>图我们已经见过了,下面的Ob<R>就是lift创建并返回的,从lift代码中我们知道Observable<R>中的OnSubscribe<R>对象的call(Subscriber<R> subscriber)方法会调用原始OnSubscribe<T>对象的call(Subscriber<T> subscriber)方法。在简单domo中,Subscriber<T>对象是我们创建并调用Observable<T>的subscribe方法传递进去的,那这里的Subscriber<T>是哪来的呢。答案就是OperatorMap的call方法,该call方法会将一个Subscriber<R>对象转换成Subscriber<T>对象。 新创建的Subscriber<T>对象的onNext, onError或者onComplete方法都会调用Subscriber<R>的相应方法。这也是上图中右边的由Sub<T>指向Sub<R>的表示意思。我们调用map方法时需要传入的Func1对象的call方法也是在Subscriber<T>的onNext方法中被调用。 那这个Subscriber<R>对象自然就是我们在代码实例化并通过subscribe方法传递给新的Observable<R>对象的参数咯。这里说下图中的虚线和实线的意思,虚线表示OnSubscribe对象的call方法调用, 实线表示Subscriber对象的onNext, onError或者onComplete方法的调用。
flatMap方法分析
先上代码
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
可以看到有两条路径可以走,第一条是当原始Observable是ScalarSynchronousObservable对象的时候才会执行的。我能找到的只有Observable的just方法才会创建ScalarSynchornousObservable对象,其他的都是直接new一个Observable对象。那我们先来分析原始Observable<T>是调用just方法创建的情况,即第一条路径ScalarSynchronousObservable.scalarFlatMap(func)的执行情况。
public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
return create(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
Observable<? extends R> o = func.call(t);
if (o.getClass() == ScalarSynchronousObservable.class) {
child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
child.onCompleted();
} else {
o.unsafeSubscribe(new Subscriber<R>(child) {
@Override
public void onNext(R v) {
child.onNext(v);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
});
}
}
});
}
ScalarSynchronousObservable的scalarFlatMap方法创建了一个新的Observable<R>对象。算上原始Observable<T>对象和我们Func1对象的call函数创建的Observable<R>对象,我们现在一共有三个Observable对象。
如果我们提供的Func1对象的call函数中创建Observable<R>对象的时候是调用Observable.just方法创建的,则if里面的条件成立,那么会执行
Observable<? extends R> o = func.call(t);
if (o.getClass() == ScalarSynchronousObservable.class) {
child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
child.onCompleted();
}
这里要注意下func.call(t)的t对象是原始Observable.just(T t)方法传递进去的t。而ScalarSynchronousObservable<? extends R>)o).t的t是我们在Func1.call方法里面用just(T t)方法创建Observable对象时传递进去的。
最后的图就是这样了。原始的Observable<T>已经没什么用了,它就当做一个提供数据T的容器,被Func1.call方法用来取数据Observable<T>.t。Func1.call方法将T对象转换成Observable<R>对象,该Obersvable<R>也没什么用,只是当做数据R的容器,被最后的Observable<R>里面的OnSubscrib<R>.call方法调用到,用来获取R数据对象并传递给Sub<R>.onNext(R r)方法。
ok, 如果Func1.call方法里面提供的Observable<R>不是通过just创建的,那if条件就不成立,那代码就会走else里面的
Observable<? extends R> o = func.call(t);
o.unsafeSubscribe(new Subscriber<R>(child) {
@Override
public void onNext(R v) {
child.onNext(v);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
}
那么最后的图就是这样的
上图中标明依赖
字样的虚线表示依赖关系,并不是事件流。那原始Observable<T>不是通过just创建的呢,是普通的Observable对象,在flatMap函数中代码就是走merge(map(func))。map函数我们已经知道是怎么回事了,所以先上执行完map的图
再看下merge函数
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
代码将会执行source.lift方法。为什么不会执行上面if里面的代码?因为经过map函数调用生成的Ob<Ob<R>>不是ScalarSynchronousObservable类型的对象。lift方法的图谱我们在分析map方法时已经知道了怎么画,直接上图再解释
一个完整的图就是上面那样的了,问题的关键点是Sub<Ob<R>>是如何触发Sub<R>的事件的,即右下角的那条实线箭头是怎么来的。在map方法的分析中,我们知道决定Sub<Ob<R>>触发Sub<R>事件的代码在Operator中。在这里就是OperatorMerge的call方法。
@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
当MergeSubscriber的onNext, onComplete或者onError方法被调用时,都会调用其emit方法, 而emit方法又会调用emitLoop方法,看下emitLoop方法
void emitLoop() {
// 删除掉大量的无关的代码
try {
final Subscriber<? super T> child = this.child;
for (;;) {
Queue<Object> svq = queue;
long r = producer.get();
boolean unbounded = r == Long.MAX_VALUE;
if (svq != null) {
for (;;) {
while (r > 0) {
o = svq.poll();
T v = nl.getValue(o);
try {
child.onNext(v);
} catch (Throwable t) {
}
r--;
}
}
}
}
}catch (Exception e){
}
}
在这里会多次触发child.onNext()事件,于是就是现实了由一个Subscriber.onNext(T t)事件裂变成多个Subscriber.onNext(R r)事件。具体整个流程相当麻烦,有兴趣的读者可以自己慢慢研究下。
observeOn方法分析
老规矩,先上源码
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}
跟flatMap很像,有两种情况,先分析第一种情况原始Observable<T>是由just方法创建的情况,just方法创建的是ScalrSynchronousObservable对象,所以代码走((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
if (scheduler instanceof EventLoopsScheduler) {
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
return create(new DirectScheduledEmission<T>(es, t));
}
return create(new NormalScheduledEmission<T>(scheduler, t));
}
这里可以看到根据Scheduler的类型又分为两种情况,不过都是创建一个新的Observable<T>对象。那么不同的就是两个OnSubscribe<T>对象了,一个是DirectScheduledEmission,另一个是NormalScheduleEmission。
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
private final EventLoopsScheduler es;
private final T value;
DirectScheduledEmission(EventLoopsScheduler es, T value) {
this.es = es;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> child) {
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
//这个方法的代码看不懂,可以把它拆开看,拆开后如下
//ScalarSynchronousAction<T> scalarAction = new ScalarSynchronousAction<T>(child, value);
//Subscription subscription = es.scheduleDirect(scalrAction);
//child.add(subscription);
}
}
/** Emits a scalar value on a general scheduler. */
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
private final Scheduler scheduler;
private final T value;
NormalScheduledEmission(Scheduler scheduler, T value) {
this.scheduler = scheduler;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
}
}
/** Action that emits a single value when called. */
static final class ScalarSynchronousAction<T> implements Action0 {
private final Subscriber<? super T> subscriber;
private final T value;
private ScalarSynchronousAction(Subscriber<? super T> subscriber,
T value) {
this.subscriber = subscriber;
this.value = value;
}
@Override
public void call() {
try {
subscriber.onNext(value);
} catch (Throwable t) {
subscriber.onError(t);
return;
}
subscriber.onCompleted();
}
}
ScalarSynchronousAction是一个Action0,它的call方法会触发subscriber的onNext, onError或者onComplete事件。而DirectScheduledEmission和NormalScheduledEmission都是OnSubscribe对象,它们的call方法中都会将ScalarSychronousAction放入一个线程池Scheduler中去执行。所以当Observable<T>是ScalarSynchronousObservable<T>的时候即通过just方法创建出来的时候,事件图是这样的。
Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。
那么原始Observable<T>不是ScalrSynchronousObservable的时候,observeOn方法的代码走向是lift(new OperatorObserveOn(scheduler))。lift方法的事件图我们已经知道了,先上图
lift方法里面会创建一个新的Observable<T>对象和一个新的OnSubscribe<T>对象,新的OnSubscribe<T>.call方法会调用原始的OnSubscribe<T>.call(Subscriber<T> subscriber)方法。这个Subscriber<T>参数对象是通过OperatorObserveOn对象的call方法获取的。
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
}
当Scheduler是ImmediateScheduler或者TrampolineScheduler对象时,返回的是我们代码里面创建并传递给Observable.subscribe()方法的Subscriber对象而且这个过程并没有使用到Scheduler去切换线程,也就是说这个过程依然运行在当前线程。那么事件图应该是这样的。
如果Scheduler不是ImmediateScheduler和TrampolineScheduler的话,OperatorObserveOn对象的call方法就会执行以下代码
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
//省略掉一些代码
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
这里要关注的就是ObserveOnSubscriber对象,它是一个Subscriber对象。ok,也看下代码
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
//省略掉无关代码
}
void init() {
child.add(scheduledUnsubscribe);
child.setProducer(new Producer() {
@Override
public void request(long n) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
});
child.add(recursiveScheduler);
child.add(this);
}
@Override
public void onStart() {
request(RxRingBuffer.SIZE);
}
@Override
public void onNext(final T t) {
//省略掉无关代码
schedule();
}
@Override
public void onCompleted() {
//省略掉无关代码
schedule();
}
@Override
public void onError(final Throwable e) {
//省略掉无关代码
schedule();
}
final Action0 action = new Action0() {
@Override
public void call() {
pollQueue();
}
};
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(action);
}
}
// only execute this from schedule()
void pollQueue() {
//省略掉无关代码
do {
for (;;) {
if (finished) {
if ((error = this.error) != null) {
queue.clear();
child.onError(error);
return;
} else if (queue.isEmpty()) {
child.onCompleted();
return;
}
}
if (r > 0) {
Object o = queue.poll();
if (o != null) {
child.onNext(on.getValue(o));
} else {
break;
}
} else {
break;
}
}
if (emitted > 0) {
request(emitted);
}
}
}
可以看出当ObserveOnSubscriber对象的onNext, onError或者onComplete事件被触发时,它们会调用schedule方法,而schedule方法会将Action0加入线程池去执行,也就是说Action0的call方法执行的代码是在scheduler切换后的线程中执行,Action0.call方法会调用pollQueue去触发新的Subscriber<T>的onNext, onError或者onComplete事件。最后的事件图是这样的。
上图中的 Scheduler作用于
表示 在Subscriber<T>对象的onNext, onError或者onComplete方法中调用Scheduler的方法去切换线程。
subscribeOn方法分析
先看subscriveOn方法的代码
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
当原始Observable<T>对象是ScalarSynchronousObservable对象时,即原始Observable<T>是由just方法创建的时候,代码和observeOn方法中情况一样,直接上事件流图
Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。那么原始Observable<T>不是ScalrSynchronousObservable的时候,subscribeOn方法的代码走向是nest().lift(new OperatorSubscribeOn<T>(scheduler));
先看下nest方法的代码
public final Observable<Observable<T>> nest() {
return just(this);
}
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
public final class ScalarSynchronousObservable<T> extends Observable<T> {
public static final <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
private final T t;
protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
s.onNext(t);
s.onCompleted();
}
});
this.t = t;
}
public T get() {
return t;
}
//此处省略掉无关代码
}
nest方法其实是调用just方法创建一个ScalarSynchronousObservable对象。这里要注意的是just方法的参数是原始Observable<T>对象,执行完nest方法的图如下
原始Observable<T>对象被当做一个属性保存在新创建出来的ScalarSynchronousObservable对象当中。以上就是nest方法执行完之后的情况,按照nest().lift(new OperatorSubscribeOn<T>(scheduler));
的代码走向,我们来看下执行lift的时候事件流的图是怎么样的。先上一张没有分析OperatorSubscribeOn代码的图
lift方法会创建一个Observable<T>对象,该对象的内部有一个OnSubscribe<T>对象,它的call方法会调用OperatorSubscribeOn.call方法得到一个Subscriber<Observable<T>>对象,并在调用上面的Observable<Observable<T>>.OnSubscribe<Observable<T>>.call方法时传递进去。那么来看下OperatorSubscribeOn.call是如何将一个Subscribe<T>转换成Subscriber<Observable<T>>对象的。
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
private final Scheduler scheduler;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
producer.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
producer.request(n);
}
});
}
}
});
}
});
}
});
}
};
}
}
可以看到OperatorSubscribeOn对象的call方法里面实例化了一个Subscriber<T>对象并返回。该Subscriber<T>的onNext方法中会调用Scheduler.schedule将一个Action0加入线程池里执行,该Action0的call方法会触发原始Observable<T>的unsafeSubscribe方法,然后会触发原始OnSubscribe<T>.call方法,也就是说原始OnSubscribe<T>.call方法是在切换后的线程中执行。如图