Android进阶:五、RxJava2源码解析 2

上一篇文章Android进阶:四、RxJava2 源码解析 1里我们讲到Rxjava2 从创建一个事件到事件被观察的过程原理,这篇文章我们讲Rxjava2中链式调用的原理。本文不讲用法,仍然需要读者熟悉Rxjava基本的用法。

一.Rxjava2 的基本用法

Rxjava是解决异步问题的,它的链式调用让代码看起来非常流畅优。现在我们带上线程切换以及链式调用来看看。下面代码是示例:

 Observable
                .create(new ObservableOnSubscribe<String>() {

                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        e.onNext("a");
                    }
                })
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Object o) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

我们创建一个事件(观察者),想输出一个字符串 "a"。这个事件发生在IO线程,结束也在IO线程,事件的状态回调发生在主线程。示例的用法大家应该都能懂,我们主要讨论这个链式的原理流程。为什么这么说呢?因为这个链式跟一般的链式不太一样。

二.create方法

这个方法我们之前看过,返回一个ObservableCreate对象,ObservableCreate继承自Observable,里面的source存着我们创建的ObservableOnSubscribe匿名对象。

三.subscribeOn方法

这是Obserbvable的方法,先看源码:

  public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

代码结构跟create的差不多,在钩子函数里直接返回我们创建的对象ObservableSubscribeOn<T>(this, scheduler),并传入当前的Observable也就是ObservableCreate对象。所以我们看一下这个类的代码:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

这个类继承自AbstractObservableWithUpstream类,构造函数的参数是ObservableSource,所以这里我们需要介绍两个类:

  • ObservableSource
    ObservableSource是一个接口,所有的Observable都实现了这个接口,它里面只有:

    void subscribe(@NonNull Observer<? super T> observer);

    这一个方法。很明显这个方法是为了让Observer订阅Observable的,或者说为了Observable把事件状态传递给Observer的。

  • AbstractObservableWithUpstream
    这个类继承了Observbable
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
    
    protected final ObservableSource<T> source;
    
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    }

    从源码可以看出这个类有变量source,它在构造函数里传入值,存储ObservableSource对象。

    所以当我们调用Observable的subscribeOn方法的时候会创建一个ObservableSubscribeOn对象,并用变量source存储当前的Observable对象,然后返回ObservableSubscribeOn对象。

四.unsubscribeOn方法

 public final Observable<T> unsubscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableUnsubscribeOn<T>(this, scheduler));
    }

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

这个方法跟上面的方法是一个模子刻的。所以我们主要看ObservableUnsubscribeOn这个类就好。

public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
}

这个类跟刚才的ObservableSubscribeOn也几乎一模一样,继承自AbstractObservableWithUpstream类,使用source存了当前Observable对象。而此时的Observbvable对象是上一个方法创建的对象,也就是ObservableSubscribeOn对象。

五.observeOn方法和map方法

由于这些方法的内容基本一样我就省略代码的解释。
observeOn方法是创建了ObservableObserveOn对象,并保存上一个方法创建的Observable。map方法是创建ObservableMap对象,并保存上一个方法创建的Observable
所以总结一下可知:链式调用这些方法的时候,都会创建一个相关的对象,然后用变量source存储上一个方法创建的Observable子类对象。

六.subscribe方法

上次文章讲到,这个方法内部会调用一个抽象方法,subscribeActual方法,作为真实的订阅。而这个方法的逻辑需要看子类如何实现。
而第一次调用该这个subscribe方法的对象是ObservableMap对象。所以我们看看它内部如何实现的。
ObservableMap的subscribeActual方法实现:

  public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

内部调用了source的subscribe方法。此时ObservableMap对象里存的source是上一个方法创建的observable,也就是ObservableObserveOn对象。所以我们要看看ObservableObserveOn是如何实现subscribeActual方法的:

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

同理他最终也是调用了上一个Observable的subscribe。

于是我们知道当我们调用subscribe方法的时候,会递归式的调用source存储的上一个方法创建的Observable的subscribeActual方法,一直到ObsservableCreate的subscribeActual的方法,把事件状态传递给观察者。这个上一篇文章已经讲过。

