Rxjava分析—Subject

Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象。

1. Subject的类型

一共有四种为不同用途而设计的Subject,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。

1.1 AsyncSubject

AsyncSubject仅释放Observable释放的最后一个数据,并且仅在Observable完成之后。然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。

1.2 BehaviorSubject

当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。

1.3 PublishSubject

PublishSubject仅会向Observer释放在订阅之后Observable释放的数据。

1.4 ReplaySubject

不管Observer何时订阅ReplaySubject,ReplaySubject会向所有Observer释放Observable释放过的数据。

有不同类型的ReplaySubject,它们是用来限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。

如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete和onError方法,因为这会导致顺序错乱,这个是违反了Observer规则的。

2. RxJava的Subject源码分析

2.1 Subject

Subject表示一个同时是Observable和Observer的对象。类Subject的代码如下:

package rx.subjects;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;

/**
 * Represents an object that is both an Observable and an Observer.
 */
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

    public abstract boolean hasObservers();

    public final SerializedSubject<T, R> toSerialized() {
        return new SerializedSubject<T, R>(this);
    }
}

2.2 BehaviorSubject

Subject有四个主要的子类,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。接下来将以BehaviorSubject为例进行源码分析。

2.2.1 BehaviorSubject订阅subscribe过程

在需要使用subject时,调用Subject的subscribe(..)方法,该方法实际会调用下面这个subscribe(Subscriber<? super T> subscriber)方法,所以其他的subscribe方法都要将输入参数转化为一个Subscriber对象。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        ...
        // new Subscriber so onStart it
        subscriber.onStart();

        ...

        // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (OnErrorNotImplementedException e2) {
                // special handling when onError is not implemented ... we just rethrow
                throw e2;
            } catch (Throwable e2) {
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren‘t we throwing the hook‘s return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

方法中hook.onSubsribeStart(this, onSubscribe).call(subscriber)默认情况下等价于onSubscribe.call(subscriber)。onSubscriber是什么呢?这个就需要了解BehaviorSubject的构造方法

protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
    super(onSubscribe);
    this.state = state;
}

其中调用了父类Subject的构造方法

protected Subject(OnSubscribe<R> onSubscribe) {
    super(onSubscribe);
}

其中调用了父类Observer的构造方法

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

obSubscribe即是BehaviorSubject构造方法中传入的第一个参数。

BehaviorSubject有3个静态工厂方法用来生产BehaviorSubject对象。

public final class BehaviorSubject<T> extends Subject<T, T> {

    public static <T> BehaviorSubject<T> create() {
        return create(null, false);
    }

    public static <T> BehaviorSubject<T> create(T defaultValue) {
        return create(defaultValue, true);
    }

    private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
        final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
        if (hasDefault) {
            state.set(NotificationLite.instance().next(defaultValue));
        }
        state.onAdded = new Action1<SubjectObserver<T>>() {

            @Override
            public void call(SubjectObserver<T> o) {
                o.emitFirst(state.get(), state.nl);
            }

        };
        state.onTerminated = state.onAdded;
        return new BehaviorSubject<T>(state, state);
    }
    ....
}

前两个Public的静态构造方法实际上调用的是第三个private方法。

最后return new BehaviorSubject<T>(state, state),所以onSubscribe实际为一个SubjectSubscriptionManager的对象,onSubscribe.call(subscriber)实际调用的是SubjectSubscriptionManager的call方法。

/* package */final class SubjectSubscriptionManager<T> implements OnSubscribe<T> {
    ...
    @Override
    public void call(final Subscriber<? super T> child) {
        SubjectObserver<T> bo = new SubjectObserver<T>(child);
        addUnsubscriber(child, bo);
        onStart.call(bo);
        if (!child.isUnsubscribed()) {
            if (add(bo) && child.isUnsubscribed()) {
                remove(bo);
            }
        }
    }
}

1.调用addUnsubscriber方法,注册一个在取消订阅时执行的一个动作,即将该观擦者Observer移除掉。

/** Registers the unsubscribe action for the given subscriber. */
void addUnsubscriber(Subscriber<? super T> child, final SubjectObserver<T> bo) {
    child.add(Subscriptions.create(new Action0() {
        @Override
        public void call() {
            remove(bo);
        }
    }));
} 

2.调用add(SubjectObserver<T> o)方法,将该Observer加入已经注册的Observer[]数组当中。

boolean add(SubjectObserver<T> o) {
    do {
        State oldState = state;
        if (oldState.terminated) {
            onTerminated.call(o);
            return false;
        }
        State newState = oldState.add(o);
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            onAdded.call(o);
            return true;
        }
    } while (true);
}

该方法会调用onAdd.call(o)。BehaviorSubject的onAdd对象如下,state.get()得到的是最近的数据对象,o.emitFirst即会释放最近的数据对象,这正体现了BehaviorSubject的特点。

state.onAdded = new Action1<SubjectObserver<T>>() {

    @Override
    public void call(SubjectObserver<T> o) {
        o.emitFirst(state.get(), state.nl);
    }

};

在这个过程中使用了SubjectSubscriptionManager的两个内部类。

1. State<T>

该类用来管理已经注册的Observer数组,以及他们的状态。

/** State-machine representing the termination state and active SubjectObservers. */
protected static final class State<T> {
    final boolean terminated;
    final SubjectObserver[] observers;
    static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0];
    static final State TERMINATED = new State(true, NO_OBSERVERS);
    static final State EMPTY = new State(false, NO_OBSERVERS);

    public State(boolean terminated, SubjectObserver[] observers) {
        this.terminated = terminated;
        this.observers = observers;
    }
    public State add(SubjectObserver o) {
        ...
    }
    public State remove(SubjectObserver o) {
        ...
    }
}

