Rxjava操作符compose

原文链接:http://blog.danlew.net/2015/03/02/dont-break-the-chain/

在RxJava中一种比较nice的思想是能够通过一系列的操作符看到数据是如何转换的:

Observable.from(someSource)
    .map(data -> manipulate(data))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> doSomething(data));
  • 1
  • 2
  • 3
  • 4
  • 5

假设想要给多个流重复利用一系列操作符该怎么办呢?比如,我想在工作线程中处理数据,在主线程中订阅,所以必须频繁使用subscribeOn() 和observeOn()。如果我能将这逻辑通过连续的、可重复的方式应用到我的所有流上,那简直太棒了。

糟糕的实现方式

下面这个方法是我用过数月的“反模式”方法。
注:反模式(英文:Anti-patterns或pitfalls), 是指用来解决问题的带有共同性的不良方法。

The following is the anti-pattern I used for many months and is bad (and I feel bad).

首先,创建一个方法,该方法适用于调度器

<T> Observable<T> applySchedulers(Observable<T> observable) {
    return observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
  • 1
  • 2
  • 3
  • 4

然后调用applySchedulers(),包装你的Observable链:

applySchedulers(
    Observable.from(someSource)
        .map(data -> manipulate(data))
    )
    .subscribe(data -> doSomething(data));
  • 1
  • 2
  • 3
  • 4
  • 5

虽然这样写能运行但是很丑很乱——因为applySchedulers()导致了它不再是操作符,因此很难在其后面追加其他操作符。现在,当你多用几次这种“反模式”在单一流上之后你就会觉得它有多坏了。

Transformers登场

在RXJava背后有一群聪明的人意识到了这是一个问题并且提供了解决方案: Transformer ,它和 Observable.compose() 一起使用。

Transformer is actually just Func1<Observable<T>, Observable<R>>. 
  • 1

换句话说就是提供给他一个Observable它会返回给你另一个Observable,这和内联一系列操作符有着同等功效。

实际操作下,写个方法,创建一个Transformer调度器:

<T> Transformer<T, T> applySchedulers() {
    return new Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

用lambda表达式看上去会好看些:

<T> Transformer<T, T> applySchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
  • 1
  • 2
  • 3
  • 4

现在调用这个方法会是怎么样的呢:

Observable.from(someSource)
    .map(data -> manipulate(data))
    .compose(applySchedulers())
    .subscribe(data -> doSomething(data));
  • 1
  • 2
  • 3
  • 4

是不是好很多!我们既重用了代码又保护了链式。如果你用的是JDK 7以下版本,你不得不利用compose() 做一些多余的工作。比如说,你得告诉编译器返回的类型,像这样:

Observable.from(someSource)
    .map(data -> manipulate(data))
    .compose(this.<YourType>applySchedulers())
    .subscribe(data -> doSomething(data));
  • 1
  • 2
  • 3
  • 4

复用Transformers

如果你经常做从一个具体类型转换到另一个类型来创建一个实例:

Transformer<String, String> myTransformer = new Transformer<String, String>() {
    // ...Do your work here...
};
  • 1
  • 2
  • 3

用Transformer会怎么样呢?它压根就不会考虑类型问题,但你又不能定义一个通用的实例。

// Doesn‘t compile; where would T come from?
Transformer<T, T> myTransformer;  
  • 1
  • 2

你可以把它改成Transformer< Object, Object>,但是返回的Observable会丢失它的类型信息。

解决这个问题我是从 Collections 中得到了灵感,Collections是一堆类型安全、不可变的空集合的生成方法(比如 Collections.emptyList() )。本质上它采用了非泛型实例,然后通过方法包裹附加泛型信息。

final Transformer schedulersTransformer =
    observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
    return (Transformer<T, T>) schedulersTransformer;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

现在我们终于把他做成一个“单例”了(译者注:此单例非彼单例)。

警告:无论如何你都有可能陷入类型转换异常的坑。确保你的Transformer真的是类型无关的。另一方面,你可能会在运行时抛出ClassCastException异常,即使你的代码编译通过了。在这个例子里面,因为调度器没有和发射的项目发生互动,所以它是类型安全的。

和flatMap()有啥区别?

compose()和flatMap()有啥区别呢。他们都是发射出Observable,是不是就是说他们都可以复用一系列操作符呢?

The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:

中文翻译:

区别在于compose()是高等级的抽象,他操作的是整个流,而不是单一发射出的项目,这里有更多的解释:
VS
不同点在于compose()操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。具体如下:

原文:

1 compose() is the only way to get the original Observable from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().

In contrast, if you put subscribeOn()/observeOn() in flatMap(), it would only affect the Observable you create in flatMap() but not the rest of the stream.

2 compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.

3 flatMap() is necessarily less efficient because it has to create a new Observable every time onNext() is called. compose() operates on the stream as it is.

中文翻译1:

  1. compose()是唯一一个能从流中获取原生Observable 的方法,因此,影响整个流的操作符(像subscribeOn()和observeOn())需要使用compose(),相对的,如果你在flatMap()中使用subscribeOn()/observeOn(),它只影响你创建的flatMap()中的Observable,而不是整个流。
  2. 当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,flatMap()则是在onNext()被调用以后才会执行,换句话说,flatMap()转换的是每个项目,而compose()转换的是整个流。
  3. flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的。

中文翻译2:

  1. compose()是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用compose()来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap()中创建的Observable起作用,而不会对剩下的流产生影响(译者注:深坑,会在后面的系列着重讲解,欢迎关注)。
  2. 当创建Observable流的时候,compose()会立即执行,犹如已经提前写好了一个操作符一样,而flatMap()则是在onNext()被调用后执行,onNext()的每一次调用都会触发flatMap(),也就是说,flatMap()转换每一个事件,而compose()转换的是整个数据流。
  3. 因为每一次调用onNext()后,都不得不新建一个Observable,所以flatMap()的效率较低。事实上,compose()操作符只在主干数据流上执行操作。

如果你想用可重用的代码替换一些操作符,可以利用compose()和flatMap(),但他们不是唯一的解决办法。

再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!http://www.captainbed.net

原文地址:https://www.cnblogs.com/skiwnchhw/p/10472339.html

时间: 2024-11-05 17:30:04

Rxjava操作符compose的相关文章

不要打断链式结构:使用 RxJava的 compose() 操作符

不要打断链式结构:使用 RxJava的 compose() 操作符

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse

RxJava操作符repeatWhen()和retryWhen()

第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了.不得不说,它们绝对是"最令人困惑弹珠图"的有力角逐者. 然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable.我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们). Repeat与Retry的对比 首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重