浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度

一、创建操作:

1、观察者模式:
RxJava的世界里,我们有四种角色:

    Observable<T>(被观察者)、Observer(观察者)
    Subscriber(订阅者)、Subject
    Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2、回调方法:
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法:

    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
    • onError(Exception ex)
      当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
    • onComplete
      正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

3、添加依赖:

  compile ‘io.reactivex:rxandroid:1.1.0‘

  compile ‘io.reactivex:rxjava:1.1.0‘

4、create():

Observable.create(new Observable.OnSubscribe<String>() {
    @Override    public void call(Subscriber<? super String> subscriber) {        subscriber.onNext("Hello");        subscriber.onNext("RxJava");        subscriber.onCompleted();    }}).subscribe(new Observer<String>() {    @Override    public void onCompleted() {        Log.i(TAG, "onCompleted");    }    @Override    public void onError(Throwable e) {    }    @Override    public void onNext(String s) {        Log.i(TAG, "onNext=" + s);    }});

Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的,只是多了onStart()方法,作为异步调用之前的操作:

.subscribe(new Subscriber() {    @Override    public void onStart() {        Log.i(TAG, "onStart");    }

@Override    public void onCompleted() {        Log.i(TAG, "onCompleted");    }    @Override    public void onError(Throwable e) {    }    @Override    public void onNext(Object o) {        Log.i(TAG, "onNext=" + o);    }});

*****************************************************源码解析订阅**************************************
public final Subscription subscribe(Subscriber<? super T> subscriber) {    return Observable.subscribe(subscriber, this);}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed    if (subscriber == null) {        throw new IllegalArgumentException("observer can not be null");    }    if (observable.onSubscribe == null) {        throw new IllegalStateException("onSubscribe function can not be null.");        /*         * the subscribe function can also be overridden but generally that‘s not the appropriate approach         * so I won‘t mention that in the exception         */    }

// new Subscriber so onStart it    subscriber.onStart();

/*     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls     * to user code from within an Observer"     */    // if not already wrapped    if (!(subscriber instanceof SafeSubscriber)) {        // assign to `observer` so we return the protected version        subscriber = new SafeSubscriber<T>(subscriber);    }

// The code below is exactly the same an unsafeSubscribe but not used because it would     // add a significant depth to already huge call stacks.    try {        // allow the hook to intercept and/or decorate        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);        return hook.onSubscribeReturn(subscriber);    } catch (Throwable e) {        // special handling for certain Throwable/Error/Exception types        Exceptions.throwIfFatal(e);        // if an unhandled error occurs executing the onSubscribe we will propagate it        try {            subscriber.onError(hook.onSubscribeError(e));//捕获异常并回调onError()        } catch (Throwable e2) {            Exceptions.throwIfFatal(e2);            // if this happens it means the onError itself failed (perhaps an invalid function implementation)            // so we are unable to propagate the error correctly and will just throw            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);            // TODO could the hook be the cause of the error in the on error handling.            hook.onSubscribeError(r);            // TODO why aren‘t we throwing the hook‘s return value.            throw r;        }        return Subscriptions.unsubscribed();    }}
*****************************************************源码解析订阅**************************************

5、from():

