Rxjava Subject分析

Subject = Observable + Observer

看看官方的描述:

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

由于一个Observable订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是”冷”的–就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个”冷”的Observable变成”热”的。

通俗的理解就是:

subject是一个神奇的对象,它可以是一个Observable同时也可以是一个Observer:它作为连接这两个世界的一座桥梁。一个Subject可以订阅一个Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。很明显,作为一个Observable,观察者们或者其它Subject都可以订阅它。

一旦Subject订阅了Observable,它将会触发Observable开始发射。如果原始的Observable是“冷”的,这将会对订阅一个“热”的Observable变量产生影响。

Subject的种类

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在RxScala中Subject被称作PublishSubject)。

RxJava提供的四种不同的Subject:

1.PublishSubject

PublishSubject仅会向Observer释放在订阅之后Observable释放的数据。

官方介绍:

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入”冷”Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。

如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

2.BehaviorSubject

当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。

官方介绍:

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

3.ReplaySubject

不管Observer何时订阅ReplaySubject,ReplaySubject会向所有Observer释放Observable释放过的数据。

有不同类型的ReplaySubject,它们是用来限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。

如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete和onError方法,因为这会导致顺序错乱,这个是违反了Observer规则的。

官方介绍:

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。

如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

4.AsyncSubject

AsyncSubject仅释放Observable释放的最后一个数据,并且仅在Observable完成之后。然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。

官方介绍:

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

RxJava的对应类

假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你可以调用它的asObservable方法,这个方法返回一个Observable。具体使用方法可以参考Javadoc文档。

如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。

要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

通过BehaviorSubject 来制作缓存

代码大致形式:

api.getData().subscribe(behaviorSubject); // 判断cache为空则获取数据,网络数据会被缓存
behaviorSubject.subscribe(observer);// 之前的缓存将直接送达observer

详细代码:

BehaviorSubject<List<Item>> cache;
public Subscription subscribeData(@NonNull Observer<List<Item>> observer) {
         //判断内存缓存是否为空
        if (cache == null) {
            cache = BehaviorSubject.create();
            Observable.create(new Observable.OnSubscribe<List<Item>>() {
                @Override
                public void call(Subscriber< ? super List<Item>> subscriber) {
                    List<Item> items = Database.getInstance().readItems();
                    //判断硬盘缓存是否为空
                    if (items == null) {
                        //从网络读取数据
                        loadFromNetwork();
                    } else {
                        //发送硬盘数据
                        subscriber.onNext(items);
                    }
                }
            }).subscribeOn(Schedulers.io())
              .subscribe(cache);
        }
        return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
    }

  subscription = subscribeData(new Observer<List<Item>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        swipeRefreshLayout.setRefreshing(false);
                        Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onNext(List<Item> items) {
                        swipeRefreshLayout.setRefreshing(false);
                        adapter.setItems(items);
                    }
                });

通过PublishSubject实现传统的Observable Hello World

PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no!Something wrong happened!");
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});
stringPublishSubject.onNext("Hello World");

在刚才的例子中,我们创建了一个PublishSubject,用create()方法发射一个String值,然后我们订阅了PublishSubject。此时,没有数据要发送,因此我们的观察者只能等待,没有阻塞线程,也没有消耗资源。就在这随时准备从subject接收值,如果subject没有发射值那么我们的观察者就会一直在等待。再次声明的是,无需担心:观察者知道在每个场景中该做什么,我们不用担心什么时候是因为它是响应式的:系统会响应。我们并不关心它什么时候响应。我们只关心它响应时该做什么。

最后一行代码展示了手动发射字符串“Hello World”,它触发了观察者的onNext()方法,让我们在控制台打印出“Hello World”信息。

让我们看一个更复杂的例子。话说我们有一个private声明的Observable,外部不能访问。Observable在它生命周期内发射值,我们不用关心这些值,我们只关心他们的结束。

首先,我们创建一个新的PublishSubject来响应它的onNext()方法,并且外部也可以访问它。

final PublishSubject<Boolean> subject = PublishSubject.create();

subject.subscribe(new Observer<Boolean>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Boolean aBoolean) {
        System.out.println("Observable Completed");
    }
});

然后,我们创建“私有”的Observable,只有subject才可以访问的到。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).doOnCompleted(new Action0() {
    @Override
    public void call() {
        subject.onNext(true);
    }
}).subscribe();

Observable.create()方法包含了我们熟悉的for循环,发射数字。doOnCompleted()方法指定当Observable结束时要做什么事情:在subject上发射true。最后,我们订阅了Observable。很明显,空的subscribe()调用仅仅是为了开启Observable,而不用管已发出的任何值,也不用管完成事件或者错误事件。为了这个例子我们需要它像这样。

在这个例子中,我们创建了一个可以连接Observables并且同时可被观测的实体。当我们想为公共资源创建独立、抽象或更易观测的点时,这是极其有用的。


RxJava的Subject源码分析

package rx.subjects;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;

/**
 * Represents an object that is both an Observable and an Observer.
 */
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

    public abstract boolean hasObservers();

    public final SerializedSubject<T, R> toSerialized() {
        return new SerializedSubject<T, R>(this);
    }
}