七.总结

我们常见的普通的链式调用一般都会返回当前同一个对象。和普通的链式调用不同当我们调用Rxjava2的链式调用时,他们会返回自己对应的Observable子类对象,每个对象都不一样,然后在subscribeActual方法中递归式的调用每个对象的subscribeActual方法,完成一个链式的调用。

写在最后

在最后,我整理了一份资料,如果有需要学习的同学可以联系我免费分享出来的,而且我们为了感谢很多支持的学者,在腾讯课堂每晚上20点有免费的直播教学,需要的同学可以来学习学习
领取方式:交流群653583088

原文地址:https://blog.51cto.com/13983283/2367289

时间: 2024-09-30 04:21:07

Android进阶:五、RxJava2源码解析 2的相关文章

Android进阶:RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么 Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

RxJava2 源码解析(二)

转载请标明出处: http://blog.csdn.net/zxt0601/article/details/61637439 本文出自:[张旭童的博客](http://blog.csdn.net/zxt0601) 概述 承接上一篇RxJava2 源码解析(一), 本系列我们的目的: 知道源头(Observable)是如何将数据发送出去的. 知道终点(Observer)是如何接收到数据的. 何时将源头和终点关联起来的 知道线程调度是怎么实现的 知道操作符是怎么实现的 本篇计划讲解一下4,5. Rx

十五.jQuery源码解析之Sizzle总体结构.htm

Sizzle是一款纯javascript实现的css选择器引擎,它具有完全独立,无库依赖;小;易于扩展和兼容性好等特点. W3C Selectors API规范定义了方法querySelector()和querySelectorAll(),但是IE6,7不支持这两个方法. 在Sizzele内部,如果浏览器支持方法querySelectorAll(),则调用该方法查找元素,如果不支持,则模拟该方法的行为. Sizzle支持几乎所有的css3选择器,并且会按照文档位置返回结果. 上面截取的只是Siz

【Android】IntentService &amp; HandlerThread源码解析

一.前言 在学习Service的时候,我们一定会知道IntentService:官方文档不止一次强调,Service本身是运行在主线程中的(详见:[Android]Service),而主线程中是不适合进行耗时任务的,因而官方文档叮嘱我们一定要在Service中另开线程进行耗时任务处理.IntentService正是为这个目的而诞生的一个优雅设计,让程序员不用再管理线程的开启和允许. 至于介绍HandlerThread,一方面是因为IntentService的实现中使用到了HandlerThrea

Android进阶:四、RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理异

Android LayoutInflater.from().inflate()源码解析

我们知道,在Activity#setContentView()中会调用PhoneWindow#setContentView().而在PhoneWindow#setContentView()中有这么一句mLayoutInflater.inflate(layoutResID, mContentParent).这行代码的作用是将我们的activity_main.xml填充到mContentParent中去.详见:setContentView源码解析.在写adapter的时候,也经常写mInflater

Android SVG动画PathView源码解析与使用教程(API 14)

使用的是一个第三方库android-pathview主要是一个自定义View--PathView,跟所有自定义View一样,重写了三个构造方法.并且最终调用三个参数的构造方法,在里面获取自定义属性. /** * Default constructor. * * @param context The Context of the application. */ public PathView(Context context) { this(context, null); } /** * Defau

Android 常用开源框架源码解析 系列 (十)Rxjava 异步框架

一.Rxjava的产生背景 一.进行耗时任务 传统解决办法: 传统手动开启子线程,听过接口回调的方式获取结果 传统解决办法的缺陷: 随着项目的深入.扩展.代码量的增大会产生回调之中套回调的,耦合度高度增加的不利场景.对代码维护和扩展是很严重的问题. RxJava本质上是一个异步操作库 优点: 使用简单的逻辑,处理复杂 ,困难的异步操作事件库;在一定程度上替代handler.AsyncTask等等 二.传统的观察者模式 使用场景 1.一个方面的操作依赖于另一个方面的状态变化 2.如果在更改一个对象