RxJava observeOn()与subscribeOn()的关系

observeOn和subscribeOn都是对observable的一种操作,区别就是subscribeOn改变了observable本身产生事件的schedule以及发出事件后相关处理事件的程序所在的schedule,而obseveron仅仅是改变了对发出事件后相关处理事件的程序所在的schedule。

或许你会问,这有多大的区别吗?的确是有的,比如说产生observable事件是一件费时可能会卡主线程的操作(比如说获取网络数据),那么subscribeOn就是你的选择,这样可以避免卡住主线程。

两者最主要的差别是影响的范围不同,observeOn is more limited,但是却是可以多次调用,多次改变不同的接受者所在的schedule,在调用这个函数之后的observable造成影响。而subscribeOn则是一次性的,无论在什么地方调用,总是从改变最原始的observable开始影响整个observable的处理。

subscribeOn()和observeOn()的区别

  • subscribeOn()主要改变的是订阅的线程,即call()执行的线程;
  • observeOn()主要改变的是发送的线程,即onNext()执行的线程。

subscribeOn

我们先看一个例子。

       Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("a");
                        subscriber.onNext("b");

                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())

                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String integer) {
                        System.out.println(integer);
                    }
                });

运行如下:

a
b

我们看一下subscribeOn()中,都干了什么

public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

很明显,会走if之外的方法。

在这里我们可以看到,我们又创建了一个Observable对象,但创建时传入的参数为OperatorSubscribeOn(this,scheduler),我们看一下此对象以及其对应的构造方法

OperatorSubscribeOn代码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

可以看到,OperatorSubscribeOn实现Onsubscribe,并且由其构造方法可知,他保存了上一个Observable对象,并保存了Scheduler对象。

这里做个总结。

把Observable.create()创建的称之为Observable_1,OnSubscribe_1。把subscribeOn()创建的称之为Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。

那么,前两步就是创建了两个的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的应用,即source。

调用Observable_2.subscribe()方法会调用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。

下面分析下call()方法。

  • inner.schedule()改变了线程,此时Action的call()在我们指定的线程中运行。
  • Subscriber被包装了一层。
  • source.unsafeSubscribe(s);,注意source是Observable_1对象。

unsafeSubscribe方法代码:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren‘t we throwing the hook‘s return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

代码很多,关键代码:

hook.onSubscribeStart(this, onSubscribe).call(subscriber);

该方法即调用了OnSubscribe_1.call()方法。注意,此时的call()方法在我们指定的线程中运行。那么就起到了改变线程的作用。

对于以上线程,我们可以总结,其有如下流程:

  • Observable.create() : 创建了Observable_1和OnSubscribe_1;
  • subscribeOn(): 创建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同时OperatorSubscribeOn保存了Observable_1的引用。
  • observable_2.subscribe(Observer):
    • 调用OperatorSubscribeOn的call()。call()改变了线程的运行,并且调用了Observable_1.unsafeSubscribe(s);
    • Observable_1.unsafeSubscribe(s);,该方法的实现中调用了OnSubscribe_1的call()。

      从这个可以了解,无论我们的subscribeOn()放在哪里,他改变的是subscribe()的过程,而不是onNext()的过程。

那么如果有多个subscribeOn(),那么线程会怎样执行呢。如果按照我们的逻辑,有以下程序

Observable.just("ss")
                .subscribeOn(Schedulers.io())   // ----1---
                .subscribeOn(Schedulers.newThread()) //----2----
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {

                    }
                });

那么,我们根据之前的源码分析其执行逻辑。

  • Observable.just(“ss”),创建Observable_1,OnSubscribe_1
  • Observable_1.subscribeOn(Schedulers.io()):创建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
  • observable_2.subscribeOn(Schedulers.newThread()):创建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
  • Observable_3.subscribe():
    • 调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
    • 调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
    • 调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。

      根据以上逻辑分析,会按照1的线程进行执行。



subscribeOn如何工作,关键代码其实就是一行代码:

source.unsafeSubscribe(s);

注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

subscribeOn的调用,改变了调用前序列所运行的线程。


observeOn

看一下observeOn()源码:

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, RxRingBuffer.SIZE);
    }

public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
        return observeOn(scheduler, false, bufferSize);
    }

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
    }

这里引出了新的操作符lift

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

这里不再介绍了,详见:http://blog.csdn.net/jdsjlzx/article/details/51686152

在lift()中,有如下关键代码:

Subscriber<? super T> st = hook.onLift(operator).call(o);

OperatorObserveOn.call()核心代码:

 @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

我们看到其返回了ObserveOnSubscriber< T>,注意:此时只调用了call()方法,但call()方法中并没有改变线程的操作,此时为subscribe()过程。

我们直奔重点,因为,我们了解到其改变的是onNext()过程,那么我们肯定要看一下ObserveOnSubscriber.onNext()找找在哪改变线程

@Override
public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

我们这里需要注意下:

在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

onNext方法最后调用了schedule(),从方法名可以看到,其肯定是改变线程用的,并且该方法经过一番循环之后,调用了该类的call()方法。

protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()。