2.SubjectObserver<T>

该类时Observer的一个装饰类,运用了装饰模式给Observer类添加新的功能。

以上就是Subject对象订阅Observer时的流程。

2.2.2 BehaviorSubject的onNext

Behavior的onNext(T v)方法如下

@Override
public void onNext(T v) {
    Object last = state.get();
    if (last == null || state.active) {
        Object n = nl.next(v);
        for (SubjectObserver<T> bo : state.next(n)) {
            bo.emitNext(n, state.nl);
        }
    }
}

state是SubjectSubscriptionManager类的对象,是这个对象来维护最近释放的数据对象,state.get()获取最近释放的数据对象,state.next(Object n)方法重新设置最近释放的数据对象,并返回已经注册的Observer数组。

SubjectObserver<T>[] next(Object n) {
    set(n);
    return state.observers;
}

bo.emitNext(Object n, final NotificationLite<T> nl)释放给定的数据对象。

2.2.3 BehaviorSubject的onCompleted和onError

onCompleted和onError会调用SubjectSubscriptionManager的terminate(Object n)方法,该方法会重新设置最近释放的数据对象,设置Subject状态为TERMINATED,表示终结了,最后返回已注册的Observer数组。

SubjectObserver<T>[] terminate(Object n) {
    set(n);
    active = false;

    State<T> oldState = state;
    if (oldState.terminated) {
        return State.NO_OBSERVERS;
    }
    return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers;
}

版权声明:本文为博主原创文章, 转载请标明出处 http://blog.csdn.net/sun927

时间: 2024-08-27 17:46:24

Rxjava分析—Subject的相关文章

Rxjava Subject分析

Subject = Observable + Observer 看看官方的描述: Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色.因为它是一个Observer,它可以订阅一个或多个Observable:又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据. 由于一个Observable订阅一个Observable,它可以触发这个Observable开始

打造RxJava生命周期管理框架RxLife

在前边RxJava实战技巧大全一文中,我们介绍了RxJava使用过程中常见的应用场景和可能遇到的问题,其中我们谈到利用RxLifeCycle来管理RxJava的生命周期,避免内存泄漏问题,今天自己动手打生命周期管理框RxLife来加深对RxJava的认识. 详解Subject 什么是Subject 在RxJava当中,有四个对象值得我们关注:Observable,Subject,Observer,Subscriber,它们之间的关系如下: 对于Observable,Observer,Subscr

Shiro 登录认证源码详解

Shiro 登录认证源码详解 Apache Shiro 是一个强大且灵活的 Java 开源安全框架,拥有登录认证.授权管理.企业级会话管理和加密等功能,相比 Spring Security 来说要更加的简单. 本文主要介绍 Shiro 的登录认证(Authentication)功能,主要从 Shiro 设计的角度去看这个登录认证的过程. 一.Shiro 总览 首先,我们思考整个认证过程的业务逻辑: 获取用户输入的用户名,密码: 从服务器数据源中获取相应的用户名和密码: 判断密码是否匹配,决定是否

RxJava &amp;&amp; Agera 从源码简要分析基本调用流程(2)

版权声明:本文由晋中望原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/124 来源:腾云阁 https://www.qcloud.com/community 接上篇RxJava && Agera 从源码简要分析基本调用流程(1)我们从"1.订阅过程"."2.变换过程"进行分析,下篇文章我们继续分析"3.线程切换过程" 3.线程切换过程 从上文中我们知道了R

RxJava中的doOnSubscribe操作符默认执行线程分析

前言 在有心课堂<RxJava之旅>中有学员留言:map和doOnSubscribe默认调度器是IO调度器,这里说错了吧? 下面我们分析下. 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化.然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程.这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 P

RxJava中的doOnSubscribe默认运行线程分析

假设你对RxJava1.x还不是了解,能够參考以下文章. 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操

RxJava &amp;&amp; Agera 从源码简要分析基本调用流程(1)

版权声明:本文由晋中望原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/123 来源:腾云阁 https://www.qcloud.com/community 相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行.同样有很多同学已经开始在自己的项目中使用RxJava.它能够帮助我们在处理异步事件时能够省去那些复杂而繁琐的代码,尤其是当

Rxjava 执行阻塞的原因分析 tolist() observable.from()等。

开发中多次碰到了tolist方法阻塞住的问题.一直为了赶进度,避开使用该操作符号. 直到有一天发现flatmap中的 observable.from()也会阻塞.排查原因才发现是  onComplete()方法没有调用的原因. 根据rxjava的链式调用原理,有从下到上一步步传递回调函数,在从上到下逐步执行的过程. 而该过程中有的步骤执行需要等待oncomplete调用. Rxjava 执行阻塞的原因分析 tolist() observable.from()等.

RxJava 常见误区(一):过度使用 Subject

本文首发:http://prototypez.github.io/2016/04/10/rxjava-common-mistakes-1/ 转载请注明出处 准备写这篇文章的时候看了下 RxJava 在 Github 上已经 12000+ 个 star 了,可见火爆程度,自己使用 RxJava 也已经有一小段时间.最初是在社区对 RxJava 一片赞扬之声下,开始使用 RxJava 来代替项目中一些简单异步请求,到后来才开始接触一些高级玩法,这中间阅读别人的代码加上自己踩的坑,慢慢积累了一些经验,