RxJava Lift

RxJava 不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

例子

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});

上面用了map操作符,map操作符是最常见和常用的,这里看一下他的源码:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

从上面代码可以看到调用了lift函数,然后把我们的转换器传入进去,看下它做了什么事。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so ‘onStart‘ it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    st.onError(e);
                }
            } catch (Throwable e) {
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don‘t have the operator available to us
                o.onError(e);
            }
        }
    });
}

简化一下

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(...);
}

返回了一个新的Observable对象,这才是重点! 这种链式调用看起来特别熟悉?有没有像javascript中的Promise/A,在then中返回一个Promise对象进行链式调用?

OK,那么我们要看下它是如何工作的啦。

在map()调用之后,我们操作的就是新的Observable对象,我们可以把它取名为Observable2,我们这里调用subscribe,完整的就是Observable2.subscribe,继续看到subscribe里,重要的几个调用:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

注意注意 ! 这里的observable是Observable$2!!也就是说,这里的onSubscribe是,lift中定义的!!

OK,我们追踪下去,回到lift的定义中。

return new Observable<R>(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so ‘onStart‘ it
                st.onStart();
                onSubscribe.call(st); //请注意我!! 这个onSubscribe是原始的OnSubScribe对象!!
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                st.onError(e);
            }
        } catch (Throwable e) {
            if (e instanceof OnErrorNotImplementedException) {
                throw (OnErrorNotImplementedException) e;
            }
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don‘t have the operator available to us
            o.onError(e);
        }
    }
});

一定一定要注意这段函数执行的上下文!,这段函数中的onSubscribe对象指向的是外部类,也就是第一个Observable的onSubScribe!而不是Observable$2中的onSubscribe,OK,谨记这一点之后,看看

Subscriber<? super T> st = hook.onLift(operator).call(o);

这行代码,就是定义operator,生成一个经过operator操作过的Subscriber,看下OperatorMap这个类中的call方法

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            try {
                o.onNext(transformer.call(t));
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(OnErrorThrowable.addValueAsLastCause(e, t));
            }
        }

    };
}

没错,对传入的Subscriber做了一个代理,把转换后的值传入。

这样就生成了一个代理的Subscriber,

最后我们最外层的OnSubscribe对象对我们代理的Subscriber进行了调用。。

也就是

 @Override
public void call(Subscriber<? super String> subscriber) {
    //此处的subscriber就是被map包裹(wrapper)后的对象。
    subscriber.onNext("hello");
}

然后这个subscriber传入到内部,链式的通知,最后通知到我们在subscribe函数中定义的对象。

参考:

http://blog.csdn.net/axuanqq/article/details/50423977

https://segmentfault.com/a/1190000004049841

http://blog.csdn.net/lzyzsd/article/details/50110355

时间: 2024-08-29 13:35:03

RxJava Lift的相关文章

rxjava的lift和subscirbe过程

1.subscribe过程 2.lift过程

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

给 Android 开发者的 RxJava 详解

作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么? 鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用

彻底搞清楚 RxJava 是什么东西

其实从rxjava14年出现到现在,我是去年从一个朋友那里听到的,特别是随着现在app项目越来越大,分层越来越不明确的情况下,rxjava出现了,以至于出现了rxandroid.其实如果你了解观察者模式的话,rxjava并没有你说的那么神秘.再次,我对rxjava并不崇拜,我的原则是怎么写代码简单,代码结构清晰,维护简单,就是好框架. 讲rxjava之前首先说一下Android mvp开发模式. MVP的工作流程 Presenter负责逻辑的处理, Model提供数据, View负责显示.  作

深入浅出RxJava就这一篇就够了

前言: 第一次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJava.RxJava是一个基于事件订阅的异步执行的一个类库.听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个Android项目启动会联系到RxJava呢?因为在RxJava使用起来得到广泛的认可,又是基于Java语言的.自然会有善于组织和总结的开发者联想到Android!没错,RxAndroid就这样在RxJava的基础上,针对Android开发的一个库.今天我们主要是来讲

Android 勤用RXJava compose操作符消除重复代码

相信小伙伴在使用RXJava与Retrofit请求网络时,都有遇到过这样的场景,在IO线程请求网络解析数据,接着返回主线程setData.更新View试图,那么也肯定熟悉下面这几句代码: .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 如果网络请求的次数比较少, 作为一名不拘小节(懒癌)的

Rxjava基础

现在很多Android App的开发开始使用Rxjava,但是Rxjava以学习曲线陡峭著称,入门有些困难.经过一段时间的学习和使用,这里来介绍一下我对Rxjava的理解. 说到Rxjava首先需要了解的两个东西,一个是Observable(被观察者,事件源)和 Subscriber(观察者,是 Observer的子类).Observable发出一系列事件,Subscriber处理这些事件.首先来看一个基本的例子,我们如何创建并使用Observable. Observable.create(ne

RxJava(11-线程调度Scheduler)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51821940 本文出自:[openXu的博客] 目录: 使用示例 subscribeOn原理 多次subscribeOn的情况 observeOn原理 调度器的种类 各种操作符的默认调度器 源码下载 ??RxJava中 使用observeOn和subscribeOn操作符,你可以让Observable在一个特定的调度器上执行,observeOn指示一个Observable在一个特定的调度器

RxJava Map操作详解

2016-06-06 RxJava是最近两年火起来的一个框架,核心是异步,但是对于我来说印象最深的是响应式编程的思想.最近刚好想把自己的项目改成用RxJava实现,所以就研究了下.抛物线和大头鬼两位大牛也讲解的很详细和形象,其实RxJava里除了这种响应式的编程思想不太好理解外,操作符也是比较难理解的一部分.响应式编程思想不是三言两语就能讲清楚,想学习的人也不是通过看几遍blog就能学会的.我这里主要是讲操符,通过分解的方式一步一步带领大家看着到底是怎么回事,就以常用的map为例. 首先看一段伪