RxJava源码初探

Demo分析

响应式编程的概念现在火的一塌糊涂,各种RxXXX库层出不穷,虽然这些库的实现语言各不相同,但是原理都是一样的。我的理解是这些库主要都包含三个东西:Observable, OnSubscribe, Subscriber。阅读本文的读者必须懂的这些概念,初学者建议看下RxJava专题 上的文章再来看本文。我们就从源码层级来分析一下这中间的事件流,线程切换是怎么个原理。这里交代下本文分析的RxJava的版本是1.1.0 先来看个简单的Demo实例

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello world");
            subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
            }
        });

上面的demo就是简单的创建一个Observable,然后触发事件流。ok, 我们一步一步来分析。我们的Demo先是调用Observabale的create方法,而create方法其实内部就是调用Observale的构造函数来实例化一个Observable对象。再然后就是调用subscrible方法

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // 此处省略一堆验证参数的代码

        // new Subscriber so onStart it
        subscriber.onStart();

        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

创建完Observable对象后,就会调用subscribe方法触发事件流。subscribe方法会先做一些参数验证,然后调用subscriber.onStart()方法,再然后调用我们在创建Observable的时候传递进去的OnSubscribe对象的call方法来触发Subscriber对象的onNext, onComplete 或者onError方法。用个图来表示。

这里为了方便都采用了简写

* Observable ———> Ob

* OnSubscribe ———> OnSub

* Subscriber———> Sub

图中的箭头表示调用关系,上图中的箭头表示OnSub的call方法的调用,而call方法中又会调用Sub对象的方法onNext, onError或者onComplete方法。

map/lift方法分析

分析map方法和分析lift方法是一样的,弄懂了一个,另外一个自然也就懂了。ok, 先看下Observable的map方法的代码

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

可以看到map方法实质上就是调用lift方法实现的,这里我们先要知道Func1<? super T, ? extends R> 和OperatorMap<T, R>对象充当什么样的角色。先看下Func1的代码

public interface Func1<T, R> extends Function {
    R call(T t);
}

Func1是一个接口,它只有一个方法, 传入一个T类型的对象,返回一个R类型的对象,所以它是一个转换器。调用它的call方法可以将第一个泛型类型的对象T转换成第二个泛型类型的对象R。

那OperatorMap呢?看下代码

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

看这段代码我们知道OperatorMap<R, T> 实现了Operator< T, R>接口,这里要注意两个泛型 T 和 R 在OperatorMap和Operator里面的顺序是对换的。先来看下Operator接口的代码

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

在这里大家不要去记住泛型T,泛型R啊什么的, 我们按顺序来区分它们,就记住第一个泛型对象,第二个泛型对象。上面我们说道Func1是个转换器, 那Operator也是个转换器咯,不过它不是把第一个泛型的对象转换成第二个泛型的对象, 而是将Susbcriber<第一个泛型>对象转换成Sbscriber<第二个泛型>对象。再根据OperatorMap<T, R> implements Operator<R, T> OperatorMap<R, T> 和Operator< T, R>的泛型顺序是相反的可知,OperatorMap<T, R >转换器的功能是将Subscriber<第二个泛型>对象转换成Subscriber<第一个泛型>对象。 再来看下lift方法

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
               // 此处省略无关代码
                        st.onError(e);
                    }
                } catch (Throwable e) {
                   // 此处省略无关代码
                    o.onError(e);
                }
            }
        });
    }

lift方法就是创建了一个Observable<R>对象并返回。每个Observable<R>对象必须有一个OnSubscribe<R>对象,所以lift方法中也实例化了一个OnSubscribe<R>对象并传递给Observable<R>。该OnSubscribe对象的call方法又会调用OperatorMap的call方法创建一个Subscriber,最后组成了一个Observable对象返回。那么一个简单的demo加上一个map方法的图就是这样的。

