RxJava中的doOnSubscribe默认运行线程分析

假设你对RxJava1.x还不是了解,能够參考以下文章。

1. RxJava使用介绍 【视频教程】

2. RxJava操作符

? Creating Observables(Observable的创建操作符) 【视频教程】

? Transforming Observables(Observable的转换操作符) 【视频教程】

? Filtering Observables(Observable的过滤操作符) 【视频教程】

? Combining Observables(Observable的组合操作符) 【视频教程】

? Error Handling Operators(Observable的错误处理操作符) 【视频教程】

? Observable Utility Operators(Observable的辅助性操作符) 【视频教程】

? Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】

? Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】

? 其它如observable.toList()、observable.connect()、observable.publish()等等。 【视频教程】

3. RxJava Observer与Subcriber的关系 【视频教程】

4. RxJava线程控制(Scheduler) 【视频教程】

5. RxJava 并发之数据流发射太快怎样办(背压(Backpressure)) 【视频教程】


前言

在有心课堂《RxJava之旅》中有学员留言:map和doOnSubscribe默认调度器是IO调度器,这里说错了吧?

以下我们分析下。

在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 能够用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程。而是仅仅能运行在 subscribe() 被调用时的线程。这就导致假设 onStart() 中含有对线程有要求的代码(比如在界面上显示一个 ProgressBar,这必须在主线程运行),将会有线程非法的风险,由于有时你无法预測 subscribe() 将会在什么线程运行。

而与 Subscriber.onStart() 相相应的。有一个方法 Observable.doOnSubscribe() 。

它和 Subscriber.onStart() 相同是在 subscribe() 调用后并且在事件发送前运行。但差别在于它能够指定线程。

默认情况下, doOnSubscribe() 运行在 subscribe() 发生的线程;而假设在 doOnSubscribe() 之后有 subscribeOn() 的话,它将运行在离它近期的 subscribeOn() 所指定的线程。

演示样例代码:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 须要在主线程运行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后近期的 subscribeOn 控制 。没看懂不要紧,看以下代码你就懂了。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<?

super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        })
        .doOnSubscribe(new Action0() {

            @Override
            public void call() {
                System.out.println("00doOnSubscribe在线程" + Thread.currentThread().getName() + "中");
            }
        })
        .subscribeOn(Schedulers.newThread())
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println("map1在线程" + Thread.currentThread().getName() + "中");
                return integer + "";
            }
        })
        .doOnSubscribe(new Action0() {

            @Override
            public void call() {
                System.out.println("11doOnSubscribe在线程" + Thread.currentThread().getName() + "中");
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                System.out.println("map2在线程" + Thread.currentThread().getName() + "中");

                return s + "1";
            }
        })
        .doOnSubscribe(new Action0() {

            @Override
            public void call() {
                System.out.println("22doOnSubscribe在线程" + Thread.currentThread().getName() + "中");
            }
        })
        .subscribeOn(Schedulers.newThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext在线程" + Thread.currentThread().getName() + "中");
            }
        });

运行结果例如以下:

22doOnSubscribe在线程RxNewThreadScheduler-1中
11doOnSubscribe在线程RxIoScheduler-3中
00doOnSubscribe在线程RxNewThreadScheduler-2中
map1在线程RxNewThreadScheduler-2中
map2在线程RxIoScheduler-2中
onNext在线程RxIoScheduler-2中

依据代码和运行结果我总结例如以下:

  1. doOnSubscribe()与onStart()相似,均在代码调用时就会回调。但doOnSubscribe()能够通过subscribeOn()操作符改变运行的线程且越在后面运行越早;
  2. doOnSubscribe()后面紧跟subscribeOn(),那么doOnSubscribe()将于subscribeOn()指定的线程保持一致。假设doOnSubscribe()在subscribeOn()之后,他的运行线程得再看情况分析;
  3. doOnSubscribe()假设在observeOn()后(注意:observeon()后没有紧接着再调用subcribeon()方法)。那么doOnSubscribe的运行线程就是main线程,与observeon()指定的线程没有关系。
  4. 假设在observeOn()之前没有调用过subcribeOn()方法,observeOn()之后subscribe面()方法之前调用subcribeOn()方法,那么他会改变整个代码流程中全部调用doOnSubscribe()方法所在的线程。同一时候也会改变observeOn()方法之前全部操作符所在的线程(有个重要前提:不满足第2点的条件,也就是doOnSubscribe()后面没有调用subscribeOn()方法)。
  5. 假设在observeOn()前后都没有调用过subcribeOn()方法,那么整个代码流程中的doOnSubscribe()运行在main线程,与observeOn()指定的线程无关。同一时候observeOn()之前的操作符也将运行在main线程,observeOn()之后的操作符与observeOn()指定的线程保持一致。

