理解 RxJava 的线程模型

来源:鸟窝,

colobu.com/2016/07/25/understanding-rxjava-thread-model/

如有好文章投稿,请点击 → 这里了解详情

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

Netflix参考微软的Reactive Extensions创建了Java的实现RxJava,主要是为了简化服务器端的并发。2013年二月份,Ben Christensen 和 Jafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava。

RxJava也在Android开发中得到广泛的应用。

ReactiveX

An API for asynchronous programming with observable streams.

A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

虽然RxJava是为异步编程实现的库,但是如果不清楚它的使用,或者错误地使用了它的线程调度,反而不能很好的利用它的异步编程提到系统的处理速度。本文通过实例演示错误的RxJava的使用,解释RxJava的线程调度模型,主要介绍Scheduler、observeOn和subscribeOn的使用。

本文中的例子以并发发送http request请求为基础,通过性能检验RxJava的线程调度。

第一个例子,性能超好?

我们首先看第一个例子:

public static void testRxJavaWithoutBlocking(int count) throws Exception {

CountDownLatch finishedLatch = new CountDownLatch(1);

long t = System.nanoTime();

Observable.range(0, count).map(i -> {

//System.out.println("A:" + Thread.currentThread().getName());

return 200;

}).subscribe(statusCode -> {

//System.out.println("B:" + Thread.currentThread().getName());

}, error -> {

}, () -> {

finishedLatch.countDown();

});

finishedLatch.await();

t = (System.nanoTime() - t) / 1000000; //ms

System.out.println("RxJavaWithoutBlocking TPS: " + count * 1000 / t);

}

这个例子是一个基本的RxJava的使用,利用Range创建一个Observable, subscriber处理接收的数据。因为整个逻辑没有阻塞,程序运行起来很快,

输出结果为:

RxJavaWithoutBlocking TPS: 7692307 。

2 加上业务的模拟,性能超差

上面的例子是一个理想化的程序,没雨任何阻塞。我们模拟一下实际的应用,加上业务处理。

业务逻辑是发送一个http的请求,httpserver是一个模拟器,针对每个请求有30毫秒的延迟。subscriber统计请求结果:

public static void testRxJavaWithBlocking(int count) throws Exception {

URL url = new URL("http://127.0.0.1:8999/");

CountDownLatch finishedLatch = new CountDownLatch(1);

long t = System.nanoTime();

Observable.range(0, count).map(i -> {

try {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

conn.setRequestMethod("GET");

int responseCode = conn.getResponseCode();

BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

String inputLine;

while ((inputLine = in.readLine()) != null) {

//response.append(inputLine);

}

in.close();

return responseCode;

} catch (Exception ex) {

return -1;

}

}).subscribe(statusCode -> {

}, error -> {

}, () -> {

finishedLatch.countDown();

});

finishedLatch.await();

t = (System.nanoTime() - t) / 1000000; //ms

System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);

}

运行结果如下:

RxJavaWithBlocking TPS: 29。

性能怎么突降呢,第一个例子看起来性能超好啊,http server只增加了一个30毫秒的延迟,导致这个方法每秒只能处理29个请求。

如果我们估算一下, 29*30= 870 毫秒,大约1秒,正好和单个线程发送处理所有的请求的TPS差不多。

后面我们也会看到,实际的确是一个线程处理的,你可以在代码中加入

3 加上调度器,不起作用?

如果你对subscribeOn和observeOn方法有些印象的话,可能会尝试使用调度器去解决:

public static void testRxJavaWithBlocking(int count) throws Exception {

URL url = new URL("http://127.0.0.1:8999/");

CountDownLatch finishedLatch = new CountDownLatch(1);

long t = System.nanoTime();

Observable.range(0, count).map(i -> {

try {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

conn.setRequestMethod("GET");

int responseCode = conn.getResponseCode();

BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

String inputLine;

while ((inputLine = in.readLine()) != null) {

//response.append(inputLine);

}

in.close();

return responseCode;

} catch (Exception ex) {

return -1;

}

}).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {

}, error -> {

}, () -> {

finishedLatch.countDown();

});

