rxjava源码解析:操作符subscribeOn

1.subscribe流程

先看一个简单的例子:

//标记为Observable1
Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("hello world!");
            subscriber.onCompleted();
        }
    })
    .subscribeOn(Schedulers.newThread())

    //Subscriber标记为Subscriber1
    .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }
    });

subscribeOn的流程如下:

  1. 首先会根据原来的Observable1生成一个新的Observable<Observable<String>>我们命名为Observable2
  2. 然后调用Observable2.lift(OperatorSubscribeOn);
  3. 返回调用lift之后生成的新的Observable3. subscribeOn过程执行完毕

subscribe()过程跟之前分析的的一样。 注意:

  1. Observable<Observable<String>>2中onSubscribe中的call()方法,返回的是Observable1
  2. OperatorSubscribeOn中生成的Subscriber2对象负责把Observable<Observable<String>>2发射的Observable1跟Subsriber1关联调用

现在分析OperatorSubscribeOn生成的Subscriber2

  1. Subscriber2中调用scheduler创建不同的调度器的worker
  2. worker调用schedule()去执行Observable1的subscribe()
  3. Observable1的subscribe()方法中的Subscriber<String>调用了Subscriber1<String>中的onNext() onCompleted()等。

完毕

时间: 2024-12-27 23:15:58

rxjava源码解析:操作符subscribeOn的相关文章

浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度

一.创建操作: 1.观察者模式:RxJava的世界里,我们有四种角色: Observable<T>(被观察者).Observer(观察者) Subscriber(订阅者).Subject Observable和Subject是两个"生产"实体,Observer和Subscriber是两个"消费"实体.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Ob

Android进阶:四、RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理异

Android进阶:RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么 Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理

RxJava2 源码解析(二)

转载请标明出处: http://blog.csdn.net/zxt0601/article/details/61637439 本文出自:[张旭童的博客](http://blog.csdn.net/zxt0601) 概述 承接上一篇RxJava2 源码解析(一), 本系列我们的目的: 知道源头(Observable)是如何将数据发送出去的. 知道终点(Observer)是如何接收到数据的. 何时将源头和终点关联起来的 知道线程调度是怎么实现的 知道操作符是怎么实现的 本篇计划讲解一下4,5. Rx

Retrofit源码解析

square公司开源了一系列的优秀库,比如Retrofit,OkHttp,Picasso等, 前面简单分析了Picasso的源码,这里来分析下Retrofit的使用: 一.gradle添加依赖 compile 'com.squareup.okhttp:okhttp:2.4.0' compile 'com.squareup.okhttp:okhttp-urlconnection:2.4.0' compile 'com.squareup.okio:okio:1.5.0' compile 'com.g

Android进阶:五、RxJava2源码解析 2

上一篇文章Android进阶:四.RxJava2 源码解析 1里我们讲到Rxjava2 从创建一个事件到事件被观察的过程原理,这篇文章我们讲Rxjava2中链式调用的原理.本文不讲用法,仍然需要读者熟悉Rxjava基本的用法. 一.Rxjava2 的基本用法 Rxjava是解决异步问题的,它的链式调用让代码看起来非常流畅优.现在我们带上线程切换以及链式调用来看看.下面代码是示例: Observable .create(new ObservableOnSubscribe<String>() {

Java生鲜电商平台-SpringCloud微服务架构中网络请求性能优化与源码解析

Java生鲜电商平台-SpringCloud微服务架构中网络请求性能优化与源码解析 说明:Java生鲜电商平台中,由于服务进行了拆分,很多的业务服务导致了请求的网络延迟与性能消耗,对应的这些问题,我们应该如何进行网络请求的优化与处理呢? 到底有没有一些好的建议与方案呢? 下面这个文章将揭晓上面的问题,让你对SpringCloud微服务网络请求性能有一个全新的认识. 目录简介 01.网络请求异常分类 02.开发中注意问题 03.原始的处理方式 04.如何减少代码耦合性 05.异常统一处理步骤 06

underscore.js源码解析(二)

前几天我对underscore.js的整体结构做了分析,今天我将针对underscore封装的方法进行具体的分析,代码的一些解释都写在了注释里,那么废话不多说进入今天的正文. 没看过上一篇的可以猛戳这里:underscore.js源码解析(一) underscore.js源码GitHub地址: https://github.com/jashkenas/underscore/blob/master/underscore.js 本文解析的underscore.js版本是1.8.3 _.each 1

Java String源码解析

String类概要 所有的字符串字面量都属于String类,String对象创建后不可改变,因此可以缓存共享,StringBuilder,StringBuffer是可变的实现 String类提供了操作字符序列中单个字符的方法,比如有比较字符串,搜索字符串等 Java语言提供了对字符串连接运算符的特别支持(+),该符号也可用于将其他类型转换成字符串. 字符串的连接实际上是通过StringBuffer或者StringBuilder的append()方法来实现的 一般情况下,传递一个空参数在这类构造函