上面的Ob<T>图我们已经见过了,下面的Ob<R>就是lift创建并返回的,从lift代码中我们知道Observable<R>中的OnSubscribe<R>对象的call(Subscriber<R> subscriber)方法会调用原始OnSubscribe<T>对象的call(Subscriber<T> subscriber)方法。在简单domo中,Subscriber<T>对象是我们创建并调用Observable<T>的subscribe方法传递进去的,那这里的Subscriber<T>是哪来的呢。答案就是OperatorMap的call方法,该call方法会将一个Subscriber<R>对象转换成Subscriber<T>对象。 新创建的Subscriber<T>对象的onNext, onError或者onComplete方法都会调用Subscriber<R>的相应方法。这也是上图中右边的由Sub<T>指向Sub<R>的表示意思。我们调用map方法时需要传入的Func1对象的call方法也是在Subscriber<T>的onNext方法中被调用。 那这个Subscriber<R>对象自然就是我们在代码实例化并通过subscribe方法传递给新的Observable<R>对象的参数咯。这里说下图中的虚线和实线的意思,虚线表示OnSubscribe对象的call方法调用, 实线表示Subscriber对象的onNext, onError或者onComplete方法的调用。

flatMap方法分析

先上代码

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

可以看到有两条路径可以走,第一条是当原始Observable是ScalarSynchronousObservable对象的时候才会执行的。我能找到的只有Observable的just方法才会创建ScalarSynchornousObservable对象,其他的都是直接new一个Observable对象。那我们先来分析原始Observable<T>是调用just方法创建的情况,即第一条路径ScalarSynchronousObservable.scalarFlatMap(func)的执行情况。

public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> child) {
                Observable<? extends R> o = func.call(t);
                if (o.getClass() == ScalarSynchronousObservable.class) {
                    child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
                    child.onCompleted();
                } else {
                    o.unsafeSubscribe(new Subscriber<R>(child) {
                        @Override
                        public void onNext(R v) {
                            child.onNext(v);
                        }
                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }
                    });
                }
            }
        });
    }

ScalarSynchronousObservable的scalarFlatMap方法创建了一个新的Observable<R>对象。算上原始Observable<T>对象和我们Func1对象的call函数创建的Observable<R>对象,我们现在一共有三个Observable对象。

如果我们提供的Func1对象的call函数中创建Observable<R>对象的时候是调用Observable.just方法创建的,则if里面的条件成立,那么会执行

Observable<? extends R> o = func.call(t);
if (o.getClass() == ScalarSynchronousObservable.class) {
                    child.onNext(((ScalarSynchronousObservable<? extends R>)o).t);
                    child.onCompleted();
                } 

这里要注意下func.call(t)的t对象是原始Observable.just(T t)方法传递进去的t。而ScalarSynchronousObservable<? extends R>)o).t的t是我们在Func1.call方法里面用just(T t)方法创建Observable对象时传递进去的。

最后的图就是这样了。原始的Observable<T>已经没什么用了,它就当做一个提供数据T的容器,被Func1.call方法用来取数据Observable<T>.t。Func1.call方法将T对象转换成Observable<R>对象,该Obersvable<R>也没什么用,只是当做数据R的容器,被最后的Observable<R>里面的OnSubscrib<R>.call方法调用到,用来获取R数据对象并传递给Sub<R>.onNext(R r)方法。

ok, 如果Func1.call方法里面提供的Observable<R>不是通过just创建的,那if条件就不成立,那代码就会走else里面的

Observable<? extends R> o = func.call(t);
o.unsafeSubscribe(new Subscriber<R>(child) {
                        @Override
                        public void onNext(R v) {
                            child.onNext(v);
                        }
                        @Override
                        public void onError(Throwable e) {
                            child.onError(e);
                        }
                        @Override
                        public void onCompleted() {
                            child.onCompleted();
                        }
                    }

那么最后的图就是这样的

上图中标明依赖 字样的虚线表示依赖关系,并不是事件流。那原始Observable<T>不是通过just创建的呢,是普通的Observable对象,在flatMap函数中代码就是走merge(map(func))。map函数我们已经知道是怎么回事了,所以先上执行完map的图

再看下merge函数

public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

代码将会执行source.lift方法。为什么不会执行上面if里面的代码?因为经过map函数调用生成的Ob<Ob<R>>不是ScalarSynchronousObservable类型的对象。lift方法的图谱我们在分析map方法时已经知道了怎么画,直接上图再解释

一个完整的图就是上面那样的了,问题的关键点是Sub<Ob<R>>是如何触发Sub<R>的事件的,即右下角的那条实线箭头是怎么来的。在map方法的分析中,我们知道决定Sub<Ob<R>>触发Sub<R>事件的代码在Operator中。在这里就是OperatorMerge的call方法。

@Override
    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;

        child.add(subscriber);
        child.setProducer(producer);

        return subscriber;
    }

