RxJava defer操作符实现代码支持链式调用

前言

现在越来越多Android开发者使用到RxJava,在Android使用RxJava主要有如下好处:

1,轻松切换线程。以前我们切换线程主要使用Handler等手段来做。

2,轻松解决回调的嵌套问题。现在的app业务逻辑越来越复杂,多的时候3,4层回调嵌套,使得代码可维护性变得很差。RxJava链式调用使得这些调用变得扁平化。

随着RxJava的流行,越来越多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。

但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是一些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。

如果还是想以前那样做,那就太low。

下面就来探讨下如何使得代码支持RxJava风格

遇到这种问题,在我脑海里浮现的第一种方式就是通过Observable的create操作符。因为在里面我们可以控制数据的发射。就像上一篇文章那样《RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断》

如下代码片段:

Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                try {
                    List<Article> as = articleDao.queryBuilder()
                            .where(ArticleDao.Properties.CategoryId.eq(categoryId))
                            .orderDesc(ArticleDao.Properties.Id)
                            .offset((pageIndex - 1) * pageSize)
                            .limit(pageSize).list();
                    if (as == null || as.isEmpty()) {
                        subscriber.onNext(null);
                    }else{
                        subscriber.onNext(as);
                    }
                }catch (Exception e){
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });

这样确实没有没有问题。但是我们要封装下, 每个方法都这样写维护性和扩展比较差(例如有天我想换种方式来实现而不是create,如果通过方法封装一下,修改就变得容易多了)

如何封装呢?通过分析知道,大部分代码是相同的,只是我们的业务不一样。那么通过模板方法解决吧。业务方法通过接口回调的方式传递进来,因为我们不知道调用者是什么业务。

回调接口如下(T表示我们业务数据):

    public interface MyCallable<T> {
        T call();
    }

下面是模板代码:

    protected <R> Observable<R> createObservable(final MyCallable<R> callable) {
        return Observable.create(new Observable.OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> subscriber) {
                try {
                    R result = callable.call();
                    subscriber.onNext(result);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }

使用就非常简单了调用createObservable方法,实现MyCallable接口即可,然后就是跟使用RxJava一样处理逻辑。

分析greendao是如何支持RxJava风格的

看过Greendao源码的人知道,它也是通过这种方式支持RxJava的(下面看看他是怎么做的):

    /**
     * Rx version of {@link AbstractDao#loadAll()} returning an Observable.
     */
    @Experimental
    public Observable<T> load(final K key) {
        return wrap(new Callable<T>() {
            @Override
            public T call() throws Exception {
                return dao.load(key);
            }
        });
    }

最终的实现也是通过dao.load(key)同步方法来实现的,关键是wrap方法了:

    protected <R> Observable<R> wrap(Callable<R> callable) {
        return wrap(RxUtils.fromCallable(callable));
    }

    //通过这个方法再包装了一层(就是默认设置执行的线程)
    protected <R> Observable<R> wrap(Observable<R> observable) {
        if (scheduler != null) {
            return observable.subscribeOn(scheduler);
        } else {
            return observable;
        }
    }

通过代码可以看到默认执行的线程是IO线程:

    /**
     * The returned RxDao is a special DAO that let‘s you interact with Rx Observables using RX‘s IO scheduler for
     * subscribeOn.
     *
     * @see #rxPlain()
     */
    @Experimental
    public RxDao<T, K> rx() {
        if (rxDao == null) {
            rxDao = new RxDao<>(this, Schedulers.io());
        }
        return rxDao;
    }

所以使用greendao不用指定它在IO执行,因为sdk已经帮我们设置了。

然后就是RxUtils.fromCallable(callable)方法了:

class RxUtils {
    /** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */
    @Internal
    static <T> Observable<T> fromCallable(final Callable<T> callable) {
        return Observable.defer(new Func0<Observable<T>>() {

            @Override
            public Observable<T> call() {
                T result;
                try {
                    result = callable.call();
                } catch (Exception e) {
                    return Observable.error(e);
                }
                return Observable.just(result);
            }
        });
    }
}

上面的注释说通过Observable.fromCallable也可以实现这样的逻辑,也就是说代替Observable.defer()方法。

最后greendao是通过defer操作符来实现rx风格的。

defer和create操作符有什么异同点?

通过分析greendao源码得知,他是通过defer来做的,我们是通过create操作符来做的。那两者有什么不同?

我们对defer操作符比较陌生,先看看它的源码:

    public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
        return create(new OnSubscribeDefer<T>(observableFactory));
    }

说白了就是调用了create(OnSubscribe<T> f) 方法:

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

其实我们上面的create操作也是调用过来这个方法。只是defer操作符传递的OnSubscribe是OnSubscribeDefer,那我们来看看这是什么鬼?

public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
    final Func0<? extends Observable<? extends T>> observableFactory;

    public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
        this.observableFactory = observableFactory;
    }

    @Override
    public void call(final Subscriber<? super T> s) {
        Observable<? extends T> o;
        try {
            o = observableFactory.call();
        } catch (Throwable t) {
            Exceptions.throwOrReport(t, s);
            return;
        }
        o.unsafeSubscribe(Subscribers.wrap(s));
    }

}

