一.扩展的观察者模式
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
观察者模式之前写过系列博客,可移步阅读:
http://blog.csdn.net/a910626/article/details/50766019
Rx中的发布订阅模型实现图示:
Observable和Subscriber能完成任何事情,你的Observable可以是一个数据库查询,Subscriber获得查询结果然后将其显示在屏幕上。你的Observable可以是屏幕上的一个点击,Subscriber响应该事件并做处理。你的Observable可以从网络上请求数据,Subscriber将其显示在界面上,这是一个可以处理任何事情的通用框架。
二.RxJava的优势
清晰,简单,更符合人的思维,尤其是跟lambda表达式结合更是如此
函数式风格,方面的创建事件流和数据流
异步错误处理机制:传统的try cach没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发
三.RxJava应用场景
RxBinding节流(防止按钮的重复点击)
轮询
定时操作
RxPermissions
RxBus
RxJava与Retrofit结合处理网络请求
代替监听/观察模型
线程管理,提供线程池,线程切换(Schedulers)
解决嵌套回调(flatMap)
提供延时,Timer处理(interval)
四.如何学习RxJava
响应式编程的主要组成部分是observable、operator和subscriber。一般响应式编程的信息流如下所示:
observable->operator1->operator2->operator3->subscriber
也就是说,observable是事件的生产者,subscriber是事件最终的消费者。
因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件作出响应(不对事件或数据进行修改),而修改事件的工作全部由operator执行。
observable产生数据,中间通过几个operator,也就是操作符,这个操作符是用来转换数据的。最终他将发给订阅者subscriber。也就是说observable是事件的产生者,subscriber是事件的消费者。
Observable负责产生数据,subscriber负责消费数据。
中间的operator负责数据的转化
多在项目中使用才可以精通。
五.类关系图
1.在RxJava中主要有4个角色:
? Observable
? Subject
? Observer
? Subscriber
Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应于观察者模式中的观察者。Subscriber其实是一个实现了Observer的抽象类,后面我们分析源码的时候也会介绍到。Subject比较复杂,以后再分析。
2.Rxjava的事件回调方法(其实就对应着观察者的update方法)
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
? onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext() 发出时,需要触发 onCompleted() 方法作为标志。
? onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
? 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
3.Action1 Action0
Action extends Function
public interface Action0 extends Action {? void call();?}
public interface Action2<T1, T2> extends Action {? void call(T1 t1, T2 t2);?}
public interface Func0<R> extends Function, Callable<R> {? @Override? R call();?}
function
public interface Function {??}
public interface Func1<T, R> extends Function {? R call(T t);?}
简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。
4.Subscriber vs Observer
subscriber = observer + subcription
Subscriber ,它跟 Observer 接口几乎完全一样,只是多了两个方法
onStart() : 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。
unsubscribe() : 用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
虽然多了两个方法,但是基本实现方式跟Observer是一样的,所以暂时可以不考虑两者的区别。不过值得注意的是:
实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。
5.线程控制
在RxJava中,Scheduler相当于线程控制器,可以通过它来指定每一段代码运行的线程。
RxJava已经内置了几个Scheduler,下面是总结:
Schedulers.immediate() : 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
Schedulers.newThread() : 总是启用新线程,并在新线程执行操作。
Schedulers.io() : I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
Schedulers.computation() : 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
AndroidSchedulers.mainThread() ,Android专用线程,指定操作在主线程运行。
那我们如何切换线程呢?RxJava中提供了两个方法:subscribeOn() 和 observeOn() ,两者的不同点在于:
subscribeOn() : 指定subscribe()订阅所发生的线程,即 call() 执行的线程。或者叫做事件产生的线程。(指定被观察者执行的线程)
observeOn() : 指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。(制定观察者执行的线程)
六.来个栗子
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
//sub就是观察者,发送一个字符串
sub.onNext("Hello, world!");
sub.onCompleted();
}
}
);
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
myObservable.subscribe(mySubscriber);
blog.csdn.net/qq122627018/article/details/51355871