当MergeSubscriber的onNext, onComplete或者onError方法被调用时,都会调用其emit方法, 而emit方法又会调用emitLoop方法,看下emitLoop方法

void emitLoop() {
           // 删除掉大量的无关的代码
            try {
                final Subscriber<? super T> child = this.child;
                for (;;) {
                    Queue<Object> svq = queue;
                    long r = producer.get();
                    boolean unbounded = r == Long.MAX_VALUE;
                    if (svq != null) {
                        for (;;) {
                            while (r > 0) {
                                o = svq.poll();
                                T v = nl.getValue(o);
                                try {
                                    child.onNext(v);
                                } catch (Throwable t) {
                                }
                                r--;
                            }
                        }
                     }
                 }
             }catch (Exception e){
             }
        }

在这里会多次触发child.onNext()事件,于是就是现实了由一个Subscriber.onNext(T t)事件裂变成多个Subscriber.onNext(R r)事件。具体整个流程相当麻烦,有兴趣的读者可以自己慢慢研究下。

observeOn方法分析

老规矩,先上源码

public final Observable<T> observeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler));
    }

跟flatMap很像,有两种情况,先分析第一种情况原始Observable<T>是由just方法创建的情况,just方法创建的是ScalrSynchronousObservable对象,所以代码走((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);

public Observable<T> scalarScheduleOn(Scheduler scheduler) {
        if (scheduler instanceof EventLoopsScheduler) {
            EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
            return create(new DirectScheduledEmission<T>(es, t));
        }
        return create(new NormalScheduledEmission<T>(scheduler, t));
    }

这里可以看到根据Scheduler的类型又分为两种情况,不过都是创建一个新的Observable<T>对象。那么不同的就是两个OnSubscribe<T>对象了,一个是DirectScheduledEmission,另一个是NormalScheduleEmission。

static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
        private final EventLoopsScheduler es;
        private final T value;
        DirectScheduledEmission(EventLoopsScheduler es, T value) {
            this.es = es;
            this.value = value;
        }
        @Override
        public void call(final Subscriber<? super T> child) {
            child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
            //这个方法的代码看不懂,可以把它拆开看,拆开后如下
            //ScalarSynchronousAction<T> scalarAction = new ScalarSynchronousAction<T>(child, value);
            //Subscription subscription = es.scheduleDirect(scalrAction);
            //child.add(subscription);
        }
    }
    /** Emits a scalar value on a general scheduler. */
    static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
        private final Scheduler scheduler;
        private final T value;

        NormalScheduledEmission(Scheduler scheduler, T value) {
            this.scheduler = scheduler;
            this.value = value;
        }

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            Worker worker = scheduler.createWorker();
            subscriber.add(worker);
            worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
        }
    }
    /** Action that emits a single value when called. */
    static final class ScalarSynchronousAction<T> implements Action0 {
        private final Subscriber<? super T> subscriber;
        private final T value;

        private ScalarSynchronousAction(Subscriber<? super T> subscriber,
                T value) {
            this.subscriber = subscriber;
            this.value = value;
        }

        @Override
        public void call() {
            try {
                subscriber.onNext(value);
            } catch (Throwable t) {
                subscriber.onError(t);
                return;
            }
            subscriber.onCompleted();
        }
    }

ScalarSynchronousAction是一个Action0,它的call方法会触发subscriber的onNext, onError或者onComplete事件。而DirectScheduledEmission和NormalScheduledEmission都是OnSubscribe对象,它们的call方法中都会将ScalarSychronousAction放入一个线程池Scheduler中去执行。所以当Observable<T>是ScalarSynchronousObservable<T>的时候即通过just方法创建出来的时候,事件图是这样的。

Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。

那么原始Observable<T>不是ScalrSynchronousObservable的时候,observeOn方法的代码走向是lift(new OperatorObserveOn(scheduler))。lift方法的事件图我们已经知道了,先上图

lift方法里面会创建一个新的Observable<T>对象和一个新的OnSubscribe<T>对象,新的OnSubscribe<T>.call方法会调用原始的OnSubscribe<T>.call(Subscriber<T> subscriber)方法。这个Subscriber<T>参数对象是通过OperatorObserveOn对象的call方法获取的。

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
    }

当Scheduler是ImmediateScheduler或者TrampolineScheduler对象时,返回的是我们代码里面创建并传递给Observable.subscribe()方法的Subscriber对象而且这个过程并没有使用到Scheduler去切换线程,也就是说这个过程依然运行在当前线程。那么事件图应该是这样的。