今天就分析到这里,假设有问题请大家反馈交流。

时间: 2024-08-28 21:41:28

RxJava中的doOnSubscribe默认运行线程分析的相关文章

RxJava中的doOnSubscribe操作符默认执行线程分析

前言 在有心课堂<RxJava之旅>中有学员留言:map和doOnSubscribe默认调度器是IO调度器,这里说错了吧? 下面我们分析下. 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化.然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程.这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 P

关于Ubuntu修改默认运行级别

第一种方法:(内核级别的) Sudo vi /etc/default/grub 修改GRUB_CMDLINE_LINUX_DEFAULT=”quiet splash” 为:GRUB_CMDLINE_LINUX_DEFAULT=” text” 然后运行下sudo update-grub2就可了 第二种方法:(基于upstart的”正统”方法) ubuntu的upstart的体系下,/etc/init.d里面基本上都是连接到/lib/init/upstart-job的软连接,所以,对 initscr

关于Android 3.0以后AsyncTask默认单一线程的分析

在Android里需要大量后台操作的情况下,经常会使用到AsyncTask这个类,比如说加载网络图片,访问服务器的接口,一般的使用情境就是实例化一个AsyncTask的对象mTask,复写AsyncTask的抽象方法doinBackgroud等等,最后执行task.execute(params),然后就可以在UI线程上方便的取得后台线程的执行结果: AsyncTask执行中最终触发的是把任务交给线池THREAD_POOL_EXECUTOR来执行,提交的任务并行的在线程池中运行,但这些规则在3.0

android 不能在子线程中更新ui的讨论和分析

问题描述 做过android开发基本都遇见过ViewRootImpl$CalledFromWrongThreadException,上网一查,得到结果基本都是只能在主线程中更改ui,子线程要修改ui只能post到主线程或者使用handler之类.但是仔细看看exception的描述并不是这样的,"Only the original thread that created a view hierarchy can touch its views",只有创建该 view 布局层次的原始线程

SEAndroid安全机制中的进程安全上下文关联分析

前面一篇文章分析了文件安全上下文关联过程.但是在SEAndroid中,除了要给文件关联安全上下文外,还需要给进程关联安全上下文,因为只有当进程和文件都关联安全上下文之后,SEAndroid安全策略才能发挥作用.也就是说,当一个进程试图访问一个文件时,SEAndroid会将进程和文件的安全上下文提取出来,根据安全策略规则,决定是否允许访问.本文就详细分析SEAndroid的进程安全上下文的关联过程. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! 在传统的L

大约Android 3.0后AsyncTask默认的单线程分析

在Android下了很大的后台操作在需要的情况下.通常用于AsyncTask这个类.比方说,网络负载形象.访问server接口.一般的情况是使用一个的一例AsyncTask对象mTask,复制AsyncTask抽象方法doinBackgroud等等,最后运行task.execute(params),然后就能够在UI线程上方便的取得后台线程的运行结果: AsyncTask运行中终于触发的是把任务交给线池THREAD_POOL_EXECUTOR来运行,提交的任务并行的在线程池中运行.但这些规则在3.

jvm运行时分析

官方手册: http://docs.oracle.com/javase/7/docs/     ----> http://docs.oracle.com/javase/7/docs/technotes/tools/solaris/java.html   java命令的各种选项的说明 参考书籍: <深入理解Java虚拟机:JVM高级特性与最佳实践(第2版)> 首先说下JVM的内存堆结构,看下图: 主要由 方法区Permanent Generation + 新生代Eden + 新生代幸存区S

RxJava使用(三)Scheduler 线程控制

前言 在默认情况下,即在不指定线程的情况下,RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件:在哪个线程生产事件,就在哪个线程消费事件. 如果需要切换线程,就需要用到 Scheduler (调度器). Schedulers部分主要来自<给Android 开发者的 RxJava 详解> Schedulers介绍 在RxJava 中,Scheduler --调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程.R

Netty中NioEventLoopGroup的创建源码分析

NioEventLoopGroup的无参构造: 1 public NioEventLoopGroup() { 2 this(0); 3 } 调用了单参的构造: 1 public NioEventLoopGroup(int nThreads) { 2 this(nThreads, (Executor)null); 3 } 继续看到双参构造: 1 public NioEventLoopGroup(int nThreads, Executor executor) { 2 this(nThreads,