scheduler中调用的call()方法

        // only execute this from schedule()
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;

            // these are accessed in a tight loop around atomics so
            // loading them into local variables avoids the mandatory re-reading
            // of the constant fields
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;

            // requested and counter are not included to avoid JIT issues with register spilling
            // and their access is is amortized because they are part of the outer loop which runs
            // less frequently (usually after each bufferSize elements)

            for (;;) {
                long requestAmount = requested.get();

                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    Object v = q.poll();
                    boolean empty = v == null;

                    if (checkTerminated(done, empty, localChild, q)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    localChild.onNext(localOn.getValue(v));

                    currentEmission++;
                    if (currentEmission == limit) {
                        requestAmount = BackpressureUtils.produced(requested, currentEmission);
                        request(currentEmission);
                        currentEmission = 0L;
                    }
                }

                if (requestAmount == currentEmission) {
                    if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                        return;
                    }
                }

                emitted = currentEmission;
                missed = counter.addAndGet(-missed);
                if (missed == 0L) {
                    break;
                }
            }
        }

call()中有localChild.onNext(localOn.getValue(v));调用。

在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child, 我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

总结起来的结论就是:

  1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
  2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

observeOn改变的是onNext()调用

subcribeOn和observeOn 对比分析

Observable
.map                    // 操作1
.flatMap                // 操作2
.subscribeOn(io)
.map                    //操作3
.flatMap                //操作4
.observeOn(main)
.map                    //操作5
.flatMap                //操作6
.subscribeOn(io)        //!!特别注意
.subscribe(handleData)

有如上逻辑,则我们对其运行进行分析。

首先,我们需要先明白其内部执行的逻辑。

在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。

其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()实在onNext()中作用。

那么对于以上的逻辑,我们可以得出如下结论:

  • 操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
  • 操作5,6在main线程中运行。因为observeOn()改变了onNext().
  • 特别注意那一个逻辑没起到作用

再简单点总结就是

  1. subscribeOn的调用切换之前的线程。
  2. observeOn的调用切换之后的线程。
  3. observeOn之后,不可再调用subscribeOn 切换线程

复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

@扔物线 大神给的总结:

  1. 下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
  2. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 吴意义);
  3. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
  4. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
  5. 不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。

参考文章:

https://segmentfault.com/a/1190000004856071

http://blog.csdn.net/lisdye2/article/details/51113837

虽然文章大部分是参考的,但是我写这个博客花了3个小时多,看源码真的很头疼,希望以后能有所提高。

时间: 2024-10-04 02:13:17

RxJava observeOn()与subscribeOn()的关系的相关文章

Rxjava observeOn()和subscribeOn()初探

Rxjava这么强大的类库怎么可能没有多线程切换呢? 其中observeOn()与subscribeOn()就是实现这样的作用的.本文主要讲解observeOn()与subscribeOn()的用法,不去探究其中的原理. 0. 默认情况 在默认情况下,其不做任何线程处理,Observable和Observer处于同一线程,没有做任何线程切换,依次执行,如下图所示: 可以写一个demo测试之: Observable<String> source = Observable.just("A

RxJava Observer与Subscriber的关系

在说Observer与Subscriber的关系之前,我们下重温下相关概念. RxJava 的观察者模式 RxJava 有四个基本概念:Observable (可观察者,即被观察者). Observer (观察者). subscribe (订阅).事件.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer. 与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNe

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

给 Android 开发者的 RxJava 详解

作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么? 鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用

深入浅出RxJava就这一篇就够了

前言: 第一次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJava.RxJava是一个基于事件订阅的异步执行的一个类库.听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个Android项目启动会联系到RxJava呢?因为在RxJava使用起来得到广泛的认可,又是基于Java语言的.自然会有善于组织和总结的开发者联想到Android!没错,RxAndroid就这样在RxJava的基础上,针对Android开发的一个库.今天我们主要是来讲

Rxjava入门与使用

认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列 组合异步和基于事件 的程序. Observable(观察者) 和 Subscriber(订阅者)是两个主要的类.在 RxJava 上,一个 Observable 是一个发出数据流或者事件的类,Subscriber 是一个对这些发出的 items (数据流或者事件)进行处理(采取行动)的类.一个 Observable 的标准流发出一个或多个 item,然后成功完成或者出错.

RxJava中的doOnSubscribe默认运行线程分析

假设你对RxJava1.x还不是了解,能够參考以下文章. 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操

RxJava入门

项目小版本上线,抽空简单学习了下久仰大名的RxJava 一.引入 个人觉得rxjava的特点: 强大灵活的事件流处理(多线程/多事件/复合对象) 强大灵活优雅简洁的异步 链式调用 可自动Lambda化 实现:RxJava 是通过一种扩展的观察者模式来实现的 类比 类比 实际 实际 职责 演讲者 Button (可)被订阅者 (同右) (可)被观察者 Observable 决定什么时候触发事件以及触发怎样的事件 听众 OnClickListener 订阅者 Subscriber 观察者 Obser

78. Android之 RxJava 详解

转载:http://gank.io/post/560e15be2dca930e00da1083 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava