RxJava(11-线程调度Scheduler)

转载请标明出处:

http://blog.csdn.net/xmxkf/article/details/51821940

本文出自:【openXu的博客】

目录:

  • 使用示例
  • subscribeOn原理
    • 多次subscribeOn的情况
  • observeOn原理
  • 调度器的种类
  • 各种操作符的默认调度器
  • 源码下载

??RxJava中 使用observeOnsubscribeOn操作符,你可以让Observable在一个特定的调度器上执行,observeOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onErroronCompleted方法,subscribeOn则指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

1. 使用示例

先看看下面的例子,体验一下在RxJava中 如何使用线程的切换:

private void logThread(Object obj, Thread thread){
    Log.v(TAG, "onNext:"+obj+" -"+Thread.currentThread().getName());
}
Observable.OnSubscribe onSub = new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        Log.v(TAG, "OnSubscribe -"+Thread.currentThread());
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
};
Log.v(TAG, "--------------①-------------");
Observable.create(onSub)
        .subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------②-------------");
Observable.create(onSub)
        .subscribeOn(Schedulers.io())
        .subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------③-------------");
Observable.create(onSub)
        .subscribeOn(Schedulers.newThread())
        .subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------④-------------");
Observable.create(onSub)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑤-------------");
Observable.create(onSub)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑥-------------");
Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(1)
        .subscribe(integer->logThread(integer, Thread.currentThread()));
/*
 输出:
 --------------①-------------
 OnSubscribe -Thread[main,5,main]
 onNext:1 -Thread[main,5,main]
 --------------②-------------
 OnSubscribe -Thread[RxIoScheduler-2,5,main]
 onNext:1 -Thread[RxIoScheduler-2,5,main]
 --------------③-------------
 OnSubscribe -Thread[RxNewThreadScheduler-1,5,main]
 onNext:1 -Thread[RxNewThreadScheduler-1,5,main]
 --------------④-------------
 OnSubscribe -Thread[RxNewThreadScheduler-2,5,main]
 onNext:1 -Thread[main,5,main]
 --------------⑤-------------
 OnSubscribe -Thread[RxNewThreadScheduler-4,5,main]
 onNext:1 -Thread[RxNewThreadScheduler-3,5,main]
 --------------⑥-------------
 onNext:0 -RxComputationScheduler-3
 */

从上面的输出结果中,我们大概知道了下面几点:

①. RxJava中已经封装了多种调度器,不同的调度器可以指定在不同的线程中执行和观察

②. create创建的Observable默认在当前线程(主线程)中执行任务流,并在当前线程观察

③. interval创建的Observable会在一个叫Computation的线程中执行任务流和观察任务流

④. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换

2. subscribeOn()原理

??subscribeOn()用来指定Observable在哪个线程中执行事件流,也就是指定ObservableOnSubscribe(计划表)的call方法在那个线程发射数据。下面通过源码分析subscribeOn是怎样实现线程的切换的。

下面看看subscribeOn方法:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

我们看到他创建了一个新的Observable,并为新的Observable创建了新的计划表OperatorSubscribeOn对象,新的计划表保存了原始Observable对象和调度器scheduler。接着我们看看OperatorSubscribeOn

public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> {

    final Scheduler scheduler;   //调度器
    final Observable<T> source;  //原始Observable
    //①.原始观察者订阅了新的Observable后,将执行此call方法
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Scheduler.Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        //②. call方法中使用传入的调度器创建的Worker对象的schedule方法切换线程
          inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                //③ .创建了一个新的观察者
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        //⑤. 新的观察者收到数据后直接发送给原始观察者
                        subscriber.onNext(t);
                    }
                    ...
                };
                //④. 在切换的线程中,新的观察者订阅原始Observable,用来接收数据
                source.unsafeSubscribe(s);
            }
        });
    }
}

