RxJava 并发之线程调度

由于 Rx 目标是用在异步系统上并且 Rx 支持多线程处理,所以很多 Rx 开发者认为默认情况下 Rx 就是多线程的。 其实实际情况不是这样的,Rx 默认是单线程的。

除非你明确的指定线程,否则所有 onNext/onError/onCompleted 以及各个操作函数的调用都是在同一个线程中完成的。例如下面的示例:

final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
    System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});

int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
    synchronized(i) {
        System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
        subject.onNext(i[0]++);
    }
};

r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();

结果:

onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12

上面在三个线程中分别调用 subject 的onNext 函数。和 Runnable 中的线程是同一个线程。不管用多少个操作函数串联调用,结果都是同一个线程。

subscribeOn 和 observeOn

subscribeOn 和 observeOn 分别用来控制 subscription 的调用线程和 接受事件通知(Observer 的 onNext/onError/onCompleted 函数)的线程。

public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)

在Rx 中你并不直接和 线程 打交道,而是通过 Scheduler 来处理多线程。

subscribeOn

subscribeOn 用来指定 Observable.create 中的代码在那个 Scheduler 中执行。即使你没有调用 create 函数,但是内部也有一个 create 实现。例如:

System.out.println("Main: " + Thread.currentThread().getId());

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    //.subscribeOn(Schedulers.newThread())
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });

System.out.println("Finished main: " + Thread.currentThread().getId());

结果:

Main: 1
Created on 1
Received 1 on 1
Received 2 on 1
Finished main: 1

可以看到上面的代码是在同一个线程中执行,并且是按循序执行的。subscribe 执行完后(包括create 函数里面的 Lambda 表达式的代码)才继续执行后面的代码。

如果你把上面的注释掉的代码 .subscribeOn(Schedulers.newThread()) 启用,这结果是这样的:

Main: 1
Finished main: 1
Created on 11
Received 1 on 11
Received 2 on 11

这样 create 里面的 Lambda 表达式代码将会在 Schedulers.newThread() 返回的线程中执行。subscribe 不再是阻塞的了。后面的代码可以立即执行,而不用等待 subscribe 返回。

有些 Observable 内部会使用它们自己创建的线程。例如 Observable.interval 就是异步的。这种情况下,无需指定新的线程。

System.out.println("Main: " + Thread.currentThread().getId());

Observable.interval(100, TimeUnit.MILLISECONDS)
    .subscribe(i -> {
        System.out.println("Received " + i + " on " + Thread.currentThread().getId());
    });

System.out.println("Finished main: " + Thread.currentThread().getId());

结果:

Main: 1
Finished main: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11

observeOn

observeOn 控制数据流的另外一端。你的 observer 如何收到事件。也就是在那个线程中回调 observer 的 onNext/onError/onCompleted 函数。

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .observeOn(Schedulers.newThread())
    .subscribe(i ->
        System.out.println("Received " + i + " on " + Thread.currentThread().getId()));

结果:

Created on 1
Received 1 on 13
Received 2 on 13

observeOn 只影响调用该函数以后的操作函数。你可以认为 observeOn 只是拦截了数据流并且对后续的操作有作用。例如:

Observable.create(o -> {
        System.out.println("Created on " + Thread.currentThread().getId());
        o.onNext(1);
        o.onNext(2);
        o.onCompleted();
    })
    .doOnNext(i ->
        System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
    .observeOn(Schedulers.newThread())
    .doOnNext(i ->
        System.out.println("After " + i + " on " + Thread.currentThread().getId()))
    .subscribe();

结果:

Created on 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13

可以看到在遇到 observeOn 之前,所有的操作发生在一个线程,之后在另外一个线程。这样可以在 Rx 数据流中不同地方设置不同的线程。

如果你知道数据流处理在那些情况需要很长时间,则可以通过这个操作来避免阻塞生产者线程。 比如在 Android 开发过程中的 UI 线程,如果在该线程中读取文件,可能会导致 UI 卡死(ANR)无响应,通过该函数可以指定读取文件在另外一个线程中执行。

unsubscribeOn

有些 Observable 会依赖一些资源,当该 Observable 完成后释放这些资源。如果释放资源比较耗时的话,可以通过 unsubscribeOn 来指定 释放资源代码执行的线程。

Observable<Object> source = Observable.using(
    () -> {
        System.out.println("Subscribed on " + Thread.currentThread().getId());
        return Arrays.asList(1,2);
    },
    (ints) -> {
        System.out.println("Producing on " + Thread.currentThread().getId());
        return Observable.from(ints);
    },
    (ints) -> {
        System.out.println("Unubscribed on " + Thread.currentThread().getId());
    }
);

source
    .unsubscribeOn(Schedulers.newThread())
    .subscribe(System.out::println);

结果:

Subscribed on 1
Producing on 1
1
2
Unubscribed on 11

Schedulers

observeOn 和 subscribeOn 的参数为一个 Scheduler 对象。Scheduler 是用来协调任务执行的。 RxJava 包含了一些常用的 Scheduler,你也可以自定义 Scheduler。 通过调用 Schedulers 的工厂函数来获取标准的预定义的 Scheduler。

RxJava 内置的 Scheduler 有:

  • immediate 同步执行
  • trampoline 把任务放到当前线程的队列中,等当前任务执行完了,再继续执行队列中的任务
  • newThread 对于每个任务创建一个新的线程去执行
  • computation 计算线程,用于需要大量 CPU 计算的任务
  • io 用于执行 io 操作的任务
  • test 用于测试和调试

当前 computation 和 io 的实现是类似的,他们两个主要用来确保调用的场景,相当于文档说明,来表明你当前的任务是何种类型的。

大部分的 Rx 操作函数内部都使用了schedulers 。并且大部分的 Observable 操作函数也都有一个使用 Scheduler 参数的重载函数。通过重载函数可以指定该操作函数执行的线程。

scheduler 的高级特性

Rx scheduler 的使用场景并没有限定在 Rx 中,也可以在普通 Java 代码中使用。

执行一个任务

Scheduler 有个 createWorker 函数,用来创建一个可以执行的任务(Scheduler.Worker)。然后可以调度该任务:

Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
    () -> System.out.println("Action"));

上面的任务被分配到其指定的线程中了。

还可以重复执行任务,或者只执行一次,也可以推迟任务执行:

Subscription schedule(
    Action0 action,
    long delayTime,
    java.util.concurrent.TimeUnit unit)
Subscription schedulePeriodically(
    Action0 action,
    long initialDelay,
    long period,
    java.util.concurrent.TimeUnit unit)
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);

结果:

5033
5035

上面示例中可以看到,推迟执行是从调度开始的时候计算时间的。

取消任务

Scheduler.Worker 继承至 Subscription。调用 unsubscribe 函数可以取消队列中的任务:

Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
    () -> {
        System.out.println(System.currentTimeMillis()-start);
        worker.unsubscribe();
    },
    5, TimeUnit.SECONDS);
worker.schedule(
    () -> System.out.println(System.currentTimeMillis()-start),
    5, TimeUnit.SECONDS);

结果:

5032

第一个任务中调用了 unsubscribe,这样第二个任务被取消了。下面一个示例演示任务没有执行完,被取消的情况,会抛出一个 InterruptedException 异常:

Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    try {
        Thread.sleep(2000);
        System.out.println("Action completed");
    } catch (InterruptedException e) {
        System.out.println("Action interrupted");
    }
});
Thread.sleep(500);
worker.unsubscribe();

结果:

Action interrupted

schedule 返回的是一个 Subscription 对象,可以在该对象上调用取消操作,这样可以只取消这一个任务,而不是取消所有任务。

RxJava 中现有的 scheduler

ImmediateScheduler

ImmediateScheduler 并没有做任何线程调度。只是同步的执行任务。嵌套调用会导致任务被递归执行:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    System.out.println("Start");
    worker.schedule(() -> System.out.println("Inner"));
    System.out.println("End");
});

结果:

Start
Inner
End

TrampolineScheduler

TrampolineScheduler 也是同步执行,但是不嵌套任务。而是把后来的任务添加到任务队列中,等前面的任务执行完了 再执行后面的。

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    System.out.println("Start");
    worker.schedule(() -> System.out.println("Inner"));
    System.out.println("End");
});

结果:

Start
End
Inner

TrampolineScheduler 把任务安排到第一次执行任务的那个线程中执行。这样,第一次调用 schedule 的操作是阻塞的,直到队列执行完。后续的任务,会在这个线程中一个一个的执行,并且后续的调用不会阻塞。