finishedLatch.await();

t = (System.nanoTime() - t) / 1000000; //ms

System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);

}

加上.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())看一下性能:

RxJavaWithBlocking TPS: 30。

性能没有改观,是时候了解一下RxJava线程调度的问题了。

4 RxJava的线程模型

首先,依照Observable Contract, onNext是顺序执行的,不会同时由多个线程并发执行。

默认情况下,它是在调用subscribe方法的那个线程中执行的。如第一个例子和第二个例子,Rx的操作和消息接收处理都是在同一个线程中执行的。一旦由阻塞,比如第二个例子,久会导致这个线程被阻塞,吞吐量下降。

但是subscribeOn可以改变Observable的运行线程。

上图中可以看到,如果你使用了subscribeOn方法,则Rx的运行将会切换到另外的线程上,而不是默认的调用线程。

需要注意的是,如果在Observable链中调用了多个subscribeOn方法,无论调用点在哪里,Observable链只会使用第一个subscribeOn指定的调度器,正所谓”一见倾情”。

但是onNext还是顺序执行的,所以第二个例子的性能依然低下。

observeOn可以中途改变Observable链的线程。前面说了,subscribeOn方法改变的源Observable的整个的运行线程,要想中途切换线程,就需要observeOn方法。

官方的一个简略晦涩的解释如下:

The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.

一图胜千言:

注意箭头的颜色和横轴的颜色,不同的颜色代表不同的线程。

5 Schedulers

上面我们了解了RxJava可以使用subscribeOn和observeOn可以改变和切换线程,以及onNext是顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。

Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:

  • io() : 这个调度器时用于I/O操作, 它可以增长或缩减来确定线程池的大小它是使用CachedThreadScheduler来实现的。需要注意的是,它的线程池是无限制的,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。
  • computation() : 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(), debounce(), delay(), interval(), sample(), skip()。

因为这些方法内部已经调用的调度器,所以你再调用subscribeOn是无效的,比如下面的例子总是使用computation调度器的线程。

Observable.just(1,2,3)

.delay(1, TimeUnit.SECONDS)

.subscribeOn(Schedulers.newThread())

.map(i -> {

System.out.println("map: " + Thread.currentThread().getName());

return i;

})

.subscribe(i -> {});

  • immediate() :这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
  • newThread() :创建一个新的线程只从。
  • trampoline() :为当前线程建立一个队列,将当前任务加入到队列中依次执行。

同时,Schedulers还提供了from静态方法,用户可以定制线程池:

ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

Schedulers.from(es)

6 改造,异步执行

现在,我们已经了解了RxJava的线程运行,以及相关的调度器。可以看到上面的例子还是顺序阻塞执行的,即使是切换到另外的线程上,依然是顺序阻塞执行,显示它的吞吐率非常非常的低。下一步我们就要改造这个例子,让它能异步的执行。

下面是一种改造方案,我先把代码贴出来,再解释:

public static void testRxJavaWithFlatMap(int count) throws Exception {

ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

URL url = new URL("http://127.0.0.1:8999/");

CountDownLatch finishedLatch = new CountDownLatch(1);

long t = System.nanoTime();

Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {

//System.out.println("A: " + Thread.currentThread().getName());

return Observable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {

//System.out.println("B: " + Thread.currentThread().getName());

try {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

conn.setRequestMethod("GET");

int responseCode = conn.getResponseCode();

BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

String inputLine;

while ((inputLine = in.readLine()) != null) {

//response.append(inputLine);

}

in.close();

return responseCode;

} catch (Exception ex) {

return -1;

}

}

);

}

).observeOn(Schedulers.computation()).subscribe(statusCode -> {

//System.out.println("C: " + Thread.currentThread().getName());

}, error -> {

}, () -> {

finishedLatch.countDown();

});

finishedLatch.await();

t = (System.nanoTime() - t) / 1000000; //ms

System.out.println("RxJavaWithFlatMap TPS: " + count * 1000 / t);

