Create
创建一个Observable比较简单,最基础的方法是调用Observable的create方法进行创建,贴一下示例:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //执行想要的操作 } });
它的源码实现也比较简单,在create的时候创建出一个Observable,并且将我们自己写的OnSubscribe传入,在订阅的时候代码如下:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaPluginUtils.handleException(hook.onSubscribeError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } } return Subscriptions.unsubscribed(); } }
重点是这一句:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
在订阅的时候调用我们自己写的OnSubscribe,传入的参数也就是自己写的Subscriber。
Map
先贴出简单的示例代码:
Observable.just(1,2,3,4,5) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer+"test"; } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d(TAG, "call: "+s); } });
map方法的作用是将一种类型的Observer变成另一种类型的Observer,看一下它内部的实现
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
当前的Observer声明的泛型参数是T,为了使用另一种泛型参数,需要单独的在泛型方法中进行声明。看一下Func1的定义
public interface Func1<T, R> extends Function { R call(T t); }
很简单的函数式接口,内部只有一个方法,接口的实现类通过传入一个类型为T的对象,通过调用call方法将其转化成类型为R的对象并且返回。
回到上面的例子中,T就是Integer,R就是String,而我们使用了匿名类来实现的Func1接口,目的是将Integer类转化成String类。
在map方法中新出现了一个类叫OperatorMap,看一下这个类:
public final class OperatorMap<T, R> implements Operator<R, T> { final Func1<? super T, ? extends R> transformer; public OperatorMap(Func1<? super T, ? extends R> transformer) { this.transformer = transformer; } @Override public Subscriber<? super T> call(final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); return parent; } static final class MapSubscriber<T, R> extends Subscriber<T> { ... } }
因为代码比较短就全部贴上来了。看一下这个方法的构造,传入了一个Func1的实现类,就是我们在前面例子中所写的匿名类,然后竟然结束了。。。结束了。。。根本没有啥具体的信息。。。
但是还有一点值得去看的就是类名:OperatorMap<T, R> implements Operator<R, T>,这个Operator是个啥, 找一下它定义的地方:
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { // cover for generics insanity }
这个接口是个空接口,但是既然没有任何方法为什么要有这个接口存在呢?答案就是要限制Func1中泛型参数的类型,想一下如果有一个类实现了Operator接口,它所复写的方法是什么样子?OperatorMap就实现类Operator,贴一下它的实现代码:
@Override public Subscriber<? super T> call(final Subscriber<? super R> o) { <pre name="code" class="java"><span style="white-space:pre"> </span>MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); return parent;
}
可以看出如果调用Operator的call方法可以将一种类型的Subscriber转化成另一种类型的Subscriber。而转化的方法的是创建了一个MapSubscriber直接返回。这个类是OperatorMap的内部类,贴一下代码简单看一下:
static final class MapSubscriber<T, R> extends Subscriber<T> { final Subscriber<? super R> actual; final Func1<? super T, ? extends R> mapper; boolean done; public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); } @Override public void onError(Throwable e) { if (done) { RxJavaPluginUtils.handleException(e); return; } done = true; actual.onError(e); } @Override public void onCompleted() { if (done) { return; } actual.onCompleted(); } @Override public void setProducer(Producer p) { actual.setProducer(p); } }
这个类继承了抽象类Subscriber,那么它也是一个订阅者,也实现了订阅者都应该实现的onNext,onError,onCompleted方法。但是MapSubscriber在这些方法中做了一些处理,以onNext方法为例:
<span style="white-space:pre"> </span>R result; result = mapper.call(t); actual.onNext(result);
将方法简化后在onNext中只剩下这三句,mapper就是在调用map方法的时候我们传入的Func1的匿名实现类,而actual是调用OperatorMap的call方法中传入的参数,暂时还没涉及到,等下涉及到了再来看。最后调用了actual的onNext方法,然后一切就结束了。
再回过头继续看map:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
跟进一下lift方法:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
又出现了一个并没有见过的类onSubscribeLift,看一下类的定义:
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }
代码也比较少,所以就直接贴了。如果记性比较好应该能记得OnSubscribe是什么作用,在这里就不再赘述了,可以回头看一看。而OnSubscribeLift既然是实现了onSubscribe接口
,那么就和它有一样的作用。在OnSubscribeLift的构造方法中接收了两个参数,一个是原始的onSubscribe,另一个就是Operator啦,如果还有印象应该可以Operator这个接口目的是将一种类型的Subscriber转化成另一种类型的Subscriber。
而OnSubscribe中最重要的就是call方法,也是通过create创建Observable时我们需要重写的,OnSubscribeLift中的call方法已经给出了实现,简单看一下。
hook.onLift(operator) 这句直接返回的就是operator,所以这句话的作用是调用operator的call方法将R类型的Subscriber转化成T类型的Subscriber,调用新创建出了Subscriber的onStart, 然后使用原来的OnSubscribe调用call方法,传入的是新创建的Subscriber。
到这里整个map的调用流程就结束了,但是相信看过了一定还是一脸懵逼,再从头看一遍,相信一下就能明白了,我再把开始时候的例子重新贴出来:
Observable.just(1,2,3,4,5) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return integer+"test"; } }) .subscribe(new Action1<String>() { @Override public void call(String s) { Log.d(TAG, "call: "+s); } });
从后来往前看,首先是subscribe,在这个方法中调用了OnSubscribe的call方法,并且将Action1的实例对象封装成subscriber传入作为参数。这里OnSubscribe并不是just方法调用中产生的OnSubscribe(如果你是用create创建的那么也不是你自己写的那个OnSubscribe),上面已经说了,map中只是调用了lift,那么真正起作用的OnSubscribe也应该是lift所产生的Observable中的OnSubscribe。再贴一下代码:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
这个新的OnSubscribe就是这个OnSubscribeLift,其中传入的operator是OperatorMap的对象。
订阅的时候调用的是OnSubscribeLift的call方法,再贴一下:
public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); parent.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } }
这个call方法中传入的参数o就是订阅时封装了Action1的Subscriber。try块中的第一行调用了operator的call方法,传入的参数是这个subscriber,如果各位还有印象能记起来这个operator就是在map方法中创建出来的OperatorMap,再贴一下它的call方法:
public Subscriber<? super T> call(final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); return parent; }
在创建了一个MapSubscriber以后就返回了。但是大家不知道有没有注意,这里面的返回类型是Subscriber<? super T>,也就是返回了一个原始类型的Subscriber,在我们这里面也就是返回了一个Integer类型的,原因也是因为使用了map方法后在subscribe方法中要接收的是一个R类型的Subscriber,在这里进行了一下转化。目前还没有出现任何有用的逻辑。返回了MapSubscriber对象st后,重要的一句是parent.call(st)。parent是原始的OnSubscribe,也就是我们自己复写call方法的OnSubscribe,它想要的参数自然是Subscriber<T>,这里由于传它的参数是MapSubscriber,满足了这个条件。所以现在要做的就是看一下MapSubscriber中的onNext,onError和onCompleted等方法啦(因为OnSubscribe的逻辑不就是调用这几个方法么)。以onNext为例:
public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); }
这里的mapper就是我们自定义的那个Func1的实例。还记得Func1的作用么,将T转换成R,在本例中也就是将Integer转换成String,这里进行了转换,终于找到了~~~
那转换之后呢,调用了actual的onNext方法,这个actual就是OperatorMap中call方法里面的o,也就是最终我们在代码中写的封装了Action1的Subscriber。这样就完成了一连串的转换和调用。
简单的总结一下整个过程。因为只有原始我们写的OnSubscribe中有发射数据的逻辑,所以原始的那个必需得用上,但是原始的那个OnSubscribe想要的是一个T类型的Subscriber,不过我们现在有的只是R类型的Subscriber,所以进行了一层封装,将R类型的Subscriber转换成T类型的Subscriber。然后在封装类的OnNext方法中将原始的OnSubscribe里call方法中发射的一系列数据进行一个转换(从T转换成R),用的就是自己写的Func1,然后调用我们所写的Subscriber<R>中的onNext等方法,方法需要的参数类型是R,而我们转换生成的参数也是R,整个调用过程完成~~
我自己画了一个简单的流程图如下:
FlatMap
假设现在有一个商店类Shop,商店类中有地址和商品集合,每一个商品都有自己的名字,如果给出一个商店的集合,要将商店的地址输出出来该怎么做?
直接使用上面说过的map就可以,代码如下(实体的代码比较简单就没有贴出来):
List<Shop> shops = new ArrayList<>(); for(int i = 1;i<=10;++i){ Shop shop = new Shop(); shop.setAddress(new Address("Shop"+i)); for(int j = 1;j<=10;++j){ Good good = new Good("Shop"+i+":Good"+j); shop.getGoods().add(good); } shops.add(shop); } Observable.from(shops) .map(new Func1<Shop, Address>() { @Override public Address call(Shop shop) { return shop.getAddress(); } }) .subscribe(new Action1<Address>() { @Override public void call(Address address) { Log.d(TAG, "call: "+address.getName()); } });
但是如果想输出商店所有商品的名字呢?
可以这样:
Observable.from(shops) .map(new Func1<Shop, List<Good>>() { @Override public List<Good> call(Shop shop) { return shop.getGoods(); } }) .subscribe(new Action1<List<Good>>() { @Override public void call(List<Good> goods) { for(Good g:goods){ Log.d(TAG, "call: "+g.getName()); } } });
好吧把RxJava的初衷,也就是流式和简洁破坏了,并且subscribe方法的可复用性极低,并不是我们想要看到的。
基于以上问题可以总结一点小的共性出来,还是将一个Observable<T>转换成另一个Observable<R>,只不过这个R并不是T中的属性,而是T中某个集合中的元素,这时候我们可以考虑使用FlatMap。贴一下使用FlatMap后的代码:
Observable.from(shops) .flatMap(new Func1<Shop, Observable<Good>>() { @Override public Observable<Good> call(Shop shop) { return Observable.from(shop.getGoods()); } }) .subscribe(new Action1<Good>() { @Override public void call(Good good) { Log.d(TAG, "call: "+good.getName()); } });
下面来看一下FlatMap是怎么实现的:
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) { ... return merge(map(func)); }
看到了很熟悉的map。从map传入的Func1参数可以看出来map的作用是将Observable<T>转换成Observable<Observable<R>>。
看一下merge的实现:
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { ... return source.lift(OperatorMerge.<T>instance(false)); }
从方法的签名可以看出merge的作用是将Observable<Observable<T>>的参数转换成Observable<T>。看一下lift:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
和刚刚介绍的map中的lift是一个方法,目的是通过一个操作符Operator将当前的Observable<T>转换成Observable<R>。既然是转化Observable,而且source的类型是Observable<Observable<T>>,转换出来的东西想要Observable<T>,所以我们可以猜测OperatorMerge.<T>instance(false)所返回的对象应该是这样的一个形式
Operator<T, Observable<T>>
看一下OperatorMerge的定义:
public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>> { ... }
instance方法是一个单例方法,采用了静态内部类的形式,代码如下:
public static <T> OperatorMerge<T> instance(boolean delayErrors) { if (delayErrors) { return (OperatorMerge<T>)HolderDelayErrors.INSTANCE; } return (OperatorMerge<T>)HolderNoDelay.INSTANCE; }
private static final class HolderNoDelay { /** A singleton instance. */ static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE); }
如果有印象的话能记起来lift方法是将operator和我们所写的OnSubscribe封装在一个OnSubscribeLift中,然后用这个OnSubscribe创建出一个Observable<R>。而在订阅的时候我们写的Observer的类型是R,所以走到了OnSubscribeLift的call方法中,在这个方法中调用了operator的call,目的是将一个Subscriber<R>转换成一个Subscriber<T>从而能让我们自己写的OnSubscribe来对其进行操作。在map方法中执行这个任务的是OperatorMap类,具体的任务由其中的内部类MapSubscriber来实现,在merge操作中其实也是一样的,看一下代码:
@Override public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) { MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); MergeProducer<T> producer = new MergeProducer<T>(subscriber); subscriber.producer = producer; child.add(subscriber); child.setProducer(producer); return subscriber; }
call方法的目的仍然是创建一个可以满足原始的自己写的OnSubscribe的Subscriber(这里原始的Observable是Observable<Observable<T>>的source,如果忘记了可以回头看一下)。而具体的任务仍然是创建一个内部类的对象来实现:
static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> { final Subscriber<? super T> child; ... public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) { this.child = child; ... } ... <pre name="code" class="java"> <span style="white-space:pre"> </span>@Override public void onNext(Observable<? extends T> t) { if (t == null) { return; } if (t == Observable.empty()) { emitEmpty(); } else if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable<? extends T>)t).get()); } else { InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); addInner(inner); t.unsafeSubscribe(inner); emit(); } }
... }
这个类的代码想对比较多,我们还是挑一些来看。构造方法中重点是传入了一个child,这个对象就是最终我们自己写的Subscriber,从它的泛型定义上也可以看的出来Subscriber<T>。
然后看一下onNext,除去各种错误检查,直接看最后一个else,里面首先创建出一个InnerSubscriber,看一下它的代码:
static final class InnerSubscriber<T> extends Subscriber<T> { final MergeSubscriber<T> parent; final long id; volatile boolean done; volatile RxRingBuffer queue; int outstanding; static final int limit = RxRingBuffer.SIZE / 4; public InnerSubscriber(MergeSubscriber<T> parent, long id) { this.parent = parent; this.id = id; } @Override public void onStart() { outstanding = RxRingBuffer.SIZE; request(RxRingBuffer.SIZE); } @Override public void onNext(T t) { parent.tryEmit(this, t); } @Override public void onError(Throwable e) { done = true; parent.getOrCreateErrorQueue().offer(e); parent.emit(); } @Override public void onCompleted() { done = true; parent.emit(); } public void requestMore(long n) { int r = outstanding - (int)n; if (r > limit) { outstanding = r; return; } outstanding = RxRingBuffer.SIZE; int k = RxRingBuffer.SIZE - r; if (k > 0) { request(k); } } }
构造方法中将MergeSubscriber当作参数parent传入。先不去管这个类,看MergeSubscriber的onNext方法后面做了些什么:
t.unsafeSubscribe(inner);
这句比较重点,看一下实现:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate hook.onSubscribeStart(this, onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }
可以看出除了稍有不同外就是正常的订阅,逻辑也都类似,那么正常订阅的时候最终会调用到Subscriber的onNext方法中,这里的Subscriber是哪一个呢?没错就是刚才所看到的InnerSubscribe。看一下它的onNext:
@Override public void onNext(T t) { parent.tryEmit(this, t); }
调用了parent,也就是MergeSubscriber的tryEmit,我们接着来看一下:
void tryEmit(InnerSubscriber<T> subscriber, T value) { boolean success = false; long r = producer.get(); if (r != 0L) { synchronized (this) { // if nobody is emitting and child has available requests r = producer.get(); if (!emitting && r != 0L) { emitting = true; success = true; } } } if (success) { emitScalar(subscriber, value, r); } else { queueScalar(subscriber, value); } }
比较关键的是if(success)里面的方法emitScalar,看一下代码实现:
protected void emitScalar(InnerSubscriber<T> subscriber, T value, long r) { boolean skipFinal = false; try { try { child.onNext(value); } catch (Throwable t) { if (!delayErrors) { Exceptions.throwIfFatal(t); skipFinal = true; subscriber.unsubscribe(); subscriber.onError(t); return; } getOrCreateErrorQueue().offer(t); } if (r != Long.MAX_VALUE) { producer.produced(1); } subscriber.requestMore(1); // check if some state changed while emitting synchronized (this) { skipFinal = true; if (!missed) { emitting = false; return; } missed = false; } } finally { if (!skipFinal) { synchronized (this) { emitting = false; } } } /* * In the synchronized block below request(1) we check * if there was a concurrent emission attempt and if there was, * we stay in emission mode and enter the emission loop * which will take care all the queued up state and * emission possibilities. */ emitLoop(); }
终于看到了我么想看的代码:
child.onNext(value);
记性好的还能想起来child就是MergeSubscriber构造方法传入的我们自己写的Subscriber。
所以整个主线流程到这里就结束啦,看得还是晕晕的,下面再上一张FlatMap的流程图:
subscribeOn和observerOn
RxJava神奇之处还有一个地方就是它可以将线程的转换也简单的通过流式来处理了,下面来看一下例子:
Observable.just(1,2,3,4,5) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { } });
这段代码没有任何实际的含义,仅仅是调用了一下Api,下面来看一下源码,首先是subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
这里也很简单,主要是创建了一个新的Observable,并且这个Observable和原始的Observable都是一样的泛型参数。如果现在订阅了这个新的Subscriber,就会走到OperatorSubscribeOn这个类的call方法中,来看一下:
@Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }
我们先只关注第二个call方法,也就是被嵌套的call方法中的内容。首先以我们自己写的Subscriber创建出了一个Subscriber,然后调用了source的unsafeSubscribe方法,从上面看下来的同学应该知道这个方法其实和正常的subscribe主体逻辑上没有什么区别,这里就不贴代码了,忘记了可以自己看一下源码。那么既然传递的参数是我们用自己写的Subscriber创建出来的新的Subscriber,那么久看一下新的Subscriber的onNext等方法:
@Override public void onNext(T t) { subscriber.onNext(t); }
可以看出仅仅是简单调用了一下原Subscriber(也就是我们自己写的Subscriber)的onNext方法。看起来所做的都没有什么卵用。
这个时候我们可以简单的猜想一下,其实在inner.schedule方法执行中就已经切换了线程。我们来看一下源码。
在例子中使用的Scheduler是Schedulers.io,然后在OperatorSubscribeOn的call方法中调用了Schedulers.io返回的Scheduler的createWorker方法,跟进看一下:
public static Scheduler io() { return INSTANCE.ioScheduler; }
Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); }
public static Scheduler createIoScheduler() { return createIoScheduler(new RxThreadFactory("RxIoScheduler-")); }
public static Scheduler createIoScheduler(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException("threadFactory == null"); return new CachedThreadScheduler(threadFactory); }
最终到了CachedThreadScheduler中,这个类继承了抽象类Scheduler。在OperatorSubscribeOn的call方法中调用createWorker方法创建出了一个Worker对象,由于这个方法是抽象的,我们看一下这个方法在具体类中的实现:
@Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
看一下EventLoopWorker的构造方法:
EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.threadWorker = pool.get(); }
有一个很重要的属性threadWorker,它是从pool中得到的,那这个pool是什么,跟进看一下:
public CachedThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<CachedWorkerPool>(NONE); start(); }
原来pool在这个类构造方法中被创建,而且它是一个引用包装类,这样就知道了get其实返回的只是它的包装类,来看一下包装的NONE:
static { NONE = new CachedWorkerPool(null, 0, null); NONE.shutdown(); }
找到了,这个对象是CachedWorkerPool的实例。
我们再回头看一下OperatorSubscribeOn的call方法,调用了inner.schedule,这个inner就是EventLoopWorker,它的schedule方法如下:
@Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; }
这里面的threadWorker就是我们刚刚提到的CacheWorkerPool中get方法创建出来的:
ThreadWorker get() { if (allWorkers.isUnsubscribed()) { return SHUTDOWN_THREADWORKER; } while (!expiringWorkerQueue.isEmpty()) { ThreadWorker threadWorker = expiringWorkerQueue.poll(); if (threadWorker != null) { return threadWorker; } } // No cached worker found, so create a new one. ThreadWorker w = new ThreadWorker(threadFactory); allWorkers.add(w); return w; }
看一下ThreadWorker:
private static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } }
这个类基本没什么方法,所以上它的父类去找一下:
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = schedulersHook.onSchedule(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f); return run; }
在这里找到了我们想要的scheduleActual方法,形式上很眼熟,executor.submit明显是线程池去执行了一个Runnable,但是这个线程池是哪里来的呢,我们再找一下:
public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); executor = exec; }
找到了,是在NewThreadWorker的构造方法中创建的。
至此subscribeOn整体就分析完成了,大体流程也不难,主要就是代码的层次有点深,不太好找。这里再画一个图来总结一下:
下面看一下observerOn:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }
又看见了熟悉的lift,这个方法应该已经熟悉的不能再熟悉了,lift创建了一个Observable,然后创建了一个OnSubscribeLift并且传入一个Operator,在订阅的时候首先利用Operator创建出一个Subscriber<T>,然后由我们自己写的OnSubscribe对其进行操作。大致过程就是这样,重点在Operator的实现类上,在这里是OperatorObserverOn,进入看一下:
@Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; } }
和之前的套路都是一样的,创建出了一个内部类ObserveOnSubscriber,然后我们自己写的OnSubscribe调用方法的时候最终将会调用到这个内部类的相应方法中,这里以onNext为例,看一下ObserveOnSubscriber的onNext实现:
@Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); }
首先是调用了queue.offer(on.next(t))方法,这里面on.next(t)直接返回的就是t,这句简单的将t入队,然后调用schedule,看一下:
protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } }
重要的是recursiveScheduler,看一下它是什么东西:
final Scheduler.Worker recursiveScheduler; public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; // this formula calculates the 75% of the bufferSize, rounded up to the next integer this.limit = calculatedSize - (calculatedSize >> 2); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue<Object>(calculatedSize); } else { queue = new SpscAtomicArrayQueue<Object>(calculatedSize); } // signal that this is an async operator capable of receiving this many request(calculatedSize); }
看到这里也就非常熟悉了,和上文所讲的subscribeOn是一个道理,都是用了Worker。
回头看onNext,这个Worker的schedule方法需要一个Action0,正好ObserveOnSubscriber实现了Action0,那么看一下它的call方法,也就是在新线程中被调用的方法:
@Override public void call() { long missed = 1L; long currentEmission = emitted; // these are accessed in a tight loop around atomics so // loading them into local variables avoids the mandatory re-reading // of the constant fields final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; // requested and counter are not included to avoid JIT issues with register spilling // and their access is is amortized because they are part of the outer loop which runs // less frequently (usually after each bufferSize elements) for (;;) { long requestAmount = requested.get(); while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; if (currentEmission == limit) { requestAmount = BackpressureUtils.produced(requested, currentEmission); request(currentEmission); currentEmission = 0L; } } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } emitted = currentEmission; missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } }
并不是很长,找主线代码来看一下。有一个死循环,在死循环中不断的从queue中取出元素,这个元素就是刚刚在onNext中我们放进queue中的。取出后执行localChild.onNext(localOn.getValue(t))这句,localOn.getValue(t)其实就是给t带上一个泛型然后返回,那么localChild是什么:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; // this formula calculates the 75% of the bufferSize, rounded up to the next integer this.limit = calculatedSize - (calculatedSize >> 2); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue<Object>(calculatedSize); } else { queue = new SpscAtomicArrayQueue<Object>(calculatedSize); } // signal that this is an async operator capable of receiving this many request(calculatedSize); }
就是这里面的child,也就是我们自己写的subscriber。
到这里observerOn就结束了,有之前的subscribeOn做铺垫看起来就比较轻松。下面还是总结一下画一张图:
RxJava基础的一些操作就到这里了,如果发现出现问题请务必告知,谢谢