NewThreadScheduler

NewThreadScheduler 给每个任务创建一个新的线程。

定义一个打印线程信息的辅助函数:

public static void printThread(String message) {
    System.out.println(message + " on " + Thread.currentThread().getId());
}

示例:

printThread("Main");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    printThread("Start");
    worker.schedule(() -> printThread("Inner"));
    printThread("End");
});
Thread.sleep(500);
worker.schedule(() -> printThread("Again"));

结果:

Main on 1
Start on 11
End on 11
Inner on 11
Again on 11

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=978

时间: 2024-12-20 08:06:51

RxJava 并发之线程调度的相关文章

RxJava 并发之数据流发射太快如何办

Backpressure Rx 中的数据流是从一个地方发射到另外一个地方.每个地方处理数据的速度是不一样的.如果生产者发射数据的速度比消费者处理的快会出现什么情况?在同步操作中,这不是个问题,例如: // Produce Observable<Integer> producer = Observable.create(o -> { o.onNext(1); o.onNext(2); o.onCompleted(); }); // Consume producer.subscribe(i

RxJava 并发之意外情况处理

Rx 尽量避免状态泄露到数据流之外的场景.但是有些东西本身就带有状态.比如服务器可以上线和离线.手机可以访问Wifi.按钮被按下了等.在 Rx 中国,我们在一段时间内看到这些事件,并称之为窗口(window).其他事件在这个窗口内发生可能需要特殊处理.例如,手机在使用移动收费上网的时候,会把网络请求优先级降低,来避免天价流量费的情况. 注意:上面的一段话估计是翻译的,有点语句不通.更多参考官网: http://www.introtorx.com/Content/v1.0.10621.0/17_S

RxJava中的doOnSubscribe默认运行线程分析

假设你对RxJava1.x还不是了解,能够參考以下文章. 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

Android异步框架RxJava 1.x系列(二) - 事件及事件序列转换原理

前言 在介绍 RxJava 1.x 线程调度器之前,首先引入一个重要的概念 - 事件序列转换.RxJava 提供了对事件序列进行转换的支持,这是它的核心功能之一. 正文 1. 事件序列转换定义 所谓转换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列,有点类似 Java 1.8 中的流处理. 2. 事件序列转换API 首先看一个 map() 的例子: Observable.just("images/logo.png") // 输入类型 String .map(

浅谈RxJava源码解析(观察者),创建(create、from、just),变换(Map、flatMap)、线程调度

一.创建操作: 1.观察者模式:RxJava的世界里,我们有四种角色: Observable<T>(被观察者).Observer(观察者) Subscriber(订阅者).Subject Observable和Subject是两个"生产"实体,Observer和Subscriber是两个"消费"实体.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Ob

RxJava 和 RxAndroid 五(线程调度)

对rxJava不了解的同学可以先看 RxJava 和 RxAndroid 一 (基础)RxJava 和 RxAndroid 二(操作符的使用)RxJava 和 RxAndroid 三(生命周期控制和内存优化) RxJava 和 RxAndroid 四(RxBinding的使用) 本文将有几个例子说明,rxjava线程调度的正确使用姿势. 例1 Observable .create(new Observable.OnSubscribe<String>() { @Override public v

Android异步框架RxJava 1.x系列(三) - 线程调度器Scheduler

前言 RxJava 事件的发出和消费都在同一个线程,基于同步的观察者模式.观察者模式的核心是后台处理,前台回调的异步机制.要实现异步,需要引入 RxJava 的另一个概念 - 线程调度器 Scheduler. 正文 在不指定线程的情况下,RxJava 遵循的是线程不变的原则.即在哪个线程调用 subscribe() 方法,就在哪个线程生产事件:在哪个线程生产事件,就在哪个线程消费事件.如果需要切换线程,就需要用到线程调度器 Scheduler. 1. 几种Scheduler介绍 在 RxJava

Android开发之Retrofit+RxJava的使用

Retrofit是Square公司开发的一款针对Android网络请求的一个当前很流行的网络请求库. http://square.github.io/retrofit/ https://github.com/square/retrofit 使用需引入: compile 'com.squareup.retrofit2:retrofit:2.1.0' compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0' compile 'io.reactivex