??上面源码中注释已经写的很清楚了,OperatorSubscribeOn其实就是一个普通的任务表,用于为新的Observable发射数据,只是不是真正的发射,它创建了一个新的观察者订阅原始Observable,这样就可以接受原始Observable发射的数据,然后直接发送给原始观察者。

??在call方法中通过scheduler.createWorker().schedule()完成线程的切换,这里就牵扯到两个对象了,SchedulerWorker,不要着急,一个个的看,先看Scheduler,由于RxJava中有多种调度器,我们就看一个简单的Schedulers.newThread(),其他调度器的思路是一样的,下面一步一步看源码:

public final class Schedulers {
    //各种调度器对象
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;
    //单例,Schedulers被加载的时候,上面的各种调度器对象已经初始化
    private static final Schedulers INSTANCE = new Schedulers();
    //构造方法
    private Schedulers() {
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
        ...
        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            //①.创建newThreadScheduler对象
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }
    //②. 获取NewThreadScheduler对象
    public static Scheduler newThread() {
        return INSTANCE.newThreadScheduler;
    }
    ...
}

Schedulers中保存了多种调度器对象,在Schedulers被加载的时候,他们就被初始化了,Schedulers就像是一个调度器的管理器,接着跟踪RxJavaSchedulersHook.createNewScheduler(),最终会找到一个叫NewThreadScheduler的类:

public final class NewThreadScheduler extends Scheduler {
    private final ThreadFactory threadFactory;
    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}

NewThreadScheduler就是我们调用subscribeOn(Schedulers.newThread() )传入的调度器对象,每个调度器对象都有一个createWorker方法用于创建一个Worker对象,而NewThreadScheduler对应创建的Worker是一个叫NewThreadWorker的对象,在新产生的OperatorSubscribeOn计划表中就是通过NewThreadWorker.schedule(Action0)实现线程的切换,下面我们跟踪schedule(Action0)方法:

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
    private final ScheduledExecutorService executor;   //
    public NewThreadWorker(ThreadFactory threadFactory) {
        //创建一个线程池
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        executor = exec;
    }
    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }
    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        return scheduleActual(action, delayTime, unit);
    }
    //重要:worker.schedule()最终调用的是这个方法
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        //return action;
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        //ScheduledAction就是一个Runnable对象,在run()方法中调用了Action0.call()
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);   //将Runnable对象放入线程池中
        } else {
            f = executor.schedule(run, delayTime, unit);  //延迟执行
        }
        run.add(f);

        return run;
    }
    ...
}

我们发现OperatorSubscribeOn计划表中通过NewThreadWorker.schedule(Action0),将Action0放入到一个线程池中执行,这样就实现了线程的切换。

通过上面的分析,我们知道subscribeOn是怎样将任务表放入线程池中执行的,感觉还是有点绕,看下图:

????

多次subscribeOn()的情况

我们发现,每次使用subscribeOn都会产生一个新的Observable,并产生一个新的计划表OnSubscribe,目标Subscriber最后订阅的将是最后一次subscribeOn产生的新的Observable。在每个新的OnSubscribecall方法中都会有一个产生一个新的线程,在这个新线程中订阅上一级Observable,并创建一个新的Subscriber接受数据,最终原始Observable将在第一个新线程中发射数据,然后传送给给下一个新的观察者,直到传送到目标观察者,所以多次调用subscribeOn只有第一个起作用(这只是表面现象,其实每个subscribeOn都切换了线程,只是最终目标Observable是在第一个subscribeOn产生的线程中发射数据的)。看下图:

????

??多次subscribeOn()只有第一个会起作用,后面的只是在第一个的基础上在外面套了一层壳,就像下面的伪代码,最后执行是在第一个新线程中执行:

...
//第3个subscribeOn产生的新线程
new Thread(){
    @Override
    public void run() {
        Subscriber s1 = new Subscriber();
        //第2个subscribeOn产生的新线程
        new Thread(){
            @Override
            public void run() {
                Subscriber s2 = new Subscriber();
                //第1个subscribeOn产生的新线程
                new Thread(){
                    @Override
                    public void run() {
                        Subscriber<T> s3 = new Subscriber<T>(subscriber) {
                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }
                            ...
                        };
                        //①. 最后一个新观察者订阅原始Observable
                        原始Observable.subscribe(s3);
                        //②. 原始Observable将在此线程中发射数据

                              //③. 最后一个新的观察者s3接受数据

                              //④. s3收到数据后,直接发送给s2,s2收到数据后传给s1,...最后目标观察者收到数据
                         }
                }.start();
            }
        }.start();
    }
}.start();

3. observeOn原理

??observeOn调用的是lift操作符,lift操作符在上一篇博客中讲过。lift操作符创建了一个代理的Observable,用于接收原始Observable发射的数据,然后在Operator中对数据做一些处理后传递给目标Subscriber

??observeOn一样创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNextonCompeteonError方法。

我们看看observeOn操作符的源码:

public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {
    private final Scheduler scheduler;
    //创建代理观察者,用于接收上一级Observable发射的数据
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        if (scheduler instanceof ImmediateScheduler) {
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }

    //代理观察者
    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final Queue<Object> queue;
        //接受上一级Observable发射的数据
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        @Override
        public void onCompleted() {
            ...
            schedule();
        }
        @Override
        public void onError(final Throwable e) {
            ...
            schedule();
        }
        //开启新线程处理数据
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }
        // only execute this from schedule()
        //在新线程中将数据发送给目标观察者
        @Override
        public void call() {
            long missed = 1L;
            long currentEmission = emitted;
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            for (;;) {
                while (requestAmount != currentEmission) {
                    ...
                    localChild.onNext(localOn.getValue(v));
                }
            }
        }
    }
}

可以发现,observeOn操作符对它后面的操作产生影响,比如下面一段代码:

Observable.just(100)
        .subscribeOn(Schedulers.computation())     //Computation线程中发射数据
        .map(integer -> {return "map1-"+integer;}) //Computation线程中接受数据
        .observeOn(Schedulers.io())          //②. 切换
        .map(integer -> {return "map2-"+integer;}) //io线程中接受数据,由②决定
        .observeOn(Schedulers.newThread())   //③. 切换
        .map(integer -> {return "map3-"+integer;}) //newThread线程中接受数据,由③决定
        .observeOn(AndroidSchedulers.mainThread()) //④. 切换
        .delay(1000, TimeUnit.MILLISECONDS)        //主线程中接受数据,由④决定
        .subscribe(str -> logThread(str, Thread.currentThread()));  //Computation线程中接受数据,由④决定

/*
    说明:最后目标观察者将在Computation线程中接受数据,这取决于delay操作符,
    delay操作符是在Computation线程中执行的,执行完后就会将数据发送给目标观察者。
    而他上面的observeOn将决定于delay产生的代理观察者在主线程中接受数据
 */

/*
 输出:
 onNext:map3-map2-map1-100 -RxComputationScheduler-3
 */

只要涉及到lift操作符,其实就是生成了一套代理的Subscriber(观察者)、Observable(被观察者)和OnSubscribe(计划表)。Observable最典型的特征就是链式调用,我们暂且将每一步操作称为一级。代理的OnSubscribe中的call方法就是让代理Subscriber订阅上一级Observable,直到订阅到原始Observable发射数据,代理Subscriber收到数据后,可能对数据做一些操作也有可能切换线程,然后将数据传送给下一级Subscriber,直到目标观察者接收到数据,目标观察者在那个线程接受数据取决于上一个Subscriber在哪一个线程调用目标观察者的方法。示意图如下:

????

4. 调度器的种类

RxJava中可用的调度器有下面几种:

调度器类型 效果
Schedulers.computation(?) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate(?) 在当前线程立即开始执行任务
Schedulers.io(?) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io(?)默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread(?) 为每个任务创建一个新线程
Schedulers.trampoline(?) 当其它排队的任务完成后,在当前线程排队开始执行

RxAndroid中新增了一个:

调度器类型 效果
AndroidSchedulers.mainThread(?) 主线程,UI线程,可以用于更新界面

5. 各种操作符的默认调度器

??在之前学习各种操作符的时候,都会介绍xx操作符默认在xxx调度器上执行,当时可能不太注意这是什么意思,下面总结了一些操作符默认的调度器:

操作符 调度器
buffer(timespan) computation
buffer(timespan,?count) computation
buffer(timespan,?timeshift) computation
debounce(timeout,?unit) computation
delay(delay,?unit) computation
delaySubscription(delay,?unit) computation
interval computation
repeat trampoline
replay(time,?unit) computation
replay(buffersize,?time,?unit) computation
replay(selector,?time,?unit) computation
replay(selector,?buffersize,?time,?unit) computation
retrytrampolinesample(period,?unit) computation
skip(time,?unit) computation
skipLast(time,?unit) computation
take(time,?unit) computation
takeLast(time,?unit) computation
takeLast(count,?time,?unit) computation
takeLastBuffer(time,?unit) computation
takeLastBuffer(count,?time,?unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector,?timeoutSelector) immediate
timeout(timeoutSelector,?other) immediate
timeout(timeout,?timeUnit) computation
timeout(firstTimeoutSelector,?timeoutSelector,?other) immediate
timeout(timeout,?timeUnit,?other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan,?count) computation
window(timespan,?timeshift) computation

源码下载:

https://github.com/openXu/RxJavaTest

时间: 2024-10-29 10:45:44

RxJava(11-线程调度Scheduler)的相关文章

Android异步框架RxJava 1.x系列(三) - 线程调度器Scheduler

前言 RxJava 事件的发出和消费都在同一个线程,基于同步的观察者模式.观察者模式的核心是后台处理,前台回调的异步机制.要实现异步,需要引入 RxJava 的另一个概念 - 线程调度器 Scheduler. 正文 在不指定线程的情况下,RxJava 遵循的是线程不变的原则.即在哪个线程调用 subscribe() 方法,就在哪个线程生产事件:在哪个线程生产事件,就在哪个线程消费事件.如果需要切换线程,就需要用到线程调度器 Scheduler. 1. 几种Scheduler介绍 在 RxJava

RxJava使用(三)Scheduler 线程控制

前言 在默认情况下,即在不指定线程的情况下,RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件:在哪个线程生产事件,就在哪个线程消费事件. 如果需要切换线程,就需要用到 Scheduler (调度器). Schedulers部分主要来自<给Android 开发者的 RxJava 详解> Schedulers介绍 在RxJava 中,Scheduler --调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程.R

理解 RxJava 的线程模型

来源:鸟窝, colobu.com/2016/07/25/understanding-rxjava-thread-model/ 如有好文章投稿,请点击 → 这里了解详情 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET.JavaScript和C++,Rx近几年越来越

RxJava初探

1. 简史 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET.JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

给 Android 开发者的 RxJava 详解

作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么? 鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用

RxJava 详解——简洁的异步操作(二)

上次说的两个例子,事件的发出和消费都是在同一个线程的.如果只用上面的方法,实现出来的只是一个同步的观察者模式.观察者模式本身的目的就是异步机制,因此异步对于 RxJava 是至关重要的.而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler .本文就来介绍一下Scheduler的用法: 3. 线程控制 -- Scheduler (一) 默认情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件:在哪个线程生产事件,就在

78. Android之 RxJava 详解

转载:http://gank.io/post/560e15be2dca930e00da1083 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava

转:给 Android 开发者的 RxJava 详解

转自:  http://gank.io/post/560e15be2dca930e00da1083 评注:多图解析,但是我还是未看懂. 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度

RxJava学习小结

什么是RxJava 1. 定义 RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. RxJava是JVM的响应式扩展(ReactiveX),它是通过使用可观察的序列将异步和基于事件的程序组合起来的一个库. 2. 特点 (1)观察者模式 RxJava