OnSubscribeDefer也是继承自OnSubscribe,那么他的call方法肯定也是在订阅的时候被调用(就是说订阅的时候才创建这个observable,并且每次订阅都会创建一个新的observable)。

为什么Greendao没有使用create那种方式精确控制数据的发射?现在RxJava2.0对create操作符做出了一些限制,不能随随便便create了,这样出现一些问题。具体的rxJav2.0的改动可以看看

他的github说明What’s-different-in-2.0

关于RxJava的一些参考资料:

pitfalls-of-operator-implementations

subscribe vs unsafeSubscribe

What’s-different-in-2.0

时间: 2024-07-28 13:14:12

RxJava defer操作符实现代码支持链式调用的相关文章

js链式调用

我们都很熟悉jQuery了,只能jQuery中一种非常牛逼的写法叫链式操作 * $('#div').css('background','#ccc').removeClass('box').stop().animate({width:300}) 那这是如何实现的呢,我自己写了个例子:并非jQuery源码 Ferrinte.prototype.show=function () { for(var i=0;i<this.elements.length;i++) { this.elements[i].s

javascript设计模式 第6章 链式调用

链式调用是一种语法招数.作用:能让你通过重用一个初始化操作来达到用少量代码表达复杂操作的目的.. 这种技术包含两个部分: 1.一个创建代表html元素的对象的工厂.以及一批对这个html元素执行某些操作的方法. 通过例子对比:之前和之后的代码,对链式调用的概念的初步认识.  之前:   addevent($('example'),"click",function(){    setstyle(this,"color",'green');    show(this);

JavaScript设计模式——方法的链式调用

方法的链式调用: 1 (function() { 2 //私有类 3 function _$ (els) { 4 this.elements = []; 5 for(var i = 0, len = els.length; i < len; ++i){ 6 var element = els[i]; 7 if(typeof element === 'string'){ 8 element = document.getElementById(element); 9 } 10 this.elemen

如何写 JS 的链式调用 ---》JS 设计模式《----方法的链式调用

1.以$ 函数为例.通常返回一个HTML元素或一个元素集合. 代码如下: function $(){ var elements = []; for(var i=0;i<arguments.length;i++){ var element = argument[i]; if(typeOf element == "String") { element = document.getElementById(element); } if ( arguments.length === 1)

JavaScript链式调用

方法的链式调用 调用链的结构 对$函数你已经很熟悉了.它通常返回一个html元素或一个html元素的集合,如下: function$(){ var elements = []; for(vari=0,len=arguments.length;i<len;++i){ var element = arguments[i]; if(typeof element ===”string”){ element = document.getElementById(element); } if(argument

面向对象的链式调用

1. 对象的链式调用 function Chain(){ this.n=0;//属性不一定一开始的时候全部都要初始化 this.fn1=function(_obj){//this指向  new Chain()实例化的对象 alert(this.n++);//注意:alert(this.n++)与this.fn1中的this 不一定指向的对象是一样的 return this; } this.fn2=function(){//同上 alert(this.n++);//注意:alert(this.n+

进击的雨燕-------------可空链式调用

详情转自:http://wiki.jikexueyuan.com/project/swift/chapter2/07_Closures.html 可空链式调用(Optional Chaining)是一种可以请求和调用属性.方法及下标的过程,它的可空性体现于请求或调用的目标当前可能为空(nil).如果可空的目标有值,那么调用就会成功:如果选择的目标为空(nil),那么这种调用将返回空(nil).多个连续的调用可以被链接在一起形成一个调用链,如果其中任何一个节点为空(nil)将导致整个链调用失败.

在python中实现链式调用

用过jquery的一般都知道在jquery中可以链式调用,代码简洁优雅.比如$("a").addClass("test").show().html("foo");. 在redis-py中的pipeline中也可以链式调用,比如pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute() . 那么究竟怎么实现的呢? 很简单,返回对象自己就行了,即return self

关于JavaScript中的setTimeout()链式调用和setInterval()探索

http://www.cnblogs.com/Wenwang/archive/2012/01/06/2314283.html http://www.cnblogs.com/yangjunhua/archive/2012/04/12/2444106.html 下面的参考:http://evantre.iteye.com/blog/1718777 1.选题缘起 在知乎上瞎逛的时候看到一个自问自答的问题: 知乎上,前端开发领域有哪些值得推荐的问答?,然后在有哪些经典的 Web 前端或者 JavaScr