RxJava && Agera 从源码简要分析基本调用流程(1)

版权声明:本文由晋中望原创文章,转载请注明出处: 
文章原文链接:https://www.qcloud.com/community/article/123

来源:腾云阁 https://www.qcloud.com/community

相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行。同样有很多同学已经开始在自己的项目中使用RxJava。它能够帮助我们在处理异步事件时能够省去那些复杂而繁琐的代码,尤其是当某些场景逻辑中回调中嵌入回调时,使用RxJava依旧能够让我们的代码保持极高的可读性与简洁性。不仅如此,这种基于异步数据流概念的编程模式事实上同样也能广泛运用在移动端这种包括网络调用、用户触摸输入和系统弹框等在内的多种响应驱动的场景。那么现在,就让我们一起分析一下RxJava的响应流程吧。
(本文基于RxJava-1.1.3)

一.用法

首先来看一个简单的例子:

运行结果为:

从结果中我们不难看出整体的调用流程:

首先通过调用Observable.create()方法生成一个被观察者,紧接着在这里我们又调用了map()方法对原被观察者进行数据流的变换操作,生成一个新的被观察者(为何是新的被观察者后文会讲),最后调用subscribe()方法,传入我们的观察者,这里观察者订阅的则是调用map()之后生成的新被观察者。

在整个过程中我们会注意到三个主角:Observable、OnSubscribe、Subscriber,所有的操作都是围绕它们进行的。不难看出这里三个角色的分工:

  • Observable:被观察者的来源,亦或说是被观察者本身
  • OnSubscribe:用来通知观察者的不同行为
  • Subscriber:观察者,通过实现对应方法来产生具体的处理。

所以接下来我们以这三个角色为中心来分析具体的流程。

二.分析

1.订阅过程

首先我们进入Observable.create()看看:

这里调用构造函数生成了一个Observable对象并将传入的OnSubscribe赋给自己的成员变量onsubscribe,等等,这个hook是从哪里冒出来的?我们向上找:

RxJavaObservableExecutionHook这个抽象Proxy类默认对OnSubscribe对象不做任何处理,不过通过继承该类并重写onCreate()等方法我们可以对这些方法对应的时机做一些额外处理比如打Log或者一些数据收集方面的工作。

到目前最初始的被观察者已经生成了,我们再来看看观察者这边。我们知道通过调用observable.subscribe()方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:

这里我们略去部分无关代码看主要部分,subscribe.onStart()默认空实现我们暂且不用管它,对于传进来的subscriber要包装成SafeSubscriber,这个SafeSubscriber对原来的subscriber的一系列方法做了更完善的处理,包括:onError()onCompleted()只会有一个被执行;保证一旦onError()或者onCompleted()被执行,将不再能再执onNext()等情况。这里封装为SafeSubscriber之后,调用onSubscribe.call(),并将subscriber传入,这样就完成了一次订阅。

显而易见,Subscriber作为观察者,在订阅行为完成后,其具体行为在整个链式调用中起着至关重要的作用,我们来看看它内部的构成的主要部分:


每个Subscriber都持有一个SubscriptionList,这个list保存的是所有该观察者的订阅事件,同时Subscriber也对应实现了Subscription接口,当这个Subscriber取消订阅的时候会将持有事件列表中的所有Subscription取消订阅,并且从此不再接受任何订阅事件。同时,通过Producer可以去限定该Subscriber所接收的数据流的总量,这个限制量其实是加在Subscriber.onNext()方法上的,onComplete()onError()则不会受到其影响。因为是底层抽象类,onNext()onComplete()onError()统一不在这里处理。

2.变换过程

在收到Observable的消息之前我们有可能会对数据流进行处理,例如map()、flatMap()、deBounce()、buffer()等方法,本例中我们用了map()方法,它接收了原被观察者发射的数据并将通过该方法返回的结果作为新的数据发射出去,相当于做了一层中间转化:

我们接着看这个转化过程:

这里是通过一个lift()方法实现的,再查看其他的转化方法发现内部也都使用lift()实现的,看来这个lift()就是关键所在了,不过不急,我们先来看看这个OperationMap是什么:

OperationMap实现了Operator接口的call()方法,该方法接受外部传入的观察者,并将其作为参数构造出了一个新的观察者,我们不难发现o.onNext(transformer.call(t));这一句起了至关重要的作用,这里的接口transformer将泛型T转化为泛型R:

这样之后,再将转换后的数据传回至原观察者的onNext()方法,就完成了观察数据流的转化,但是你应该也注意到了,我们用来做转换的这个新的观察者并没有实现订阅被观察者的操作,这个订阅操作又是在哪里实现的呢?答案就是接下来的lift()

在这里我们新生成了一个Observable对象,在这个新对象的onSubscribe成员的call()方法中我们通过operator.call()拿到之前生成的未产生订阅的观察者st,之后将它作为参数传入一开始的onSubscribe.call()中,即完成了这个中间订阅的过程。
现在我们将整个流程梳理一下:

  • 一次map()变换
  • 根据Operator实例生成新的Subscriber
  • 通过lift()生成新的Observable
  • 原Subscriber订阅新的Observavble
  • 新的Observable中onSubscribe通知新Subscriber订阅原Observable
  • 新Subscriber将消息传给原Subscriber。

为了便于理解,这里借用一下扔物线的图:

以上就是一次map()变换的流程,事实上多次map()也是同样道理:最外层的目标Subscriber发生订阅行为后,onSubscribe.onNext()会逐层嵌套调用,直至初始Observable被最底层的Subscriber订阅,通过Operator的一层层变化将消息传到目标Subscriber。再次祭出扔物线的图:

