Android进阶:四、RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读

一.Rxjava是什么
Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

通俗来说,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。
举个例子:

   Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }
        });

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

这是Rxjava2最简单的用法:
1.创建一个Observable,重写subscribe方法,这里主要处理被观察的事件。
2.订阅这个Observable,事件会回调observer的方法,我们可以对事件做响应的处理
二.Rxjava源码解析

2.1. 创建Observable:
创建Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。这个方法的参数是ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

ObservableEmitter对接口Emitter进行扩展,增加了setDisposable、setCancellable等方法
基本参数了解了,现在看看create方法里面做了什么,代码如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

调用了RxJavaPlugins的onAssembly方法。又有一个新参数ObservableCreate<T>(source),我们看看它是什么:

final class ObservableCreate<T> extends Observable<T> {

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

}

继承了Observable,所以也是个被观察对象,在构造函数中我们看到我们new的ObservableOnSubscribe对象,被存在了ObservableCreate的source里面
那我们继续看看onAssembly方法做什么:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

一个Hook方法。onObservableAssembly是一个静态变量,我们没有设置,默认为空,所以直接返回source对象。也就是说,Observable的create方法其实就是把我们ObservableOnSubscribe对象,存储在ObservableCreate对象的source里面,然后返回ObservableCreate对象。
我们知道ObservableCreate是继承Observable的,所以创建了ObservableCreate对象,我们的Observable也就创建完了。

2.2 订阅事件(被观察者)
订阅被观察者的操作是observable.subscribe(new Observer<String>())。这个操作符其实是个“被动”,就是事件被观察者观察。因为subscribe方法里的参数Observer才是观察者。我们也会在Observer里的各个会调方法里接收到事件相关的返回值。
我们看看subscribe方法的源码:

 public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
        }
    }

看代码我们知道最主要调用的方法是:subscribeActual(observer);,这个方法是Observable里的抽象方法,而此时我们的Observable是一个ObservableCreate对象(前面create方法返回的对象)。所以我们去看一下ObservableCreate里面是如何重写这个方法的。代码如下:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我们一看到这个方法主要做了三件事:
①创建一个CreateEmitter对象parent。
②把parent传给source的subscribe方法。上面我们知道source就是刚才存的ObservableOnSubscribe对象,subscribe也就是我们重写的方法:

          @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
            }

所以我们在这个方法里就能收到一个CreateEmmiter,通过CreateEmitter可以回调相应的方法。CreateEmitter是实现ObservableEmitter接口,我们看看它内部实现,如:onNext源码如下:

@Override
public void onNext(T t) {
    observer.onNext(t);
}

也就是说,当我们在ObservableOnSubscribe的subscribe方法里调用ObservableEmitter的onNext方法的时候,它里面会调用observer的onNext。于是通过这样的传递,我们就能在observer里响应的回调方法里收到事件的相关状态。

至此一个简单Rxjava流式传递原理已经讲完了,总结流程如下:

  • 使用Observbable.create方法,产生一个ObservableCreate对象,对象里存着ObservableOnSubscribe对象source。
  • 调用ObservableCreate.subscribe方法,实际调用的是subscribeActual方法,传入一个Observer对象。
  • subscribeActual方法中创建一个CreateEmmiter对象,调用source.subscribe方法,传入CreateEmmiter对象。
  • 于是我们在ObservableOnSubscribe中就接收到了一个CreateEmmiter,CreateEmmiter是ObservableEmmiter的子类。我们可以在这里调用CreateEmmiter的方法进行事件回调。
  • 调用CreateEmmiter方法,实际上会调用Observer的响应的方法。也就是CreateEmmiter把事件状态传递给观察者。

原文地址:https://blog.51cto.com/13983283/2366721

时间: 2024-11-05 20:48:25

Android进阶:四、RxJava2 源码解析 1的相关文章

Android进阶:RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么 Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理

Android进阶:五、RxJava2源码解析 2

