一、创建操作:
1、观察者模式:
RxJava的世界里,我们有四种角色:
Observable<T>(被观察者)、Observer(观察者)
Subscriber(订阅者)、Subject
Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2、回调方法:
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法:
- onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 - onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 - onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
3、添加依赖:
compile ‘io.reactivex:rxandroid:1.1.0‘
compile ‘io.reactivex:rxjava:1.1.0‘
4、create():
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("RxJava"); subscriber.onCompleted(); }}).subscribe(new Observer<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(TAG, "onNext=" + s); }});
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的,只是多了onStart()方法,作为异步调用之前的操作:
.subscribe(new Subscriber() { @Override public void onStart() { Log.i(TAG, "onStart"); } @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { Log.i(TAG, "onNext=" + o); }}); *****************************************************源码解析订阅**************************************
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this);} private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that‘s not the appropriate approach * so I won‘t mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e));//捕获异常并回调onError() } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren‘t we throwing the hook‘s return value. throw r; } return Subscriptions.unsubscribed(); }}
*****************************************************源码解析订阅**************************************
5、from():
String[] arrays={"Hello","RxJava"};Observable.from(arrays) .subscribe(new Subscriber() { @Override public void onStart() { Log.i(TAG, "onStart"); } @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { Log.i(TAG, "onNext=" + o); } });
*****************************************************源码解析订阅**************************************
public final class{ ... //此处create()就不多做解释了,...call()... public final static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable)); } ...}
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> { final Iterable<? extends T> is; public OnSubscribeFromIterable(Iterable<? extends T> iterable) { if (iterable == null) { throw new NullPointerException("iterable must not be null"); } this.is = iterable; } @Override public void call(final Subscriber<? super T> o) { final Iterator<? extends T> it = is.iterator(); if (!it.hasNext() && !o.isUnsubscribed()) o.onCompleted(); else o.setProducer(new IterableProducer<T>(o, it));//未执行完继续迭代 } private static final class IterableProducer<T> extends AtomicLong implements Producer { /** */ private static final long serialVersionUID = -8730475647105475802L; private final Subscriber<? super T> o; private final Iterator<? extends T> it; private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) { this.o = o; this.it = it; } @Override public void request(long n) { if (get() == Long.MAX_VALUE) { // already started with fast-path return; } if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) { fastpath(); } else if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) { slowpath(n); } } void slowpath(long n) { // backpressure is requested final Subscriber<? super T> o = this.o; final Iterator<? extends T> it = this.it; long r = n; while (true) { /* * This complicated logic is done to avoid touching the * volatile `requested` value during the loop itself. If * it is touched during the loop the performance is * impacted significantly. */ long numToEmit = r; while (true) { if (o.isUnsubscribed()) { return; } else if (it.hasNext()) { if (--numToEmit >= 0) { o.onNext(it.next()); } else break; } else if (!o.isUnsubscribed()) { o.onCompleted(); return; } else { // is unsubscribed return; } } r = addAndGet(-r); if (r == 0L) { // we‘re done emitting the number requested so // return return; } } } void fastpath() { // fast-path without backpressure final Subscriber<? super T> o = this.o; final Iterator<? extends T> it = this.it; while (true) { if (o.isUnsubscribed()) { return; } else if (it.hasNext()) { o.onNext(it.next()); } else if (!o.isUnsubscribed()) { o.onCompleted(); return; } else { // is unsubscribed return; } } } }}
*****************************************************源码解析订阅**************************************
6、just():
Observable.just("Hello","RxJava")
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.i("wxl", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("wxl", "onNext=" + s);
}
});
*****************************************************源码解析订阅**************************************
public final static <T> Observable<T> just(T t1, T t2) { //然而源码解析完之后你会觉得 同上from() return from(Arrays.asList(t1, t2));}
*****************************************************源码解析订阅**************************************
二、变换操作:(重点:当然还是要先创建被观察者)
1、Map(): (多输入,单输出的概念,用代理模式去理解map()方法执行过程,简单说就是Observable和OnSubscribe被新的取代了)
Observable.just("Hello", "RxJava")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("wxl", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("wxl", "onNext=" + s);
}
});
*****************************************************源码解析订阅**************************************
public class Observable<T> { ... //订阅,跟上面一样 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that‘s not the appropriate approach * so I won‘t mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//发射OnSubscrible中的call();注意,此时已替换了,用代理思维去理解。 return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren‘t we throwing the hook‘s return value. throw r; } return Subscriptions.unsubscribed(); } } public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { //重新创建了新的Observable和OnSubscribe @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o);//回调替换的部分逻辑 try { // new Subscriber created and being subscribed with so ‘onStart‘ it st.onStart(); onSubscribe.call(st);//发射新的 } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don‘t have the operator available to us o.onError(e); } } }); } ...}
*****************************************************源码解析订阅**************************************
2、flatMap():
同上差不多,可以看做是扁平化的一种map(二次转换)
比较flatMap与map:
假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:Student[] students = ...;Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ...};Observable.from(students) .subscribe(subscriber);而flatMap:Student[] students = ...;Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ...};Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 3、Filter():
Observable.just(4, 2, 1, 7, 5) .filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer > 3; } }) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { Log.i("wxl", "onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { Log.i("wxl", "onNext=" + integer); } });
***************************************************源码解析****************************************************************
public final Observable<T> filter(Func1<? super T, Boolean> predicate) { return lift(new OperatorFilter<T>(predicate));}
public final class OperatorFilter<T> implements Operator<T, T> { private final Func1<? super T, Boolean> predicate; public OperatorFilter(Func1<? super T, Boolean> predicate) { this.predicate = predicate; } @Override public Subscriber<? super T> call(final Subscriber<? super T> child) { return new Subscriber<T>(child) { @Override public void onCompleted() { child.onCompleted(); } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onNext(T t) { try { if (predicate.call(t)) {//判断 child.onNext(t); } else { // TODO consider a more complicated version that batches these request(1); } } catch (Throwable e) { Exceptions.throwOrReport(e, child, t); } } }; } }
***************************************************源码解析****************************************************************
4、线程调度:
.subscribeOn(Schedulers.io()):
***************************************************源码解析****************************************************************
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return nest().lift(new OperatorSubscribeOn<T>(scheduler));}
public final class Schedulers { // private final Scheduler computationScheduler; private final Scheduler ioScheduler; private final Scheduler newThreadScheduler; private static final Schedulers INSTANCE = new Schedulers(); ....}
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> { private final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { @Override public void onCompleted() { // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() {//在选择的线程里做处理 @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (Thread.currentThread() == t) { // don‘t schedule if we‘re already on the thread (primarily for first setProducer call) // see unit test ‘testSetProducerSynchronousRequest‘ for more context on this producer.request(n); } else { inner.schedule(new Action0() { @Override public void call() { producer.request(n); } }); } } }); } }); } }); } }; }}
***************************************************源码解析****************************************************************
.observeOn(AndroidSchedulers.mainThread()):参考OperatorObserveOn中源码; 基本思路:最好用代理思维去理解
(被观察者)Observable<T>:(订阅者)OnSubscribe<T> extends Action1<T> extends Action extends Function
——> 订阅 subscribe(Observer<T>) 回调处理
(观察者)Observer<T>:Subscriber<T> implements Observer<T>
应用场景还没有,希望多交流多指正!