Rxjava
一.为啥要用它?她能干啥?
- 网络请求等耗时操作必须放到子线程。线程切换
- reactive 式编程。代码更加健壮
使用观察者模式
创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作
简化代码
函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
二.概念
Observable
:可观察对象,一旦数据产生或发生变化,会通过某种方式通知观察者(或订阅者)。Single
是它的特殊实现。- 操作符 :对
Observable
发射的数据项进行操作。 Observer
:观察者对象,监听Observable
发射的数据项并做出响应,Subscriber
是它的一个特殊实现。
NOTE: 真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。
三.操作符
“reactive”-某个问题已经发生时或之后执行某些操作。
大多数操作符处理一个Observable并返回一个Observable。这样你就可以在一个链条上一个接着一个地使用这些操作符。在这个链条上的每一个操作符可以“加工”前一个操作符处理后产生的Observable。
一个链条上的Observable操作符并不是孤立地处理产生这个链条的事件源Observable,而是轮流地处理。每一个操作符处理的是紧挨的上一个操作符产生的Observable。
创建操作符
create、just、from、defer、range、interval、timer、empty、never
create:
通过调用observabler的一系列方法从零开始组装一个Observable。
from:
将一些对象或其他数据结构(集合、Future)转化为Observable。
defer:
只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable。
empty/never/error:
创建一个什么都不做直接通知完成/不发送任何元素/什么都不做直接通知错误的Observable。
just
类似于from,但是from会将数组或Iterable的元素取出然后逐个发射,而just只是简单的原样发射,将数组或Iterable当做单个数据。
注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用empty操作符。
- create(Observable.OnSubscribe f)
Observable.create(new Observable.OnSubscribe<PersonObj>() {
@Override
public void call(Subscriber<? super PersonObj> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
ArrayList<PersonObj> dataList = getDataList();
int size = dataList.size();
System.out.println("dataListSize = [" + size + "]");
for (int i = 0; i < size; i++) {
subscriber.onNext(dataList.get(i));
//if(i==3)subscriber.onCompleted();
}
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribe(new Subscriber<PersonObj>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println("e = [" + e.getMessage() + "]");
}
@Override
public void onNext(PersonObj personObj) {
System.out.println("personObj = [" + personObj + "]");
}
});
`defer(rx.functions.Func0<Observable<T>> observableFactory)
可以延迟发射Observable的数据项,直到观察者Observer订阅了这个Observable,这样可以使一个Observer获取最新的数据。
public class PersonObj {
....
public Observable<String> getAddressObservable() {
// return Observable.just(address);
return Observable.defer(() -> Observable.just(address));
}
...
}
改变这个人的地址后
public void defer() {
PersonObj personObj = new PersonObj("hefei", 19);
Observable<String> addressObservable = personObj.getAddressObservable();
personObj.setAddress("beijing");
addressObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("s = [" + s + "]");
}
});
}
- from vs just :逐个发射
from(@NotNull java.lang.Iterable<? extends T> iterable)
;整体发射just(T value...)
public void justVsFrom() {
ArrayList<String> strings = new ArrayList<>();
strings.add("12");
strings.add("13");
strings.add("14");
// Observable.from(strings)
Observable.just(strings)
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println("o = [" + o + "]");
}
});
/* Observable.just(null)
.subscribe(o -> {
System.out.println("o = [" + o + "]");
});*/
}
- timer vs interval vs range
看一下下面的结果:
// Observable.timer(1, TimeUnit.SECONDS)
// Observable.interval(1, TimeUnit.SECONDS)
Observable.range(1, 60)
.subscribe(aLong -> {
System.out.println("aLong = [" + aLong + "]");
});
下面是一个获取验证码的倒计时例子:
public static final int DELAY_SECONDS = 60;
private int timer = 60;
...
private void startTimer() {
Subscription s = Observable.timer(1, TimeUnit.SECONDS) //延迟1s
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.repeat(DELAY_SECONDS)
.subscribe(aLong -> {
timer--;
if (timer > 0) {
btnSend.setText("重新发送(" + timer + "s)");
} else {
timer = DELAY_SECONDS;
btnSend.setText("重新发送");
btnSend.setEnabled(true);
}
});
addSubscription(s);
}
变换操作符
map、buffer、window、flatMap、groupBy
- map(?) —
map(rx.functions.Func1<? super T, ? extends R> func)
对Observable发射的每一个数据项,都应用一个函数来变换。
Observable.from(getDataList())
//.map(PersonObj::getName)
//入参和出参。上一个操作符传递给它的类型,它传递给下一个操作符的类型
.map(new Func1<PersonObj, String>() {
@Override
public String call(PersonObj personObj) {
//进行数据处理,返回给下一个操作符的相应类型的数据项
return personObj.getName();
}
})
.subscribe(s -> {
System.out.println("name = [" + s + "]");
});
- buffer() —
Observable<List<T>> buffer(long timespan, TimeUnit unit,int count)
(有多个重载方法) 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个。
Observable.from(getDataList())//20个
.buffer(4)//每次打包的数据项的最大个数
.subscribe(new Action1<List<PersonObj>>() {
@Override
public void call(List<PersonObj> personObjs) {
System.out.println("personObjs = [" + personObjs + "]");//打印5次
}
});
- window() — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。
Observable.from(getDataList())//20个
.window(4)//每个Observable里发射的数据项最大个数
.subscribe(new Action1<Observable<PersonObj>>() {
@Override
public void call(Observable<PersonObj> personObjObservable) {
System.out.println("personObjObservable = [" + personObjObservable + "]");//5个Observable
personObjObservable.subscribe(new Action1<PersonObj>() {
@Override
public void call(PersonObj personObj) {
//每个Observable里发射的数据项
System.out.println("personObj = [" + personObj + "]");
}
});
}
});
- flatMap — 将接收到的数据项变换为Observable。
1.flatMap(rx.functions.Func1<? super T, ? extends Observable<? extends R>> func)
Observable.from(getDataList())
.flatMap(new Func1<PersonObj, Observable<String>>() {
@Override
public Observable<String> call(PersonObj personObj) {
//将接收到的数据项转换为一个Observable,返回给下一个操作符
return personObj.getAddressObservable();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("s = [" + s + "]");
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("throwable = [" + throwable.getMessage() + "]");
}
});
2.flatMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,Func2<? super T, ? super U, ? extends R> resultSelector)
Observable.from(getDataList())
.flatMap(new Func1<PersonObj, Observable<String>>() {
@Override
public Observable<String> call(PersonObj personObj) {
return personObj.getAddressObservable();
}
},
//接收到的数据项的类型、Fuc1处理后的Observable里的数据类型、要发射给下一个操作的数据类型
new Func2<PersonObj, String, Integer>() {
@Override
public Integer call(PersonObj personObj, String str) {
//personObj,是flatMap接收到的数据项,str是Func1处理后的Observable里发射的数据项
System.out.println("personObj = [" + personObj + "], s = [" + str + "]");
return personObj.getAge();
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer s) {
System.out.println("s = [" + s + "]");
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println("throwable = [" + throwable.getMessage() + "]");
}
});
flatMap将接收到数据项进行映射转换时,任何一个数据项发生异常时,这个Observable将立刻被放弃,并调用观察者的onError方法。
- concatMap —— flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。为了防止交错的发生,可以使用与之类似的concatMap()操作符。
- groupBy —— 将原始Observable按照指定的key进行分拆。它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一组指定一个Key,Key相同的数据会被同一个Observable发射。
1.groupBy(Func1<? super T, ? extends K> keySelector)
Observable.from(getBookDataList())
.groupBy(new Func1<BookObj, Integer>() {
@Override
public Integer call(BookObj bookObj) {
return bookObj.getType();
}
})
.subscribe(new Action1<GroupedObservable<Integer, BookObj>>() {
@Override
public void call(GroupedObservable<Integer, BookObj> groupedObservable) {
System.out.println("groupedObservable = [" + groupedObservable + "]");
Integer key = groupedObservable.getKey();
if (key == BOOK_TYPE_CIRCLR) {
setupCircleBooks(groupedObservable);
} else {
setupTimeBooks(groupedObservable);
}
}
});
2.groupBy(Func1<? super T, ? extends K> keySelector,Func1<? super T, ? extends R> elementSelector)
Observable.from(getBookDataList())
.groupBy(new Func1<BookObj, Integer>() {
@Override
public Integer call(BookObj bookObj) {
return bookObj.getType();//分组的依据
}
},
new Func1<BookObj, String>() {//类似于map的一个变换
@Override
public String call(BookObj bookObj) {
return bookObj.getName();//在这里可以对接收到的数据项进行处理。
}
})
.subscribe(new Action1<GroupedObservable<Integer, String>>() {
@Override
public void call(GroupedObservable<Integer, String> groupedObservable) {
//....
}
});
注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。
结合操作符
startWith、merge、concatWith、zip、join
- startWith 在发射数据之前先发射一个指定的数据序列
observableA.startWith(observableB):
- concatWith 发射的数据末尾追加一个数据序列
observableA.concatWith(observableB):
- merge 将多个Observables的输出合并,就好像它们是一个单个的Observable一样
observableA.merge(observableB):
- zipWith 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。
1.zipWith(Iterable<? extends T2> other,Func2<? super T, ? super T2, ? extends R> zipFunction)
2.zipWith(Observable<? extends T2> other,Func2<? super T, ? super T2, ? extends R> zipFunction)
Observable.from(getDataList())//ObservableA
.zipWith(
Observable.from(getBookDataList()),//ObservableB
//ObservableA发射的数据项类型,ObservableB发射的数据项的类型、返回给下一操作符的数据类型
new Func2<PersonObj, BookObj, PersonObj>() {
@Override
public PersonObj call(PersonObj personObj, BookObj bookObj) {
//对ObservableA和ObservableB发射的数据项进行处理
personObj.setBookObj(bookObj);
return personObj;//给下一个操作符处理的数据项
}
})
.subscribe(new Action1<PersonObj>() {
@Override
public void call(PersonObj personObj) {
System.out.println("s = [" + personObj + "]");
}
});
其他类别的操作符:
过滤 filter、 debounce
条件 contains、all
连接 connect、publish、refCount、replay
转换 toList、toMap
阻塞 forEach、first
字符串操作 byLine/decode/encode/from/join/split/stringConcat
算术操作 average/concat/count/max/min/sum/reduce
错误处理 catch/retry/retryWhen
Subject
class Subject<T, R> extends Observable<R> implements Observer<T>
Represents an object that is both an Observable and an Observer.
AsyncSubject、BehaviorSubject、PublishSubject、ReplaySubject
- AsyncSubject 数据源观察者* 发射完成之后 *,仅向它的每个订阅者发射最后一个数据项。
AsyncSubject<Object> asyncSubject = AsyncSubject.create();
asyncSubject.subscribe(//这个订阅方法也有多个重载方法
new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println("o = [" + o + "]");//只打印一个`时光`
}
},
throwable -> {
System.out.println(throwable.getMessage());
});
asyncSubject.onNext(12);
Model model = new Model();
model.age = 18;
model.name = "江一燕";
asyncSubject.onNext(model);
asyncSubject.onNext("时光");
asyncSubject.onCompleted();//completed后,订阅者会接收到最后一个发射的数据
- BehaviorSubject 订阅者可以接收到它订阅时
BehaviorSubject
发射的最近的一个和订阅后BehaviorSubject
发射的所有数据项。
// observer will receive all events.
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onCompleted
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.onCompleted();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
- PublishSubject 从那里订阅就从那里开始接收数据。
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
- ReplaySubject 无论何时订阅,都会接收到
PublishSubject
发射过的所有数据。
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();
// both of the following will get the onNext/onCompleted calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
ReplaySubject<Object> replaySubject = ReplaySubject.create();
replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");
replaySubject.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println("o = [" + o + "]");
}
});
// replaySubject.onCompleted();//注意这个的位置
replaySubject.onNext("four");
replaySubject.subscribe(observer1);
replaySubject.subscribe(observer2);
调度器 Scheduler
Schedulers.computation(?) 用于计算任务,如事件循环或和回调处理,不要用于IO操作,IO操作请使用Schedulers.io());默认线程数等于处理器的数量 。
Schedulers.from(executor) 使用指定的Executor作为调度器。
Schedulers.immediate(?) 在当前线程立即开始执行任务。
Schedulers.io(?) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation();Schedulers.io(?)默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
Schedulers.newThread(?) 为每个任务创建一个新线程。
Schedulers.trampoline(?) 当其它排队的任务完成后,在当前线程排队开始执行 。
除了将这些调度器传递给RxJava的Observable操作符,也可以用它们调度你自己的任务。
- 调度自己的任务
下面的示例展示了Scheduler.Worker的用法:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
- 递归调度器
要调度递归的方法调用,可以使用schedule,然后再用schedule(this),示例:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
- 检查或设置取消订阅状态
Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
Worker同时是Subscription,因此你可以(通常也应该)调用它的unsubscribe方法通知可以挂起任务和释放资源了。
- 延时和周期调度器
你可以使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒之后开始执行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
使用另一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务,下面例子的任务将在500毫秒之后执行,然后每250毫秒执行一次:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
- 测试调度器
TestScheduler让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:
advanceTimeTo(time,unit) 向前波动调度器的时钟到一个指定的时间点
advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段
triggerActions(?) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间
线程变换
- 在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
让UI线程与工作线程间的跳转变得简单
NOTE:
subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
一个事件流应该只有一个subscribeOn()起作用;可以有多个observeOn().
compose: 对 Observable 整体的变换。 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。
- doOnSubscribe()
它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Backpressure
生产者-消费者(producer-consumer)模式中,一定要小心Backpressure(背压,反压)的出现。一个宽泛的解释就是:事件产生的速度比消费快。一旦发生overproducing,当你的链式结构不能承受数据压力的时候,就会抛出MissingBackpressureException异常。 在Android中最容易出现的Backpressure就是连续快速点击跳转界面、数据库查询、键盘输入,甚至联网等操作都有可能造成Backpressure,可能有些情况并不会导致程序崩溃,但是会造成一些我们不想见到的小麻烦。
https://www.gitbook.com/book/mcxiaoke/rxdocs/details
https://github.com/ReactiveX/RxJava/wiki