快速理解RxJava源码的设计理念

前言

我在看过几篇关于RxJava源码分析的博客后,不知是我的水平有限还是源码过于博大精深,导致花了很长的时间才搞清楚其运行原理。我个人觉得应该有更好的办法来快速剖析理解,于是决定写下本文。

本文适合已经看过一些RxJava源码分析资料的同学,不过没看过也没关系。在看本文时可参考这篇博客:RxJava基本流程和lift源码分析,它说得比较全,在此感谢博主大头鬼Bruce。

一、初探RxJava

【以下摘录了RxJava基本流程和lift源码分析

我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {

    }
};

Observable.create(onSubscriber1)
        .subscribe(subscriber1);

首先我们来看一下Observable.create的代码:

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。

同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ...
    subscriber.onStart();
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    return hook.onSubscribeReturn(subscriber);
}

可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法

,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。

这样就实现了观察者和被观察者之间的通讯。

【摘录结束】

好了,总结一下:

  • 处理数据是订阅类Subscriber的职责

入门结束,下面看看带操作符的情况。

二、带操作符的RxJava应用

【以下摘录了RxJava基本流程和lift源码分析

先展示Demo:

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
};
Subscriber<Integer> subscriber1 = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Integer i) {

    }
};
Func1<String, Integer> transformer1 = new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return Integer.parseInt(s);
    }
};

Observable.create(onSubscriber1)
        .map(transformer1)
        .subscribe(subscriber1);

和刚才不同的是我们在create之后调用了map方法,然后才调用subscribe方法。

【摘录结束】

再贴上RxJava的执行流程图:

我先根据这张图给出RxJava能这样运行的3条准则:

  1. 从create函数能向下执行到subscribe,靠的是返回Observable类的对象;
  2. 从subscribe函数能再向上执行到onSubscriber1,靠的是下面的Observable对象能够调用上面Observable对象的onSubscribe属性;
  3. 从上面能再次往下依次对数据进行处理,靠的是Subscriber责任链。

为什么我不先分析源码而先给准则呢?我发现如果在不知道准则(或者称作设计思路)情况下看源码就会“见树不见林”,而知道了准则就把握了框架脉络,剩下的只是到源码中找答案而已。既然本文标榜了“快速理解RxJava源码的设计理念”,那当然不能徒有虚名。下面我们到源码中找答案。

首先我们来证明准则1:

map方法的代码如下:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

map返回的是Observable,证明完毕。快吧,就这么快。在此将map返回的Observable对象标记为observable2。

再证明准则2:

对照demo中的代码,我们调用map之后,就调用了subscribe方法,也就是调用了这里的observable2的subscribe方法。

根据上面的介绍,调用subscribe之后,就会调用observable2.onSubscribe.call方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st);
        }
    });
}

其中,call函数的第一句与准则3的证明有关,暂时不看。第二句也不看(不好意思,我没看懂这句干什么的,不过不影响继续分析),我们看第三句:

onSubscribe.call(st);

这句很关键,准则2的证明靠他了。我们要搞明白onSubscribe是谁的属性。我将demo换个方式写出来:

...
Observable.create(onSubscriber1);
observable1.map(transformer1);
observable2.subscribe(subscriber1);

map函数是observable1的,那么lift函数也是observable1的,按Java语法规定内部类可以调用外部类的属性,所以onSubscribe是observable1的。这样就实现了observable2跨入到observable1的目的。证毕。

最后证明准则3:

说实话,写到这里我感觉还是有部分同学没明白怎么回事。不过没关系,我将会继续围绕讲述。

我们再来看看observable2.onSubscribe.call方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st);
        }
    });
}

前面说了,call函数的第一句与准则3的证明有关。我们先不看它做了什么,返回值给了st,st是个Subscriber对象。我们在第一节总结说了,处理数据是订阅类Subscriber的职责,因此st有处理数据的职责。再看第三句,将st给了observable1。问题来了,准则3想要运行,就必须有Subscriber责任链。“链”在哪里?不用猜了,我直接告诉你答案:往下“链”的代码在:

Subscriber<? super T> st = hook.onLift(operator).call(o);

我们看看OperatorMap的call方法:

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {
        @Override
        public void onNext(T t) {
            o.onNext(transformer.call(t));
        }
    };
}

入口参数o在Demo里就是subscriber1对象。我们看它的这句:

o.onNext(transformer.call(t));

按照语法规定,是先执行括号里的。我把它写得再直白一点:

临时数据=map(处理前的数据);//本步骤处理
subscriber1.onNext(临时数据);//提交给下一步处理

往上“链”的代码在:

onSubscribe.call(st);

总结一下:

在observable2这种操作符返回的被观察者中,OnSubscribe要做两件事:

  1. 在本级中生成订阅类Subscriber,在Subscriber中将本级的操作(map)与下一级操作连接起来(向下链接)
  2. 将本级中生成订阅类Subscriber交给上一级Observable,方便上一级向下链接

因此,在此的订阅类Subscriber的职责实际上有两个:

  1. 处理数据
  2. 连接下一个步骤的Subscriber

    再上一张图帮助消化:

后记:RxJava为什么这么搞

相信你对lift流程图挺有印象:

我看到后第一个感觉就是:有必要搞得这么复杂吗?幸好还记得设计模式中的装饰者模式和责任链模式,还能够大体上理解。不过老问题依然存在:它这么搞有必要吗?学习一个陌生的事物最好的办法就是拿一个熟悉的事物进行对比,回想一下软件在运行前会进行什么工作?答案是配置。在代码

Observable.create(onSubscriber1)
          .map(transformer1)
          .subscribe(subscriber1);

