Rxjava基础

  现在很多Android App的开发开始使用Rxjava,但是Rxjava以学习曲线陡峭著称,入门有些困难。经过一段时间的学习和使用,这里来介绍一下我对Rxjava的理解。

  说到Rxjava首先需要了解的两个东西,一个是Observable(被观察者,事件源)和 Subscriber(观察者,是 Observer的子类)。Observable发出一系列事件,Subscriber处理这些事件。首先来看一个基本的例子,我们如何创建并使用Observable。

Observable.create(new Observable.OnSubscribe<String>() {
	@Override public void call(Subscriber<? super String> subscriber) {
		subscriber.onNext("hello");
		}
	}).subscribe(new Subscriber<String>()
	{
		@Override public void onCompleted() {
		}
		@Override public void onError(Throwable e) {
		}
		@Override public void onNext(String s) {
			Log.d("rx-info", s);
		}
	});

  创建Observable的最基本的方法是通过Observable.create() 来进行,当有Subscriber通过Observable.subscribe() 方法进行订阅之后Observable就会发射事件,注意必须要有订阅者订阅才会发射事件。发射的方式是通过调用 Observable中的 OnSubsribe 类型的成员来实现(每个Observable有一个final OnSubscribe<T> onSubscribe 成员,该成员是一个接口,后面详细说),在 Onsubsribe类型成员中调用 call() 方法,注意,这个call方法的参数就是 Observable.subscribe() 方法传入的 Subsriber实例。总的一句话就是在Obsubscribe 的 call方法中执行订阅者(Subscriber)的三个方法 onNext(), onCompleted() 和 onError()。

  一开始就是一堆 Observable , Subscriber,subscribe() , OnSubscribe 估计看得头晕,因此我们需要先来对这些东西有一个了解。这里只列出一个帮助理解的大概。

public class Observable<T> {

	  final OnSubscribe<T> onSubscribe;
	  protected Observable(OnSubscribe<T> f) {
              this.onSubscribe = f;
          }

       public final static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(hook.onCreate(f));
        }

       public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
            // cover for generics insanity
        }

	  public final Subscription subscribe(Subscriber<? super T> subscriber) {
              return Observable.subscribe(subscriber, this);
          }

       public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
       }
}
public interface Action1<T> extends Action {
    void call(T t);
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public abstract class Subscriber<T> implements Observer<T>, Subscription {
	//...
}

  通过上面的代码帮助理清楚 Observable, Observer, Subscriber, OnSubsriber, subscribe() 之间的关系。这里额外提一下 Observable.subscribe() 方法有多个重载:

Subscription    subscribe()
Subscription    subscribe(Action1<? super  T> onNext)
Subscription    subscribe(Action1<? super  T> onNext, Action1< java.lang .Throwable> onError)
Subscription    subscribe(Action1<? super  T> onNext, Action1< java.lang .Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super  T> observer)
Subscription    subscribe(Subscriber<? super  T> subscriber)

  其它的ActionX 和 FuncX 请大家自行去查阅定义。

  介绍了基本的创建Observable和 Observable是怎么发射事件的之后,来介绍一下Rxjava的Operator和Operator的原理。

  Rxjava的Operator常见的有map, flatMap, concat, merge之类的。这里就不介绍Operator的使用了,介绍一下其原理。介绍原理还是来看源码,以map为例。

  首先看一下使用map的例子:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("info-rx", s);
    }
});

  继续来看 map的定义:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

  简单说一下Func1,其中的T表示传入的参数类型,R表示方法返回的参数类型。Operator的操作原理最核心的就是lift的实现。

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so ‘onStart‘ it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don‘t have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

  lift方法看起来太过复杂,稍作简化:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(...);
}

  lift方法实际是产生一个新的 Observable。在map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable$2,我们这里调用subscribe就是Observable$2.subscribe,继续看到subscribe里,重要的几个调用:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

  注意,这里的observable是Observable$2!!也就是说,这里的onSubscribe是,lift中定义的!!

  回过头来看lift方法中创建新Observable的过程:

return new Observable<R>(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so ‘onStart‘ it
                st.onStart();
                onSubscribe.call(st); //请注意我!! 这个onSubscribe是原始的OnSubScribe对象!!
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                st.onError(e);
            }
        } catch (Throwable e) {
            if (e instanceof OnErrorNotImplementedException) {
                throw (OnErrorNotImplementedException) e;
            }
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don‘t have the operator available to us
            o.onError(e);
        }
    }
});

  一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个Observable的onSubScribe!而不是Observable$2中的onSubscribe,接下来看:

Subscriber<? super T> st = hook.onLift(operator).call(o);

  这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法: 

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            try {
                o.onNext(transformer.call(t));
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(OnErrorThrowable.addValueAsLastCause(e, t));
            }
        }

    };
}

  没错,对传入的Subscriber做了一个代理,把转换后的值传入。这样就生成了一个代理的Subscriber,最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用: 

 @Override