BehaviorSubject源码分析

BehaviorSubject订阅subscribe过程

在需要使用subject时,调用Subject的subscribe(..)方法,该方法实际会调用下面这个subscribe(Subscriber< ? super T> subscriber)方法,所以其他的subscribe方法都要将输入参数转化为一个Subscriber对象。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        ...
        // new Subscriber so onStart it
        subscriber.onStart();

        ...

        // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (OnErrorNotImplementedException e2) {
                // special handling when onError is not implemented ... we just rethrow
                throw e2;
            } catch (Throwable e2) {
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren‘t we throwing the hook‘s return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

方法中hook.onSubsribeStart(this, onSubscribe).call(subscriber)默认情况下等价于onSubscribe.call(subscriber)。onSubscriber是什么呢?这个就需要了解BehaviorSubject的构造方法

protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
        super(onSubscribe);
        this.state = state;
    }

其中调用了父类Subject的构造方法

protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

其中调用了父类Observer的构造方法

protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

onSubscribe即是BehaviorSubject构造方法中传入的第一个参数。

BehaviorSubject有3个静态工厂方法用来生产BehaviorSubject对象。

public final class BehaviorSubject<T> extends Subject<T, T> {

    public static <T> BehaviorSubject<T> create() {
        return create(null, false);
    }

    public static <T> BehaviorSubject<T> create(T defaultValue) {
        return create(defaultValue, true);
    }

    private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
        final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
        if (hasDefault) {
            state.set(NotificationLite.instance().next(defaultValue));
        }
        state.onAdded = new Action1<SubjectObserver<T>>() {

            @Override
            public void call(SubjectObserver<T> o) {
                o.emitFirst(state.get(), state.nl);
            }

        };
        state.onTerminated = state.onAdded;
        return new BehaviorSubject<T>(state, state);
    }
    ....
}

前两个Public的静态构造方法实际上调用的是第三个private方法。

最后return new BehaviorSubject(state, state),所以onSubscribe实际为一个SubjectSubscriptionManager的对象,onSubscribe.call(subscriber)实际调用的是SubjectSubscriptionManager的call方法。

/* package */final class SubjectSubscriptionManager<T> implements OnSubscribe<T> {
    ...
    @Override
    public void call(final Subscriber<? super T> child) {
        SubjectObserver<T> bo = new SubjectObserver<T>(child);
        addUnsubscriber(child, bo);
        onStart.call(bo);
        if (!child.isUnsubscribed()) {
            if (add(bo) && child.isUnsubscribed()) {
                remove(bo);
            }
        }
    }
}
  • 调用addUnsubscriber方法,注册一个在取消订阅时执行的一个动作,即将该观擦者Observer移除掉。
/** Registers the unsubscribe action for the given subscriber. */
    void addUnsubscriber(Subscriber<? super T> child, final SubjectObserver<T> bo) {
        child.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                remove(bo);
            }
        }));
    } 
  • 调用add(SubjectObserver< T> o)方法,将该Observer加入已经注册的Observer[]数组当中。
boolean add(SubjectObserver<T> o) {
        do {
            State oldState = state;
            if (oldState.terminated) {
                onTerminated.call(o);
                return false;
            }
            State newState = oldState.add(o);
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                onAdded.call(o);
                return true;
            }
        } while (true);
    }

该方法会调用onAdd.call(o)。BehaviorSubject的onAdd对象如下,state.get()得到的是最近的数据对象,o.emitFirst即会释放最近的数据对象,这正体现了BehaviorSubject的特点。

state.onAdded = new Action1<SubjectObserver<T>>() {

            @Override
            public void call(SubjectObserver<T> o) {
                o.emitFirst(state.get(), state.nl);
            }

        };

在这个过程中使用了SubjectSubscriptionManager的两个内部类。

  1. State< T>

    该类用来管理已经注册的Observer数组,以及他们的状态。

/** State-machine representing the termination state and active SubjectObservers. */
    protected static final class State<T> {
        final boolean terminated;
        final SubjectObserver[] observers;
        static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0];
        static final State TERMINATED = new State(true, NO_OBSERVERS);
        static final State EMPTY = new State(false, NO_OBSERVERS);

        public State(boolean terminated, SubjectObserver[] observers) {
            this.terminated = terminated;
            this.observers = observers;
        }
        public State add(SubjectObserver o) {
            ...
        }
        public State remove(SubjectObserver o) {
            ...
        }
    }

2.SubjectObserver

该类时Observer的一个装饰类,运用了装饰模式给Observer类添加新的功能。

以上就是Subject对象订阅Observer时的流程。


BehaviorSubject的onNext

Behavior的onNext(T v)方法如下

@Override
    public void onNext(T v) {
        Object last = state.get();
        if (last == null || state.active) {
            Object n = nl.next(v);
            for (SubjectObserver<T> bo : state.next(n)) {
                bo.emitNext(n, state.nl);
            }
        }
    }

state是SubjectSubscriptionManager类的对象,是这个对象来维护最近释放的数据对象,state.get()获取最近释放的数据对象,state.next(Object n)方法重新设置最近释放的数据对象,并返回已经注册的Observer数组。