上一篇文章Android进阶:四.RxJava2 源码解析 1里我们讲到Rxjava2 从创建一个事件到事件被观察的过程原理,这篇文章我们讲Rxjava2中链式调用的原理.本文不讲用法,仍然需要读者熟悉Rxjava基本的用法. 一.Rxjava2 的基本用法 Rxjava是解决异步问题的,它的链式调用让代码看起来非常流畅优.现在我们带上线程切换以及链式调用来看看.下面代码是示例: Observable .create(new ObservableOnSubscribe<String>() {

【Android】IntentService &amp; HandlerThread源码解析

一.前言 在学习Service的时候,我们一定会知道IntentService:官方文档不止一次强调,Service本身是运行在主线程中的(详见:[Android]Service),而主线程中是不适合进行耗时任务的,因而官方文档叮嘱我们一定要在Service中另开线程进行耗时任务处理.IntentService正是为这个目的而诞生的一个优雅设计,让程序员不用再管理线程的开启和允许. 至于介绍HandlerThread,一方面是因为IntentService的实现中使用到了HandlerThrea

四.jQuery源码解析之jQuery.fn.init()的参数解析

从return new jQuery.fn.init( selector, context, rootjQuery )中可以看出 参数selector和context是来自我们在调用jQuery方法时传过来的.那么selector和context都有哪些可能. 对于表格中的4~9行中的可能做具体分析. 如果selector是字符串,则首先检测是html代码还是#id. 126行的if语句:以"<"开头,以">"结尾,且长度>=3.则假设额这个是HT

RxJava2 源码解析(二)

转载请标明出处: http://blog.csdn.net/zxt0601/article/details/61637439 本文出自:[张旭童的博客](http://blog.csdn.net/zxt0601) 概述 承接上一篇RxJava2 源码解析(一), 本系列我们的目的: 知道源头(Observable)是如何将数据发送出去的. 知道终点(Observer)是如何接收到数据的. 何时将源头和终点关联起来的 知道线程调度是怎么实现的 知道操作符是怎么实现的 本篇计划讲解一下4,5. Rx

Handler机制(四)---Handler源码解析

Handler的主要用途有两个:(1).在将来的某个时刻执行消息或一个runnable,(2)把消息发送到消息队列. 主要依靠post(Runnable).postAtTime(Runnable, long).postDelayed(Runnable, long).sendEmptyMessage(int).sendMessage(Message).sendMessageAtTime(Message).sendMessageDelayed(Message, long)这些方法来来完成消息调度.p

Android LayoutInflater.from().inflate()源码解析

我们知道,在Activity#setContentView()中会调用PhoneWindow#setContentView().而在PhoneWindow#setContentView()中有这么一句mLayoutInflater.inflate(layoutResID, mContentParent).这行代码的作用是将我们的activity_main.xml填充到mContentParent中去.详见:setContentView源码解析.在写adapter的时候,也经常写mInflater

Android SVG动画PathView源码解析与使用教程(API 14)

使用的是一个第三方库android-pathview主要是一个自定义View--PathView,跟所有自定义View一样,重写了三个构造方法.并且最终调用三个参数的构造方法,在里面获取自定义属性. /** * Default constructor. * * @param context The Context of the application. */ public PathView(Context context) { this(context, null); } /** * Defau

RxJava2 源码解析(一)

概述 最近事情太多了,现在公司内部的变动,自己岗位的变化,以及最近决定找工作.所以博客耽误了,准备面试中,打算看一看RxJava2的源码,遂有了这篇文章. 不会对RxJava2的源码逐字逐句的阅读,只寻找关键处,我们平时接触得到的那些代码. 背压实际中接触较少,故只分析了Observable. 分析的源码版本为:2.0.1 我们的目的: 知道源头(Observable)是如何将数据发送出去的. 知道终点(Observer)是如何接收到数据的. 何时将源头和终点关联起来的 知道线程调度是怎么实现的