public void call(Subscriber<? super String> subscriber) {
    //此处的subscriber就是被map包裹(wrapper)后的对象。
    subscriber.onNext("hello");
}

  然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。

  分析lift的原理,其实还是回到了一开始介绍的Observable必须要有订阅者进行订阅才能发射事件。lift方法会产生一个新的Observable,并且这个Observable位于原始Observable和后面的Subsriber之间,因此lift方法也需要提供一个新的Subscriber来使得新产生的Observable发射事件,这个新的Subsriber就是对事件链后方的Subscriber就行包装做一个代理。

  详细使用Rxjava可参见:

  1. 给 Android 开发者的 RxJava 详解

  2.Rxjava使用基础合集

时间: 2024-08-06 11:51:25

Rxjava基础的相关文章

RxJava开发精要3-向响应式世界问好

原文出自<RxJava Essentials> 原文作者 : Ivan Morgillo 译文出自 : 开发技术前线 www.devtf.cn 转载声明: 本译文已授权开发人员头条享有独家转载权,未经同意.不得转载! 译者 : yuxingxin 项目地址 : RxJava-Essentials-CN 向响应式世界问好 在上一章中,我们对观察者模式有个理论上的高速概述.我们也看了从头開始.从列表.或者从已经存在的函数来创建Observables. 在本章中.我们将用我们学到的来创建我们第一个响

我们为什么要在Android中使用RxJava

本文翻译来自–>Why should we use RxJava on Android 另外: 微凉一季 再另外: 微凉一季 感觉RxJava近期风生水起,不学习一下都不好意思了.洒家也是初学RxJava,也是感觉代码好像更复杂更难懂了.看了一篇外文感同身受,简单翻译一下.本文简介使用RxJava优势所在.但可能须要有一点RxJava基础,推荐先看一下抛物线的那篇经典的匠心写作. -–华丽切割线.译文開始--- Reactive Extensions (Rx) 是一系列接口和方法.为开发人员提供

当Kotlin遇见RxJava多数据源

温馨提醒 阅读本文最好有Kotlin基础,若没有基础,可参考之前文章Kotlin初探,使用Kotlin优雅的开发Android应用,以及RxJava基础(本文基于RxJava2),当然我也会尽可能详细解释让你顺利阅读本文. 源码传送门 写在前面 最近几天回过头,看了之前的总结RxJava操作符系列,感觉对Rxjava多数据源的处理不是很理解,所以在总结学习一波.大家都知道,最近Kotlin语言一直占据热搜榜,褒贬不一,但我想说,不管有什么想法都要抛在脑后,毕竟Google爸爸出手,你不情愿也要跟

RxJava源码浅析

Create 创建一个Observable比较简单,最基础的方法是调用Observable的create方法进行创建,贴一下示例: Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { //执行想要的操作 } }); 它的源码实现也比较简单,在create的时候创建出一个Observa

RxAndroid和RxJava的资料分享

最近ReactiveX编程模型开始慢慢发展起来了,对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android 开发者来说有太多陌生的概念了,可是它真的很牛.RxAndroid的核心是异步,提供了可维护行好的链式语法和各种线程之间的方便切换,对于大多数Android开发者来说,使用RxAndroid写的代码初看应该是很迷茫的,但熟悉了流程就好了,RxAndroid对于复杂的异步编程场景还是能显著提高代码可读性的,就像javascript 的Promise编程模

Retrofit2.0+RxJava+MVP+Bmob的使用

本篇来总结一下学过的关于android方面的东西.梳理一下知识: 1.Retrofit2.0 Retrofit 是一个Square开发的类型安全的REST安卓客户端请求库.这个库为网络认证.API请求以及用OkHttp发送网络请求提供了强大的框架 . 2.RxJava/RxAndroid RxJava是一款响应式变成框架.RxAndroid在RxJava基础之上扩展了android线程调度.RxJava基本组成部分是Observables和Subscribers(事实上Observer才是最小的

Android基础搜集

1.来自知乎的网友总结的基础 原文:http://diycode.cc/topics/72 ServiceManager.ActivityManager.packageManager .*****Manager 都弄懂了? Binder 也搞清楚了? IPC 也弄明白了? FrameWork 层的每个类都折腾了? Hook 会玩了? 各种 SystemService 也知道怎么运行的了? View 的渲染你明白是怎么回事了? Intent 是如何实现 Activity.Service 等之间的解

Android 通用流行框架大全

1. 缓存 DiskLruCache    Java实现基于LRU的磁盘缓存 2.图片加载 Android Universal Image Loader 一个强大的加载,缓存,展示图片的库 Picasso一个强大的图片下载与缓存的库 Fresco  一个用于管理图像和他们使用的内存的库 Glide   一个图片加载和缓存的库 3. 图片处理 Picasso-transformations 一个为Picasso提供多种图片变换的库 Glide-transformations   一个为Glide提

Android通用流行框架大全

1. 缓存 名称 描述 DiskLruCache Java实现基于LRU的磁盘缓存 2.图片加载 名称 描述 Android Universal Image Loader 一个强大的加载,缓存,展示图片的库 Picasso 一个强大的图片下载与缓存的库 Fresco 一个用于管理图像和他们使用的内存的库 Glide 一个图片加载和缓存的库 3. 图片处理 名称 描述 Picasso-transformations 一个为Picasso提供多种图片变换的库 Glide-transformation