RxAndroid 常见用法
在项目里面添加
compile ‘io.reactivex:rxandroid:1.1.0‘
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava‘s latest version for bug fixes and new features.
compile ‘io.reactivex:rxjava:1.1.0‘
```
例子1:
```java
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("lxm0",s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i("lxm0","eeee");
}
}, new Action0() {
@Override
public void call() {
Log.i("lxm0","wwwwwww");
}
});
<div class="se-preview-section-delimiter"></div>
map对数据作为对输入数据做一次处理,然后再传给Action1
Observable.just(1, 2, 3, 4, 4)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer num) {
return num+"";
}
;}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("lxm",s);
}
});
<div class="se-preview-section-delimiter"></div>
从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
from从里面拿出每一个,调用flatmap将数据再次包裹起来,变换后再次进入filter进入过滤条件,过滤条件有前后顺序,
flatMap 将里面的元素进行每个重新包装,再次作为一个被观察者发送出去。AndroidSchedulers.mainThread() 主线程。
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
3.将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe(). 需要撤销
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
<div class="se-preview-section-delimiter"></div>
构造一个被观察者
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread()更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
<div class="se-preview-section-delimiter"></div>
继续来看
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
<div class="se-preview-section-delimiter"></div>
关键的东西,将被观察者的一些东西执行在主线程.doOnSubscribe的内容运行在下面的subscribeOn指定的线程上
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
<div class="se-preview-section-delimiter"></div>
RxJava 的适用场景和使用方式
- 与 Retrofit 的结合
Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。
Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。
以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:
@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);
<div class="se-preview-section-delimiter"></div>
在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
userView.setUser(user);
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
<div class="se-preview-section-delimiter"></div>
而使用 RxJava 形式的 API,定义同样的请求是这样的:
@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);
<div class="se-preview-section-delimiter"></div>
代码变为:
getUser(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
<div class="se-preview-section-delimiter"></div>
看到区别了吗?
当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。
对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?
因为它好用啊!从这个例子看不出来是因为这只是最简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:
假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用Callback 方式大概可以这么写:
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
processUser(user); // 尝试修正 User 数据
userView.setUser(user);
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
<div class="se-preview-section-delimiter"></div>
有问题吗?
很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下:
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
new Thread() {
@Override
public void run() {
processUser(user); // 尝试修正 User 数据
runOnUiThread(new Runnable() { // 切回 UI 线程
@Override
public void run() {
userView.setUser(user);
}
});
}).start();
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
<div class="se-preview-section-delimiter"></div>
性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。
这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:
getUser(userId)
.doOnNext(new Action1<User>() {
@Override
public void call(User user) {
processUser(user);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
```
后台代码和前台代码全都写在一条链中,明显清晰了很多。
再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?
Callback 方式,可以使用嵌套的 Callback:
<div class="se-preview-section-delimiter"></div>
```java
@GET("/token")
public void getToken(Callback<String> callback);
@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);
...
getToken(new Callback<String>() {
@Override
public void success(String token) {
getUser(userId, new Callback<User>() {
@Override
public void success(User user) {
userView.setUser(user);
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
};
}
@Override
public void failure(RetrofitError error) {
// Error handling
...
}
});
倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。
而使用 RxJava 的话,代码是这样的:
@GET("/token")
public Observable<String> getToken();
@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);
...
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> onNext(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});
```
用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧?
好,Retrofit 部分就到这里。
2. RxBinding
RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。
举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:
```java
// 感谢大家能够完整阅读整篇文章,如果喜欢,请分享并打赏支持下~.~!
// 更多文章,敬请期待啦。