create也好,map也罢,都是配置。再问个问题,软件配置好了怎么运行?要么是点击软件,要么是命令启动。subscribe就是启动。好了,我们可以进行抽象了。无论是RxJava还是软件,其核心就是“配置+启动”。说了RxJava与软件的相同之处,它俩有什么不同吗?你看软件是先配置再启动对吧,没见过还要像”lift流程图”一样上串下跳吧。这就是RxJava的设计理念不同了。我当时在想,如果我来设计,我会先create一个Observable对象,然后将map这类的操作符放到Observable对象里的一个”操作队列”里面,这样配置就完成了。等需要启动时,调用subscribe启动”操作队列”处理数据,你看代码执行流程不是很简单吗。这种实现办法是通过数据结构达成的,优点就是源码执行流程清楚简单,缺点就是这个对象职责过多。而RxJava是通过责任链模式达成的,优点是职责分散,缺点就是源码执行起来上串下跳。不过RxJava在设计时已经为用户考虑到这点,实际上如果你不看源码只是使用RxJava,对你而言代码的执行就是一条线下来也很清晰。好了,就讲到这吧,希望对理解RxJava能有所帮助。

参考文档

RxJava基本流程和lift源码分析

RxJava源码初探

时间: 2024-07-29 00:46:13

快速理解RxJava源码的设计理念的相关文章

PLSA模型的再理解以及源码分析

PLSA模型的再理解以及源码分析 之前写过一篇PLSA的博文,其中的收获就是知道PLSA是LSA在概率层面的扩展,知道了PLSA是一种主题模型,知道了PLSA中的参数估计使用的是EM算法.当时我就认为,这样子经典好用的算法,我是会回头再来理解它的,这样子才会有更加深刻的心得.所以有了这篇PLSA模型的再理解. 1. 两种思路解PLSA模型 参考了很多资料,发现大体上有两种解决PLSA模型的思路.下面我们大致说一下它们的思路. 思路一:EM算法中需要更新两个概率 PLSA模型的示意图如下: 其中包

深入理解OkHttp源码(一)——提交请求

本篇文章主要介绍OkHttp执行同步和异步请求的大体流程.主要流程如下图: 主要分析到getResponseWidthInterceptorChain方法,该方法为具体的根据请求获取响应部分,留着后面的博客再介绍. Dispatcher类 Dispatcher类负责异步任务的请求策略.首先看它的部分定义: public final class Dispatcher { private int maxRequests = 64; private int maxRequestsPerHost = 5

深入理解ButterKnife源码并掌握原理(一)

前言 话说在android这座大山里,有一座庙(方块公司-square),庙里住着一个神-jake(我是这么叫的嘻嘻). 不要小看这个小jake,这个神可是为android应用开发们提供了强有力的帮助.比如流行的开源库okhttp,eventbus系列 ,retrofit,butterknife 等等都是出于他之手.小弟佩服的不要不要的-,可以说是为android的应用开发效率和耦合性提高了一个台阶啊. 其它的大神我也是佩服的不要不要的-嘻嘻 声明 这一系列的文章是对ButterKnife的源码

深入理解STL源码 系列文章

深入理解STL源码(1) 空间配置器(allocator) 深入理解STL源码(0) STL简介 深入理解STL源码(3.3) 序列式容器之deque和stack.queue 深入理解STL源码(3.2) 序列式容器之list 深入理解STL源码(3.1) 序列式容器之vector 深入理解STL源码(2) 迭代器(Iterators)和Traits 深入理解STL源码(4.3) 关联式容器之map和multimap 深入理解STL源码(4.2) 关联式容器之set和multiset 深入理解S

深入理解OkHttp源码(三)——网络操作

这篇博客侧重于了解OkHttp的网络部分,包括Socket的创建.连接,连接池等要点.OkHttp对Socket的流操作使用了Okio进行了封装,本篇博客不做介绍,想了解的朋友可以参考拆轮子系列:拆Okio. OkHttp中关于网络的几个概念 下面的主要翻译自OkHttp的官方文档,查看原文. URL URLs(比如https://github.com/square/okhttp)是HTTP和网络的基础,不止指定了Web上的资源,还指定了如何获取该资源. Address Address(比如gi

十分钟深入理解HashMap源码

十分钟就要深入理解HashMap源码,看完你能懂?我觉得得再多看一分钟,才能完全掌握! 终于来到比较复杂的HashMap,由于内部的变量,内部类,方法都比较多,没法像ArrayList那样直接平铺开来说,因此准备从几个具体的角度来切入. 桶结构 HashMap的每个存储位置,又叫做一个桶,当一个Key&Value进入map的时候,依据它的hash值分配一个桶来存储. 看一下桶的定义:table就是所谓的桶结构,说白了就是一个节点数组. transient Node<K,V>[] tab

浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度

一.创建操作: 1.观察者模式:RxJava的世界里,我们有四种角色: Observable<T>(被观察者).Observer(观察者) Subscriber(订阅者).Subject Observable和Subject是两个"生产"实体,Observer和Subscriber是两个"消费"实体.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Ob

RxJava源码初探

Demo分析 响应式编程的概念现在火的一塌糊涂,各种RxXXX库层出不穷,虽然这些库的实现语言各不相同,但是原理都是一样的.我的理解是这些库主要都包含三个东西:Observable, OnSubscribe, Subscriber.阅读本文的读者必须懂的这些概念,初学者建议看下RxJava专题 上的文章再来看本文.我们就从源码层级来分析一下这中间的事件流,线程切换是怎么个原理.这里交代下本文分析的RxJava的版本是1.1.0 先来看个简单的Demo实例 Observable.create(ne

RxJava源码浅析

Create 创建一个Observable比较简单,最基础的方法是调用Observable的create方法进行创建,贴一下示例: Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //执行想要的操作 } }); 它的源码实现也比较简单,在create的时候创建出一个Observa