RxJava使用(四)变换

RxJava 提供了对事件序列进行变换的支持;所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

       不仅可以针对事件对象,还可以针对整个事件队列。

       变换部分主要来自《给Android 开发者的 RxJava 详解》

1.   事件对象变换-map()

 
map()
:
事件对象的直接变换;它是 RxJava
最常用的变换;可以将Observable深入的对象1转换为对象2发送给Subscriber。

基本用法如下:

Observable.just(R.drawable.t)

.map(new Func1<Integer, Drawable>() {

@Override

public Drawable call(Integer integer) {

return getResources().getDrawable(integer);

}

})

.subscribe(new Action1<Drawable>() {

@Override

public void call(Drawable drawable) {

showImg(drawable);

}

});

2.    事件序列变换-flatMap()

flatMap() 也和 map() 相同,也是把传入的参数转化之后返回另一个对象;和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

         flatMap() 的原理:

       1. 使用传入的事件对象创建一个 Observable 对象;

       2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;

       3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

       这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。

//

扩展:由于可以在嵌套的Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。

flatmap()基本用法如下,定义了一个Book类型,包含名称和章节列表;通过flatmap()打印出全部的章节列表。

?  Book.java

public class Book {

public String name;

public List<String> chapterList = new ArrayList<String>();

public void addChapter(String chapter) {

chapterList.add(chapter);

}

}

?  flagmap()使用

Book[] books = getBookList(5);

Observable.from(books)

.flatMap(new Func1<Book, Observable<String>>() {

@Override

public Observable<String> call(Book book) {

return Observable.from(book.chapterList);

}

})

.subscribe(new Action1<String>() {

@Override

public void call(String s) {

Log.v(TAG, s);

}

});

3.     变换的原理:lift()

所有的变换功能可能有所不同,实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。

lift()实现的源码:

/**

* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass

* the values of the current Observable through the Operator function.

* <p>

* In other words, this allows chaining Observers together on an Observable for acting on the values within

* the Observable.

* <p> {@code

* observable.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()

* }

* <p>

* If the operator you are creating is designed to act on the individual items emitted by a source

* Observable, use {@code lift}. If your operator is designed to transform the source Observable as a whole

* (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.

* <dl>

*  <dt><b>Scheduler:</b></dt>

*  <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>

* </dl>

*

* @param operator the Operator that implements the Observable-operating function to be applied to the source

*             Observable

* @return an Observable that is the result of applying the lifted Operator to the source Observable

* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>

*/

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {

return new Observable<R>(new OnSubscribe<R>() {

@Override

public void call(Subscriber<? super R> o) {

try {

Subscriber<? super T> st = hook.onLift(operator).call(o);

try {

// new Subscriber created and being subscribed with so ‘onStart‘ it

st.onStart();

onSubscribe.call(st);

} catch (Throwable e) {

// localized capture of errors rather than it skipping all operators

// and ending up in the try/catch of the subscribe method which then

// prevents onErrorResumeNext and other similar approaches to error handling

Exceptions.throwIfFatal(e);

st.onError(e);

}

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

// if the lift function failed all we can do is pass the error to the final Subscriber

// as we don‘t have the operator available to us

o.onError(e);

}

}

});

}

通过lift()源码可以发现,生成了一个新的Observable(new);并且在新的Observable(new)的OnSubscribe(new)的call()回调方法中,创建了一个新的Subscriber(new),该新的Subscriber(最终目标)使用了最终调用的Subscriber的代理;再使用原来的Observable(old)的onSubscribe(old)来调用新的Subscriber(new)。

图示lift()变换过程-(图片来自《给Android 开发者的 RxJava 详解》

lift() 实现过程

1)  lift() 创建一个新Observable ,此时加上之前的原始 Observable,已经有两个 Observable 了;

2) 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;

3)  当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;

4) 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新
Subscriber 向原始 Observable 进行订阅。

lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

在 Observable 执行了lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

4.   自定义Observable.Operator,直接调用lift()

下面实例实现的功能跟前面flatMap()实现的功能一致:

Observable.from(getBookList(4))