至于其他的多种变化的实现流程也都很类似,借助于Operator的不同实现来达到变换数据流的目的。例如其中的flatMap(),它需要进行两次lift(),其中第二次是OperationMerge,将转换成的每一个Observable数据流通过InnerSubscriber这个纽带订阅后,在InnerSubscriber的onNext()中拿到R,再通过传入的parent(也就是原MergeSubscriber)将它们全部发射(emit)出去,由最外层我们传入的Subscriber统一接收,这样就完成了 T => Observable<R> => R 的转化:




除此之外,还有许多各式各样的操作符,如果它们还不能满足你的需要,你也可以通过实现Operator接口定制新的操作符。灵活运用它们往往能达到事半功倍的效果,比如通过使用sample()debounce()等操作符有效避免backpressure的需要等等,这里就不一一介绍了。

下篇将继续从"线程切换过程"开始分析

文章来源公众号:QQ空间终端开发团队(qzonemobiledev)

时间: 2024-10-21 20:01:34

RxJava && Agera 从源码简要分析基本调用流程(1)的相关文章

RxJava &amp;&amp; Agera 从源码简要分析基本调用流程(2)

版权声明:本文由晋中望原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/124 来源:腾云阁 https://www.qcloud.com/community 接上篇RxJava && Agera 从源码简要分析基本调用流程(1)我们从"1.订阅过程"."2.变换过程"进行分析,下篇文章我们继续分析"3.线程切换过程" 3.线程切换过程 从上文中我们知道了R

uboot源码简要分析

uboot源码简要分析 一.uboot源码整体框架 源码解压以后,我们可以看到以下的文件和文件夹: cpu 与处理器相关的文件.每个子目录中都包括cpu.c和interrupt.c.start.S.u-boot.lds. cpu.c:初始化CPU.设置指令Cache和数据Cache等 interrupt.c:设置系统的各种中断和异常 start.S:是U-boot启动时执行的第一个文件,它主要做最早期的系统初始化,代码重定向和设置系统堆栈,为进入U-boot第二阶段的C程序奠定基础. u-boo

EventBus 3.0 源码简要分析

EvenBus 可以在不同模块间传递信息,减少接口的使用. 一.使用例子 <span style="font-size:18px;">public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.la

Android 5.0 Settings源码简要分析

概述: 先声明:本人工作快两年了,仍是菜鸟级别的,惭愧啊!以前遇到好多知识点都没有记录下来,感觉挺可惜的,现在有机会接触Android 源码.我们一个Android组的搞Setting,我觉得是得写得东西,毕竟才接触,现在只能看一段时间代码,就先记录下一些收获吧,说多了就是泪~本文主要针对L平台上Settings模块正常启动流程做一个简要分析,并试着分析一下Settings下面某选项的实现. Setting 简介 在之前的KK平台上Settings模块的第一个Activity名字为Setting

Xutils源码文件下载方法的调用流程

//我主要是好奇Xutils是在哪里回调onLoading(),找老半天没找到,好不容易找到就写下来吧 前言: 1.仅对主要代码行进行摘要,提供大致流程 2.为便于理解,本文变量名不同于源码变量名,而是类名的驼峰式写法.如源码中:WorkRunnable mWorker,在本文中为 workRunnable 3.需要配合看Xutils的源码,可以让你开Xutils源代码时减少一定的难度 代码主干: HttpFragment: //HttpFragment是Xutils自带例子中的一个类 http

Elasticsearch之client源码简要分析

问题 让我们带着问题去学习,效率会更高 1  es集群只配置一个节点,client是否能够自动发现集群中的所有节点?是如何发现的? 2  es client如何做到负载均衡? 3  一个es node挂掉之后,es client如何摘掉该节点? 4  es client node检测分为两种模式(SimpleNodeSampler和SniffNodesSampler),有什么不同? 核心类 TransportClient    es client对外API类 TransportClientNod

spring mvc 源码简要分析

关于web项目,运用比较多的是过滤器和拦截器 过滤器基于责任链设计模式 创建过滤器链 / Create the filter chain for this requestApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet); //调用 private void internalDoFilter(ServletRequest request

wifidog 源码初分析(4)-转

在上一篇<wifidog 源码处分析(3)>的流程结束后,接入设备的浏览器重定向至 路由器 上 wifidog 的 http 服务(端口 2060) /wifidog/auth 上(且携带了 认证服务器 为此接入设备分配的 token),本篇就是从 wifidog 接收到 /wifidog/auth 的访问后的 校验流程. - 根据<wifidog 源码初分析(2)>中描述的,在 wifidog 启动 http 服务前,注册了一个针对访问路径 /wifidog/auth 的回调,如

又是正版!Win下ffmpeg源码调试分析二(Step into ffmpeg from Opencv for bugs in debug mode with MSVC)

最近工作忙一直没时间写,但是看看网络上这方面的资源确实少,很多都是linux的(我更爱unix,哈哈),而且很多是直接引入上一篇文章的编译结果来做的.对于使用opencv但是又老是被ffmpeg库坑害的朋友们,可能又爱又恨,毕竟用它处理和分析视频是第一选择,不仅是因为俩者配合使用方便,而且ffmpeg几乎囊括了我所知道的所有解编码器,但是正是因为这个导致了一些bug很难定位,所以有必要考虑一下如何快速定位你的ffmpeg bug. sorry,废话多了.首先给个思路: 1.使opencv 的hi