String[] arrays={"Hello","RxJava"};Observable.from(arrays)        .subscribe(new Subscriber() {            @Override            public void onStart() {                Log.i(TAG, "onStart");            }

@Override            public void onCompleted() {                Log.i(TAG, "onCompleted");            }            @Override            public void onError(Throwable e) {            }            @Override            public void onNext(Object o) {                Log.i(TAG, "onNext=" + o);            }        });
*****************************************************源码解析订阅**************************************
public final class{  ...  //此处create()就不多做解释了,...call()...  public final static <T> Observable<T> from(Iterable<? extends T> iterable) {             return create(new OnSubscribeFromIterable<T>(iterable));       }  ...}
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {    final Iterable<? extends T> is;    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {        if (iterable == null) {            throw new NullPointerException("iterable must not be null");        }        this.is = iterable;    }

@Override    public void call(final Subscriber<? super T> o) {        final Iterator<? extends T> it = is.iterator();        if (!it.hasNext() && !o.isUnsubscribed())            o.onCompleted();        else            o.setProducer(new IterableProducer<T>(o, it));//未执行完继续迭代    }    private static final class IterableProducer<T> extends AtomicLong implements Producer {        /** */        private static final long serialVersionUID = -8730475647105475802L;        private final Subscriber<? super T> o;        private final Iterator<? extends T> it;

private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {            this.o = o;            this.it = it;        }

@Override        public void request(long n) {            if (get() == Long.MAX_VALUE) {                // already started with fast-path                return;            }            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {                fastpath();            } else            if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {                slowpath(n);            }

}

void slowpath(long n) {            // backpressure is requested            final Subscriber<? super T> o = this.o;            final Iterator<? extends T> it = this.it;

long r = n;            while (true) {            /*             * This complicated logic is done to avoid touching the             * volatile `requested` value during the loop itself. If             * it is touched during the loop the performance is             * impacted significantly.             */                long numToEmit = r;                while (true) {                    if (o.isUnsubscribed()) {                        return;                    } else if (it.hasNext()) {                        if (--numToEmit >= 0) {                            o.onNext(it.next());                        } else                            break;                    } else if (!o.isUnsubscribed()) {                        o.onCompleted();                        return;                    } else {                        // is unsubscribed                        return;                    }                }                r = addAndGet(-r);                if (r == 0L) {                    // we‘re done emitting the number requested so                    // return                    return;                }

}        }

void fastpath() {            // fast-path without backpressure            final Subscriber<? super T> o = this.o;            final Iterator<? extends T> it = this.it;

while (true) {                if (o.isUnsubscribed()) {                    return;                } else if (it.hasNext()) {                    o.onNext(it.next());                } else if (!o.isUnsubscribed()) {                    o.onCompleted();                    return;                } else {                    // is unsubscribed                    return;                }            }        }    }}
*****************************************************源码解析订阅**************************************

6、just():

Observable.just("Hello","RxJava")

  .subscribe(new Observer<String>() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onNext(String s) {

      Log.i("wxl", "onNext=" + s);

    }

  });

*****************************************************源码解析订阅**************************************
public final static <T> Observable<T> just(T t1, T t2) {   //然而源码解析完之后你会觉得 同上from()    return from(Arrays.asList(t1, t2));}
*****************************************************源码解析订阅**************************************

二、变换操作:(重点:当然还是要先创建被观察者)
1、Map(): (多输入,单输出
的概念,用代理模式去理解map()方法执行过程,简单说就是Observable和OnSubscribe被新的取代了)

Observable.just("Hello", "RxJava")

  .map(new Func1<String, String>() {

    @Override

    public String call(String s) {

      return s.toUpperCase();

    }

  }).subscribe(new Subscriber<String>() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onNext(String s) {

      Log.i("wxl", "onNext=" + s);

    }

});

*****************************************************源码解析订阅**************************************
public class Observable<T> {    ...  //订阅,跟上面一样    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {        // validate and proceed        if (subscriber == null) {            throw new IllegalArgumentException("observer can not be null");        }        if (observable.onSubscribe == null) {            throw new IllegalStateException("onSubscribe function can not be null.");        /*         * the subscribe function can also be overridden but generally that‘s not the appropriate approach         * so I won‘t mention that in the exception         */        }

// new Subscriber so onStart it        subscriber.onStart();

/*     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls     * to user code from within an Observer"     */        // if not already wrapped        if (!(subscriber instanceof SafeSubscriber)) {            // assign to `observer` so we return the protected version            subscriber = new SafeSubscriber<T>(subscriber);        }

// The code below is exactly the same an unsafeSubscribe but not used because it would         // add a significant depth to already huge call stacks.        try {            // allow the hook to intercept and/or decorate            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//发射OnSubscrible中的call();注意,此时已替换了,用代理思维去理解。            return hook.onSubscribeReturn(subscriber);        } catch (Throwable e) {            // special handling for certain Throwable/Error/Exception types            Exceptions.throwIfFatal(e);            // if an unhandled error occurs executing the onSubscribe we will propagate it            try {                subscriber.onError(hook.onSubscribeError(e));            } catch (Throwable e2) {                Exceptions.throwIfFatal(e2);                // if this happens it means the onError itself failed (perhaps an invalid function implementation)                // so we are unable to propagate the error correctly and will just throw                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);                // TODO could the hook be the cause of the error in the on error handling.                hook.onSubscribeError(r);                // TODO why aren‘t we throwing the hook‘s return value.                throw r;            }            return Subscriptions.unsubscribed();        }    }

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {        return new Observable<R>(new OnSubscribe<R>() {  //重新创建了新的Observable和OnSubscribe            @Override            public void call(Subscriber<? super R> o) {                try {                    Subscriber<? super T> st = hook.onLift(operator).call(o);//回调替换的部分逻辑                    try {                        // new Subscriber created and being subscribed with so ‘onStart‘ it                        st.onStart();                        onSubscribe.call(st);//发射新的                    } catch (Throwable e) {                        // localized capture of errors rather than it skipping all operators                         // and ending up in the try/catch of the subscribe method which then                        // prevents onErrorResumeNext and other similar approaches to error handling                        Exceptions.throwIfFatal(e);                        st.onError(e);                    }                } catch (Throwable e) {                    Exceptions.throwIfFatal(e);                    // if the lift function failed all we can do is pass the error to the final Subscriber                    // as we don‘t have the operator available to us                    o.onError(e);                }            }        });    }  ...}
*****************************************************源码解析订阅**************************************

2、flatMap():

同上差不多,可以看做是扁平化的一种map(二次转换)

比较flatMap与map:

假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:Student[] students = ...;Subscriber<Student> subscriber = new Subscriber<Student>() {  @Override  public void onNext(Student student) {    List<Course> courses = student.getCourses();    for (int i = 0; i < courses.size(); i++) {      Course course = courses.get(i);         Log.d(tag, course.getName());      }  }      ...};Observable.from(students)  .subscribe(subscriber);而flatMap:Student[] students = ...;Subscriber<Course> subscriber = new Subscriber<Course>() {  @Override  public void onNext(Course course) {        Log.d(tag, course.getName());   }        ...};Observable.from(students)  .flatMap(new Func1<Student, Observable<Course>>() {    @Override    public Observable<Course> call(Student student) {          return Observable.from(student.getCourses());       }    })     .subscribe(subscriber);

3、Filter():
Observable.just(4, 2, 1, 7, 5)        .filter(new Func1<Integer, Boolean>() {            @Override            public Boolean call(Integer integer) {                return integer > 3;            }        })        .subscribe(new Observer<Integer>() {            @Override            public void onCompleted() {                Log.i("wxl", "onCompleted");            }            @Override            public void onError(Throwable e) {            }            @Override            public void onNext(Integer integer) {                Log.i("wxl", "onNext=" + integer);            }        });
***************************************************源码解析****************************************************************
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {    return lift(new OperatorFilter<T>(predicate));}
public final class OperatorFilter<T> implements Operator<T, T> {

private final Func1<? super T, Boolean> predicate;

public OperatorFilter(Func1<? super T, Boolean> predicate) {        this.predicate = predicate;    }

@Override    public Subscriber<? super T> call(final Subscriber<? super T> child) {        return new Subscriber<T>(child) {

@Override            public void onCompleted() {                child.onCompleted();            }

@Override            public void onError(Throwable e) {                child.onError(e);            }

@Override            public void onNext(T t) {                try {                    if (predicate.call(t)) {//判断                        child.onNext(t);                    } else {                        // TODO consider a more complicated version that batches these                        request(1);                    }                } catch (Throwable e) {                    Exceptions.throwOrReport(e, child, t);                }            }

};    }

}
***************************************************源码解析****************************************************************
4、线程调度:
.subscribeOn(Schedulers.io()):
***************************************************源码解析****************************************************************
public final Observable<T> subscribeOn(Scheduler scheduler) {    if (this instanceof ScalarSynchronousObservable) {        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    }    return nest().lift(new OperatorSubscribeOn<T>(scheduler));}
 
public final class Schedulers {  //    private final Scheduler computationScheduler;    private final Scheduler ioScheduler;    private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();   ....}
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;

public OperatorSubscribeOn(Scheduler scheduler) {        this.scheduler = scheduler;    }

@Override    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {        final Worker inner = scheduler.createWorker();        subscriber.add(inner);        return new Subscriber<Observable<T>>(subscriber) {

@Override            public void onCompleted() {                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext            }

@Override            public void onError(Throwable e) {                subscriber.onError(e);            }

@Override            public void onNext(final Observable<T> o) {                inner.schedule(new Action0() {//在选择的线程里做处理

@Override                    public void call() {                        final Thread t = Thread.currentThread();                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override                            public void onCompleted() {                                subscriber.onCompleted();                            }

@Override                            public void onError(Throwable e) {                                subscriber.onError(e);                            }

@Override                            public void onNext(T t) {                                subscriber.onNext(t);                            }

@Override                            public void setProducer(final Producer producer) {                                subscriber.setProducer(new Producer() {

@Override                                    public void request(final long n) {                                        if (Thread.currentThread() == t) {                                            // don‘t schedule if we‘re already on the thread (primarily for first setProducer call)                                            // see unit test ‘testSetProducerSynchronousRequest‘ for more context on this                                            producer.request(n);                                        } else {                                            inner.schedule(new Action0() {

@Override                                                public void call() {                                                    producer.request(n);                                                }                                            });                                        }                                    }

});                            }

});                    }                });            }

};    }}
***************************************************源码解析****************************************************************
.observeOn(AndroidSchedulers.mainThread()):参考OperatorObserveOn中源码;

基本思路:最好用代理思维去理解

(被观察者)Observable<T>:(订阅者)OnSubscribe<T> extends Action1<T> extends Action extends Function

——> 订阅 subscribe(Observer<T>) 回调处理

(观察者)Observer<T>:Subscriber<T> implements Observer<T>

 应用场景还没有,希望多交流多指正!
 
时间: 2024-08-10 21:15:26

浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度的相关文章

dubbo源码解析-zookeeper创建节点

前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码解析-zookeeper连接中已经讲到,这周解析的是另一道,即服务提供者能实现失效踢出是根据什么原理? 上周就有朋友问到我,为什么我的源码解析总是偏偏要和面试题挂上钩呢?原因很简单 1.dubbo源码这么多,试问你从哪里做为切入点?也就是说该从哪里看起?所以以面试题为切入点,你可以理解为我是在回答"

rxjava源码解析:操作符subscribeOn

1.subscribe流程 先看一个简单的例子: //标记为Observable1 Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello world!"); subscriber.onCompleted(); } })

rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡.容错的一些处理,4.3以上支持消息事务,有管理控制台.命令行工具,底层namesrv与broker.client与server交互netty实现. 源码解析 创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.r

Android进阶:四、RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理异

Android进阶:RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么 Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理

Spring IoC源码解析——Bean的创建和初始化

Spring介绍 Spring(http://spring.io/)是一个轻量级的Java 开发框架,同时也是轻量级的IoC和AOP的容器框架,主要是针对JavaBean的生命周期进行管理的轻量级容器,可以单独使用,也可以和Struts框架,MyBatis框架等组合使用. IoC介绍 IoC是什么 Ioc-Inversion of Control,即"控制反转",不是什么技术,而是一种设计思想.在Java开发中,Ioc意味着将你设计好的对象交给容器控制,而不是传统的在你的对象内部直接控

Fresco源码解析 - 创建一个ImagePipeline(一)

在Fresco源码解析 - 初始化过程分析章节中,我们分析了Fresco的初始化过程,两个initialize方法中都用到了 ImagePipelineFactory类. ImagePipelineFactory.initialize(context);会创建一个所有参数都使用默认值的ImagePipelineConfig来初始化ImagePipeline. ImagePipelineFactory.initialize(imagePipelineConfig)会首先用 imagePipelin

Mybatis源码解析,一步一步从浅入深(六):映射代理类的获取

在文章:Mybatis源码解析,一步一步从浅入深(二):按步骤解析源码中我们提到了两个问题: 1,为什么在以前的代码流程中从来没有addMapper,而这里却有getMapper? 2,UserDao明明是我们定义的一个接口类,根本没有定义实现类,那这个userMapper是什么?是mybatis自动为我们生成的实现类吗? 为了更好的解释着两个问题,我们需要重新认识Configuration这个类. 但是在这之前,你需要了解一个概念(设计模式):JAVA设计模式-动态代理(Proxy)示例及说明

RxJava2 源码解析(二)

转载请标明出处: http://blog.csdn.net/zxt0601/article/details/61637439 本文出自:[张旭童的博客](http://blog.csdn.net/zxt0601) 概述 承接上一篇RxJava2 源码解析(一), 本系列我们的目的: 知道源头(Observable)是如何将数据发送出去的. 知道终点(Observer)是如何接收到数据的. 何时将源头和终点关联起来的 知道线程调度是怎么实现的 知道操作符是怎么实现的 本篇计划讲解一下4,5. Rx