如果Scheduler不是ImmediateScheduler和TrampolineScheduler的话,OperatorObserveOn对象的call方法就会执行以下代码

@Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        //省略掉一些代码
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;

    }

这里要关注的就是ObserveOnSubscriber对象,它是一个Subscriber对象。ok,也看下代码

private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;

        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            //省略掉无关代码
        }

        void init() {
            child.add(scheduledUnsubscribe);
            child.setProducer(new Producer() {

                @Override
                public void request(long n) {
                    BackpressureUtils.getAndAddRequest(requested, n);
                    schedule();
                }

            });
            child.add(recursiveScheduler);
            child.add(this);
        }

        @Override
        public void onStart() {
            request(RxRingBuffer.SIZE);
        }

        @Override
        public void onNext(final T t) {
            //省略掉无关代码
            schedule();
        }

        @Override
        public void onCompleted() {
            //省略掉无关代码
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
           //省略掉无关代码
            schedule();
        }

        final Action0 action = new Action0() {

            @Override
            public void call() {
                pollQueue();
            }

        };

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(action);
            }
        }

        // only execute this from schedule()
        void pollQueue() {
            //省略掉无关代码
            do {
                for (;;) {
                    if (finished) {
                        if ((error = this.error) != null) {
                            queue.clear();
                            child.onError(error);
                            return;
                        } else if (queue.isEmpty()) {
                            child.onCompleted();
                            return;
                        }
                    }
                    if (r > 0) {
                        Object o = queue.poll();
                        if (o != null) {
                            child.onNext(on.getValue(o));
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                }
            if (emitted > 0) {
                request(emitted);
            }
        }
    }

可以看出当ObserveOnSubscriber对象的onNext, onError或者onComplete事件被触发时,它们会调用schedule方法,而schedule方法会将Action0加入线程池去执行,也就是说Action0的call方法执行的代码是在scheduler切换后的线程中执行,Action0.call方法会调用pollQueue去触发新的Subscriber<T>的onNext, onError或者onComplete事件。最后的事件图是这样的。

上图中的 Scheduler作用于 表示 在Subscriber<T>对象的onNext, onError或者onComplete方法中调用Scheduler的方法去切换线程。

subscribeOn方法分析

先看subscriveOn方法的代码

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return nest().lift(new OperatorSubscribeOn<T>(scheduler));
    }

当原始Observable<T>对象是ScalarSynchronousObservable对象时,即原始Observable<T>是由just方法创建的时候,代码和observeOn方法中情况一样,直接上事件流图

Scheduler是在OnSubscribe<T>的call方法中启动并切换线程的。那么原始Observable<T>不是ScalrSynchronousObservable的时候,subscribeOn方法的代码走向是nest().lift(new OperatorSubscribeOn<T>(scheduler));

先看下nest方法的代码

public final Observable<Observable<T>> nest() {
        return just(this);
    }

public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }

public final class ScalarSynchronousObservable<T> extends Observable<T> {

    public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
    }

    private final T t;

    protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {

            @Override
            public void call(Subscriber<? super T> s) {
                s.onNext(t);
                s.onCompleted();
            }

        });
        this.t = t;
    }

    public T get() {
        return t;
    }
    //此处省略掉无关代码
}

nest方法其实是调用just方法创建一个ScalarSynchronousObservable对象。这里要注意的是just方法的参数是原始Observable<T>对象,执行完nest方法的图如下

原始Observable<T>对象被当做一个属性保存在新创建出来的ScalarSynchronousObservable对象当中。以上就是nest方法执行完之后的情况,按照nest().lift(new OperatorSubscribeOn<T>(scheduler)); 的代码走向,我们来看下执行lift的时候事件流的图是怎么样的。先上一张没有分析OperatorSubscribeOn代码的图

lift方法会创建一个Observable<T>对象,该对象的内部有一个OnSubscribe<T>对象,它的call方法会调用OperatorSubscribeOn.call方法得到一个Subscriber<Observable<T>>对象,并在调用上面的Observable<Observable<T>>.OnSubscribe<Observable<T>>.call方法时传递进去。那么来看下OperatorSubscribeOn.call是如何将一个Subscribe<T>转换成Subscriber<Observable<T>>对象的。

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