es.shutdownNow();

}

通过flatmap可以将源Observable的元素项转成n个Observable,生成的每个Observable可以使用线程池并发的执行,同时flatmap还会将这n个Observable merge成一个Observable。你可以将其中的注释打开,看看线程的执行情况。

性能还不错:

RxJavaWithFlatMap TPS: 3906。

FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

7 另一种解决方案

我们已经清楚了要并行执行提高吞吐率的解决办法就是创建多个Observable并且并发执行。基于这种解决方案,我们还可以有其它的解决方案。

上一方案中利用flatmap创建多个Observable,针对我们的例子,我们何不直接创建多个Observable呢?

public static void testRxJavaWithParallel(int count) throws Exception {

ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

URL url = new URL("http://127.0.0.1:8999/");

CountDownLatch finishedLatch = new CountDownLatch(count);

long t = System.nanoTime();

for (int k = 0; k < count; k++) {

Observable.just(k).map(i -> {

//System.out.println("A: " + Thread.currentThread().getName());

try {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

conn.setRequestMethod("GET");

int responseCode = conn.getResponseCode();

BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

String inputLine;

while ((inputLine = in.readLine()) != null) {

//response.append(inputLine);

}

in.close();

return responseCode;

} catch (Exception ex) {

return -1;

}

}).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {

}, error -> {

}, () -> {

finishedLatch.countDown();

});

}

finishedLatch.await();

t = (System.nanoTime() - t) / 1000000; //ms

System.out.println("RxJavaWithParallel TPS: " + count * 1000 / t);

es.shutdownNow();

}

性能更好一点:

RxJavaWithParallel2 TPS: 4716。

这个例子没有使用Schedulers.io()作为它的调度器,这是因为如果在大并发的情况下,可能会出现创建过多的线程导致资源不错,所以我们限定使用200个线程。

8 总结

  • subscribeOn() 改变的Observable运行(operate)使用的调度器,多次调用无效。
  • observeOn() 改变Observable发送notifications的调度器,会影响后续的操作,可以多次调用
  • 默认情况下, 操作链使用的线程是调用subscribe()的线程
  • Schedulers提供了多个调度器,可以并行运行多个Observable
  • 使用RxJava可以实现异步编程,但是依然要小心线程阻塞。而且由于这种异步的编程,调试代码可能更加的困难

9 参考文档

  • http://reactivex.io/documentation/contract.html
  • http://reactivex.io/documentation/operators/subscribeon.html 中文翻译
  • http://reactivex.io/documentation/operators/observeon.html 中文翻译
  • http://reactivex.io/documentation/scheduler.html
  • http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html
  • http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html
  • https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2 中文翻译
  • https://github.com/mcxiaoke/RxDocs
时间: 2024-12-13 09:46:55

理解 RxJava 的线程模型的相关文章

理解RxJava线程模型

RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解.(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑的伪代码) 入手体验 RxJava 中切换线程非常简单,例如最常见的异步线程处理,主线程回调的模型,可以很优雅的用如下代码来做处理: Observable.just("magic") .map(str -> doExpensiveWork(str)) .subscribeOn(Sch

Netty系列之Netty线程模型

1. 背景 1.1. Java线程模型的演进 1.1.1. 单线程 时间回到十几年前,那时主流的CPU都还是单核(除了商用高性能的小机),CPU的核心频率是机器最重要的指标之一. 在Java领域当时比较流行的是单线程编程,对于CPU密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能. 1.1.2. 多线程 随着硬件性能的提升,CPU的核数越来越越多,很多服务器标配已经达到32或64核.通过多线程并发编程,可以充分利用多核CPU的处理能力,提升系统的处理效率和并发性能. 相关

HBase的Write Ahead Log (WAL) —— 整体架构、线程模型【转】

转自:http://www.cnblogs.com/ohuang/p/5807543.html 解决的问题 HBase的Write Ahead Log (WAL)提供了一种高并发.持久化的日志保存与回放机制.每一个业务数据的写入操作(PUT / DELETE)执行前,都会记账在WAL中. 如果出现HBase服务器宕机,则可以从WAL中回放执行之前没有完成的操作. 本文主要探讨HBase的WAL机制,如何从线程模型.消息机制的层面上,解决这些问题: 1. 由于多个HBase客户端可以对某一台HBa

Chromium线程模型、消息循环

多线程的麻烦 多线程编程是一件麻烦的事,相信很多人深有体会.执行顺序的不确定性,资源的并发访问一直困扰着众多程序员.解决多线程编程问题的方法分为两类:一是对并发访问的资源直接加锁:二是避免并发访问资源:Chromium采用第二种思想来设计多线程模型,通过在线程之间传递消息来实现跨进程通讯. 设计原则 Chromium希望尽量保持UI处于响应状态.为此遵循如下设计原则: 1 不在UI线程上执行任何阻塞I/O操作,以及其它耗时操作. 2 少用锁和线程安全对象 3 避免阻塞I/O线程 4 线程之间不要

C#线程模型脉络

今天在看同事新买到的<C#本质论 Edition 4>的时候,对比下以前Edtion3的新特性时针对Async/Await关键字时发现对一些线程方面的定义还理解的不是很透彻,脉络还不是很清晰,这样有了本文,希望对有同样困惑的朋友有些帮助. 文中部分内容摘取自<Essential C# 5.0 Edition 4>,还有一些我个人的对线程方面知识的理解与概括,如果有错误的地方还请指出,如果您觉得文章还不错,请点击“推荐” :) C#线程模型脉络 缩写: SPM:Synchronous

经典的线程模型

既然我们已经明白为什么线程会有用以及如何使用它们,不如让我们用更近一步的眼光来审查一下上面的想法.进程模型基于两种独立的概念:资源分组处理与执行.有时,将这两种概念分开会更有益,这也引入了"线程"这一概念.我们将先来看经典的线程模型:之后我们会来研究"模糊进程与线程分界线"的Linux线程模型. 理解进程的一个角度是,用某种方法把相关的资源集中在一起.进程有存放程序正文和数据以及其他资源的地址空间.这些资源中包括打开的文件.子进程.即将发生的报警.信号处理程序.账号

Netty in Action (十七) 第七章节 EventLoop和线程模型

本章节包括: 1)线程模型总览 2)Event Loop概念和具体实现 3)任务调度 4)实现细节 简单地陈述一下,对于一个操作系统,编程语言,框架,或者应用来说,线程模型对其都是至关重要的一部分,在什么时间如何创建一个线程都会对你的代码执行有很重要的影响,所以对于开发人员而言,懂得在各种线程模型里面权衡利弊就是一个很重要的事情,是直接使用线程模型本身还是通过一些框架或者语言提供的线程框架对于开发者而言都是需要选择的 在这个章节,我们将会详细地讲解Netty的线程模型,这个模型是很强大的,且易于

Java线程模型、线程状态 - 线程(1)

1. 概述 众所周知,线程 - Thread 是比进程 - Progress 更轻量级的调度单位.简单来说,引入线程带来的好处是: 可以把一个进程 的资源分配和执行调度分开,各个线程 既可以共享进程 资源(内存地址.文件I/O等),又可以独立调度. 线程实现方式: 主流的操作系统都实现了线程 ,而编程语言一般会提供关于线程 的统一API操作.那么,编程语言如何去调用系统线程 呢?这方面主要有3种方式: 使用内核线程 - Kernel Thread. 一对一线程模型 ,这个最重要,下面详细讲. 使

Memcached源码分析之线程模型

作者:Calix 一)模型分析 memcached到底是如何处理我们的网络连接的? memcached通过epoll(使用libevent,下面具体再讲)实现异步的服务器,但仍然使用多线程,主要有两种线程,分别是“主线程”和“worker线程”,一个主线程,多个worker线程. 主线程负责监听网络连接,并且accept连接.当监听到连接时,accept后,连接成功,把相应的client fd丢给其中一个worker线程.worker线程接收主线程丢过来的client fd,加入到自己的epol