.lift(new Observable.Operator<String, Book>() {

@Override

public Subscriber<? super Book> call(final Subscriber<? super String> subscriber) {

return new Subscriber<Book>() {

@Override

public void onCompleted() {

subscriber.onCompleted();

}

@Override

public void onError(Throwable e) {

subscriber.onError(e);

}

@Override

public void onNext(Book book) {

for(String chapter : book.chapterList) {

subscriber.onNext(chapter);

}

}

};

}

})

.subscribe(new Action1<String>() {

@Override

public void call(String s) {

Log.v(TAG, s);

}

});

5.   compose(Transformer)变换

compose(Transformer)变换,针对Observable 自身进行变换。假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换,可以使用compose方式处理,而不是每个都调用一次相同的变换过程

实例代码,如需要将Book转换得到对应的全部章节,两个Observable对章节数据有不同的处理方式:

final Observable.Transformer bookTtransformer = new Observable.Transformer<Book, String>() {

@Override

public Observable<String> call(Observable<Book> bookObservable) {

return bookObservable.flatMap(new Func1<Book, Observable<String>>() {

@Override

public Observable<String> call(Book book) {

return Observable.from(book.chapterList);

}

});

}

};

Observable.from(getBookList(4))

.compose(bookTtransformer)

.subscribe(new Action1<String>() {

@Override

public void call(String s) {

//处理方式1

Log.v(TAG, s);

}

});

////

Observable.from(getBookList(2))

.compose(bookTtransformer)

.subscribe(new Action1<String>() {

@Override

public void call(String s) {

//处理方式2

Log.v(TAG, s);

}

});

时间: 2024-10-12 02:32:31

RxJava使用(四)变换的相关文章

RxJava 教程-1 简介 原理 线程控制 变换

简介 RxJava 是什么? RxJava 在 GitHub 主页上的自我介绍是 RxJava is a Java VM implementation of ReactiveX: a library for composing asynchronous and event-based programs by using observable sequences. RxJava是 ReactiveX 在JVM上的一个实现:一个使用可观测的序列(observable sequences)来组成(co

RxJava系列之二 变换类操作符具体解释1

1.回想 上一篇文章我们主要介绍了RxJava , RxJava 的Observables和 RxJava的just操作符.以及RxJava一些经常使用的操作. 没看过的抓紧点我去看吧. 事实上RxJava有非常多的操作符, 而我们学过的just仅仅是创建类操作符的当中一种. 以后我会陆续介绍其它的创建类操作符. 文章代码地址:https://github.com/jiang111/RxJavaDemo 2. 变换类操作符之map 開始本篇文章的解说: map操作符的详细使用方法. map是属于

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

给 Android 开发者的 RxJava 详解

作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么? 鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用

深入浅出RxJava就这一篇就够了

前言: 第一次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJava.RxJava是一个基于事件订阅的异步执行的一个类库.听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个Android项目启动会联系到RxJava呢?因为在RxJava使用起来得到广泛的认可,又是基于Java语言的.自然会有善于组织和总结的开发者联想到Android!没错,RxAndroid就这样在RxJava的基础上,针对Android开发的一个库.今天我们主要是来讲

RxJava响应式编程之初级了解

据说现在流行的开发模式是 Retrofit+RxJava+MVP+ButterKnife 如果想要简单学习ButterKnife.MVP模式,可以参考我以前的例子 使用butterknife注解框架 Android-MVP设计模式高级(三) 今天我就简单来学习下RxJava的相关知识 以前我也只是听说过RxJava,RxJava这个到底是什么东西呢? 呵呵,它其实是一个库,所以我们使用里面的方法,得需要下载库,所以我们需要在AS中进行配置 1.RxJava 地址以及添加 github地址: ht

78. Android之 RxJava 详解

转载:http://gank.io/post/560e15be2dca930e00da1083 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava

转:给 Android 开发者的 RxJava 详解

转自:  http://gank.io/post/560e15be2dca930e00da1083 评注:多图解析,但是我还是未看懂. 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度

RxJava初探

1. 简史 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET.JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网

RxJava 入门

简介 项目地址: https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid 引入依赖: compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxandroid:1.1.0' RxJava 是什么? 一个词:异步. RxJava 在 GitHub 主页上的自我介绍是["a library for composing组成 asynchronous