可以看到OperatorSubscribeOn对象的call方法里面实例化了一个Subscriber<T>对象并返回。该Subscriber<T>的onNext方法中会调用Scheduler.schedule将一个Action0加入线程池里执行,该Action0的call方法会触发原始Observable<T>的unsafeSubscribe方法,然后会触发原始OnSubscribe<T>.call方法,也就是说原始OnSubscribe<T>.call方法是在切换后的线程中执行。如图

时间: 2024-09-29 09:03:19

RxJava源码初探的相关文章

快速理解RxJava源码的设计理念

前言 我在看过几篇关于RxJava源码分析的博客后,不知是我的水平有限还是源码过于博大精深,导致花了很长的时间才搞清楚其运行原理.我个人觉得应该有更好的办法来快速剖析理解,于是决定写下本文. 本文适合已经看过一些RxJava源码分析资料的同学,不过没看过也没关系.在看本文时可参考这篇博客:RxJava基本流程和lift源码分析,它说得比较全,在此感谢博主大头鬼Bruce. 一.初探RxJava [以下摘录了RxJava基本流程和lift源码分析] 我们先来看一段最基本的代码,分析这段代码在RxJ

minidlna源码初探(二)—— SSDP设备发现的大致流程

前言: 之前有专文介绍了minidlna中的UPNP功能,内中介绍其中包含的SSDP(简单发现协议),SOAP(简单对象访问协议)等几个协议(http://blog.csdn.net/sakaue/article/details/19070735).本文将根据minidlna的程序流程,概述SSDP的流程,为下一部分ACE实现做铺垫. 设备发现的大致流程: 首先,根据UPNP的规范: 在设备加入网络,UPnP发现协议允许设备向控制点广告它的服务.它使用向一个标准地址和端口多址传送发现消息来实现.

Servlet源码初探

年底,公司的事情告一段落,就来捣鼓一下这个Servlet源码,为下一步的spingmvc源码初探做准备 1.Servlet接口 public interface Servlet { void init(ServletConfig var1) throws ServletException; ServletConfig getServletConfig(); void service(ServletRequest var1, ServletResponse var2) throws Servlet

RxJava源码浅析

Create 创建一个Observable比较简单,最基础的方法是调用Observable的create方法进行创建,贴一下示例: Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //执行想要的操作 } }); 它的源码实现也比较简单,在create的时候创建出一个Observa

浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度

一.创建操作: 1.观察者模式:RxJava的世界里,我们有四种角色: Observable<T>(被观察者).Observer(观察者) Subscriber(订阅者).Subject Observable和Subject是两个"生产"实体,Observer和Subscriber是两个"消费"实体.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Ob

React Native IOS源码初探

原文链接 http://www.open-open.com/lib/view/open1465637638193.html 每个项目都有一个入口,然后进行初始化操作,React Native 也不例外.一个不含 Objective-C 代码的项目留给我们的唯一线索就是位于 AppDelegate 文件中的代码: RCTRootView *rootView = [[RCTRootView alloc] initWithBundleURL:jsCodeLocation moduleName:@"re

[java学习]java容器源码初探(1)

一.动态数组ArrayList 在我们开发者眼中,这就是一个"动态数组",可以"动态"地调整数组的大小,虽然说数组从定义了长度后,就不能改变大小. 实现"动态"调整的基本原理就是:按照某个调整策略,重新创建一个调整后一样大小的数组,然后将原来的数组赋值回去. 下面我们来解析一下几个与数组不一样的方法. 看看ArrayList中主要的几个字段(源码剖析): // 默认的初始数组大小 private static final int DEFAULT_

Selenium3笔记-WebDriver源码初探

Selenium3 有哪些变化? 其实相对于与Selenium2,Selenium3没有做太多的改动.下面给出官方的文档说明,供参考. 参考文档:https://seleniumhq.wordpress.com/2013/08/28/the-road-to-selenium-3/ "We aim for Selenium 3 to be "a tool for user-focused automation of mobile and web apps",Developers

【原创】JAVA并发编程——Callable和Future源码初探

JAVA多线程实现方式主要有三种:继承Thread类.实现Runnable接口.使用ExecutorService.Callable.Future实现有返回结果的多线程.其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的. thread和runnable不讨论了. 太多地方可以找到他们的探究了 重点看callable和future, 先上一段代码: package com.future.chenjun.test; import java.util.concurrent.Callab