SubjectObserver<T>[] next(Object n) {
        set(n);
        return state.observers;
    }

bo.emitNext(Object n, final NotificationLite< T> nl)释放给定的数据对象。

BehaviorSubject的onCompleted和onError

onCompleted和onError会调用SubjectSubscriptionManager的terminate(Object n)方法,该方法会重新设置最近释放的数据对象,设置Subject状态为TERMINATED,表示终结了,最后返回已注册的Observer数组。

SubjectObserver<T>[] terminate(Object n) {
        set(n);
        active = false;

        State<T> oldState = state;
        if (oldState.terminated) {
            return State.NO_OBSERVERS;
        }
        return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers;
    }

参考文章:

http://www.liuhaihua.cn/archives/133614.html

http://blog.csdn.net/sun927/article/details/44818845

时间: 2024-08-30 09:10:19

Rxjava Subject分析的相关文章

ReactiveX--响应式编程の相关概念 浅析

在许多软件编程任务中,你或多或少期待你的指令将会按照你已经写好的顺序,依次增量执行和完成.但在ReactiveX,很多指令可以通过“观察者”并行执行,其结果将以任意顺序被捕获.你定义了一种“可观察的形式“的检索和转换数据机制而不??是调用方法,然后订阅观察者给它,每当之前定义好的机制已经准备好了,这些机制就会触发常设的哨兵去捕获并反馈结果. 这种方法的优点是,当你有一大堆的任务是不相互依赖,你就可以同时执行他们,而不是等待每一个来启动下一个前完成,这样你的整个任务包只需要花最长的任务时间. 有很

mysql20170410练习代码+笔记

今天的几道高级sql查询真的挺难的,感觉好像视频里讲过,当时也没有练,已经淡化了很多,sql还是要多练习啊!确实逻辑性挺强的. SELECT studentResult,studentNO FROM result WHERE result.`subjectNo`=(SELECT subjectNo FROM `subject`) SELECT studentResult FROM result WHERE result.subjectNo = (SELECT subjectNo FROM SUB

mysql03

第1次课 -- 01.启动mysql服务 使用管理员身份启动cmd net start mysql -- 02.关闭服务 net stop mysql -- 03.连接mysql数据库 如果是本机 可以省略 -h ip地址 -p和密码不能有空格 mysql -h ip地址 -u用户名 -p密码 -- 04.退出数据库 exit -- 05.查询当前所有的数据库 show databases; 系统默认的四个数据库 information_schema:存放的是一些数据库对象信息,包含:用户表信息

Python基于nginx访问日志并统计IP访问量

如果想看看Nginx有多少IP访问量,有哪些国家访问,并显示IP地址的归属地分布,python可以结合使用高春辉老师ipip.net[免费版 IP 地址数据库],Shell可以使用nali,我这边主要使用python语言来实现需求,并将查询结果以邮件形式发送,也是为了学习和回顾python语言.很感谢高春辉老师提供的免费版IP地址数据库. 一.Ningx日志如下: 41.42.97.104 - - [26/Feb/2015:03:35:40 -0500] "GET /root/ HTTP/1.1

C++和Java继承关系中方法覆盖的不同

下面是一段java代码: package chapter5.game; class Subject{ int num; public void print(){ System.out.println("Subject"); } } class ChildClass extends Subject { public void print(int i){ System.out.println("ChildClass"); } } public class SimpleD

为什么大部分的程序员成不了架构师?为什么很多程序员没有升级到架构师?

对我们程序员来说,发展的途径要么是走管理岗,从开发升级到项目经理甚至是部门经理:要么走技术升级路线.不过在技术路线方面,无法升级到架构师的程序员不在少数.一方面,在不少公司的高级开发岗位上,无法让程序员实践甚至接触到架构师的技能,另一方面,有不少程序员甚至不清楚架构师所需要掌握的技能和升级途径.所以从结果上来看,至少有5成的程序员止步于"高级开发"的程度,这是非常令人可惜的. 而小团队一般 10 人左右,其中常常是技术最牛的人做架构师(或TL).所以,架构师在广大码农中的占比大概平均不

Rxjava分析—Subject

Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy.因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象. 1. Subject的类型 一共有四种为不同用途而设计的Subject,分别为AsyncSubject.BehaviorSubject.PublishSubject和ReplaySubject. 1.1 AsyncSubject Asyn

RxJava &amp;&amp; Agera 从源码简要分析基本调用流程(2)

版权声明:本文由晋中望原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/124 来源:腾云阁 https://www.qcloud.com/community 接上篇RxJava && Agera 从源码简要分析基本调用流程(1)我们从"1.订阅过程"."2.变换过程"进行分析,下篇文章我们继续分析"3.线程切换过程" 3.线程切换过程 从上文中我们知道了R

RxJava中的doOnSubscribe操作符默认执行线程分析

前言 在有心课堂<RxJava之旅>中有学员留言:map和doOnSubscribe默认调度器是IO调度器,这里说错了吧? 下面我们分析下. 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化.然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程.这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 P