RxJava 并发 之测试

在开发软件的时候,我们需要确保代码正确执行。为了快速的获取每次修改后的反馈,通常开发人员使用自定义测试。

在同步的 Rx 中测试和普通 Java 中的单元测试没有太大的区别。如果要测试异步代码,可能会有点需要注意的地方,比如要测试下面的代码:

Observable.interval(1, TimeUnit.SECONDS)
    .take(5)

上面的 Observable 发射一个数据流,需要 5秒 来发射完所有的数据。如果我们使用自动化测试这个代码,则是不是意味着测试代码也要执行 5秒,如果我们有成千上万个这样的测试,测试将消耗很多时间去完成。

TestScheduler

上面示例的代码,5秒钟的时间其实大部分都在等待。如果我们可以加快系统时钟,则可以很快的完成数据流的发射。虽然实际操作中,无法加速系统时钟,但是可以加速一个虚拟的时钟。在 Rx 设计中,考虑到只在 scheduler 中使用时间相关的操作。这样可以用一个虚拟的 TestScheduler 来替代真实的 Scheduler。

TestScheduler 和前面一节介绍的线程调度功能是一样的。调度的任务要么立刻执行,要么在将来某个时刻执行。区别在于 TestScheduler 中的时间是不动的,只有被调用了时间才会继续。

advanceTimeTo

advanceTimeTo 函数就是把 TestScheduler 中的时钟前进指定的时间刻度。

TestScheduler s = Schedulers.test();

s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("40s"),
        40, TimeUnit.SECONDS);

System.out.println("Advancing to 1ms");
s.advanceTimeTo(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());

System.out.println("Advancing to 10s");
s.advanceTimeTo(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());

System.out.println("Advancing to 40s");
s.advanceTimeTo(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());

结果:

Advancing to 1ms
Immediate
Virtual time: 1
Advancing to 10s
Virtual time: 10000
Advancing to 40s
20s
40s
Virtual time: 40000

上面示例中创建的 3 个任务,第一个任务立刻执行,第二个和第三个在将来执行。可以看到如果不调用 advanceTimeTo 来使时间前进,则所有任务都不会执行,因为在 TestScheduler 中时间是停止的。当时间前进的时候, TestScheduler 会同步执行所有满足时间条件的任务。

advanceTimeTo 可以设置时间为任意时刻。也可以回到过去(设置的时间比当前的时间还早)。所以这个函数如果不注意使用,可能会导致不可预见的 bug。一般建议使用下面这个函数。

advanceTimeBy

advanceTimeBy 顾名思义,在当前时间基础上前进多少。

TestScheduler s = Schedulers.test();

s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("40s"),
        40, TimeUnit.SECONDS);

System.out.println("Advancing by 1ms");
s.advanceTimeBy(1, TimeUnit.MILLISECONDS);
System.out.println("Virtual time: " + s.now());

System.out.println("Advancing by 10s");
s.advanceTimeBy(10, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());

System.out.println("Advancing by 40s");
s.advanceTimeBy(40, TimeUnit.SECONDS);
System.out.println("Virtual time: " + s.now());

结果:

Advancing by 1ms
Immediate
Virtual time: 1
Advancing by 10s
Virtual time: 10001
Advancing by 40s
20s
40s
Virtual time: 50001

triggerActions

triggerActions 不会修改时间。只是用来执行当前可以调度的任务。

TestScheduler s = Schedulers.test();

s.createWorker().schedule(
        () -> System.out.println("Immediate"));
s.createWorker().schedule(
        () -> System.out.println("20s"),
        20, TimeUnit.SECONDS);

s.triggerActions();
System.out.println("Virtual time: " + s.now());

结果:

Immediate
Virtual time: 0

调度冲突

有些任务可能在同一时刻执行。如果发生这种情况,则被称之为 调度冲突。 这些任务调度的顺序就是他们执行的顺序(也就是按照顺序执行)

TestScheduler s = Schedulers.test();

s.createWorker().schedule(
        () -> System.out.println("First"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("Second"),
        20, TimeUnit.SECONDS);
s.createWorker().schedule(
        () -> System.out.println("Third"),
        20, TimeUnit.SECONDS);

s.advanceTimeTo(20, TimeUnit.SECONDS);

结果:

First
Second
Third

测试

Rx 的 Observable的 大部分操作函数都有一个可以指定 Scheduler 的重载形式。在这些函数上同样可以使用 TestScheduler。

@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
    List<Long> result = new ArrayList<>();
    Observable
        .interval(1, TimeUnit.SECONDS, scheduler)
        .take(5)
        .subscribe(i -> result.add(i));
    assertTrue(result.isEmpty());
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    assertTrue(result.equals(expected));
}

这样测试代码就可以很开的完成,比较适合测试简短的 Rx 代码。在实际代码中,可以把获取 Scheduler 的函数封装起来,在 debug 版本中使用 TestScheduler ,而在发布版本中使用真实的 Scheduler。

TestSubscriber

上面的测试中,我们手工的收集发射的数据并根据期望的数据去对比,来判断测试是否成功。由于这样的测试很常见,Rx 提供了一个 TestSubscriber 来帮助简化测试过程。 前面的测试代码使用 TestSubscriber 可以变为这样:

@Test
public void test() {
    TestScheduler scheduler = new TestScheduler();
    TestSubscriber<Long> subscriber = new TestSubscriber<>();
    List<Long> expected = Arrays.asList(0L, 1L, 2L, 3L, 4L);
    Observable
        .interval(1, TimeUnit.SECONDS, scheduler)
        .take(5)
        .subscribe(subscriber);
    assertTrue(subscriber.getOnNextEvents().isEmpty());
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    subscriber.assertReceivedOnNext(expected);
}

TestSubscriber 不仅仅只收集数据,还有如下一些函数:

java.lang.Thread getLastSeenThread()
java.util.List<Notification<T>> getOnCompletedEvents()
java.util.List<java.lang.Throwable> getOnErrorEvents()
java.util.List<T> getOnNextEvents()

有两点需要额外注意:

  1. getLastSeenThread 函数。 TestSubscriber 会检查在那个线程执行回调函数,并记录最后一个线程。如果你想测试回调函数是否发生在 GUI 线程,则可以使用这个函数。
  2. 有趣的是 getOnCompletedEvents 可以返回多个结束事件。这是违反 Rx 约定的情况,可以通过测试来检查。

TestSubscriber 还提供了一些常见的判断函数:

void assertNoErrors()
void assertReceivedOnNext(java.util.List<T> items)
void assertTerminalEvent()
void assertUnsubscribed()

另外还可以阻塞直到特定的事件发生:

void awaitTerminalEvent()
void awaitTerminalEvent(long timeout, java.util.concurrent.TimeUnit unit)
void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, java.util.concurrent.TimeUnit unit)

指定时间可能会导致超时的异常(没有在规定的时间内完成)。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=979

时间: 2024-10-11 07:10:49

RxJava 并发 之测试的相关文章

JMeter并发性测试

JMeter并发性测试 一.JMeter简介 Apache JMeter是Apache组织开发的基于Java的压力测试工具.用于对软件做压力测试,它最初被设计用于Web应用测试但后来扩展到其他测试领域. 它可以用于测试静态和动态资源例如静态文件.Java小服务程序.CGI脚本.Java 对象.数据库, FTP服务器, 等等.JMeter 可以用于对服务器.网络或对象模拟巨大的负载,来在不同压力类别下测试它们的强度和分析整体性能.另外,JMeter能够对应用程序做功能/回归测试,通过创建带有断言的

jmeter多用户并发压力测试(导入文件)

jmeter多用户并发压力测试可用CSV Data Set Config (添加--配置元件--CSV Data Set Config) Filename:  文件的位置(如果是同目录下csv文件,可不填写路径,写名称即可) Variable Names: 变量名称.多个变量用 ,  或者  ; 隔开 Recycle on EOF:文件结束后是否要循环 Stop thread on EOF(文件结束是否中止线程) File Encoding: 默认为ANSI Allow Quoated data

C程序片段并发性测试以及前趋图的自动生成

VX2012平台.C++完成主程序,JAVA实现界面. 所谓程序并发性是指在计算机系统中同时存在有多个程序,宏观上看,这些程序是同时向前推进的.在单CPU环境下,这些并发执行的程序是交替在CPU上运行的.分析程序之间的的可并发性,并利用伯恩斯坦条件来判定各程序之间能否并发执行,同时通过词法语法分析自动生成程序之间的前趋图(DAG图),对我们分析程序很有好处. //by hfut yzk #include"stdafx.h" #include<iostream> #inclu

Apache压力(并发)测试工具ab的使用教程收集

说明:用ab的好处,在处理多并发的情况下不用自己写线程模拟. 官网: http://httpd.apache.org/(Apache服务器) http://httpd.apache.org/docs/2.0/programs/(Apache工具文档大全) http://httpd.apache.org/docs/2.0/programs/ab.html(文档教程) http://httpd.apache.org/docs/current/programs/ab.html(文档教程) 教程: 要对

使用ab 进行并发压力测试

ab全称为:apache bench. 是apache自带的压力测试工具.ab非常实用,它不仅可以对apache服务器进行网站访问压力测试,也可以对或其它类型的服务器进行压力测试.比如nginx.tomcat.IIS等. 安装ab命令: ubuntu: sudo apt-get install apache2-utils centos: yum install apr-util # 安装依赖 yum-utils中的yumdownload 工具 # 如果没有找到 yumdownload 命令可以

TestNG并发兼容性测试

Web测试项目中经常进行浏览器兼容性相关的测试工作,因为兼容性测试的工作重复性相当高,所以导致手工测试效率低下测试人员积极性降低.TestNG提供了并发执行测试用例的功能,可以让测试用例以并发的形式执行,实现测试不同浏览器的兼容性测试. 下面代码中分别使用Chrome.Firefox和IE浏览器,TestNG以并发方式去百度搜索"selenium"然后校验搜索结果. 测试类中代码: package com.selenium; import org.openqa.selenium.By;

高并发写测试悲观锁,乐观锁

源码地址 有纰漏,错误,欢迎指正,谢谢 JMeter测试工具 需要创建一个心的工程: 添加一个线程组-这里面设置秒级并发数: 添加一个请求-这里设置压力测试的接口:参数使用:${ }可以从csv文件中获取数据 请求头管理-添加需要修改的请求头信息: CSV文件-可以将请求的参数,以变量的形式,从csv文件中获取,模拟多种请求数据: 具体怎么用,百度 测试场景 测试场景为:请求对应商品,减少对应库存: 并发量:秒级1w请求: 不加锁 Controller就不写了,但是要测试,需要通过Request

Apache如何进行模拟高并发压力测试?

利用apache,自带的ab.exe软件就可以,在cmd中,首先输入ab.exe物理地址, 然后-c 高并发数量 -n访问请求次数 请求的页面的地址, 如-c 10 -n 20,就是-c10相当于10个客户端,每个访问页面两次,总共就是-n20次

CVR并发写入测试

测试环境: 主机:DELL Optiplex 790 系统:Windows7SP1 64bit CPU:i5-2500 3.3GB 4核 内存:8GB 硬盘:希捷 7200rpm 1TB(通电次数668,已使用5914小时) =========================================================== 写入时间(不含等待时间) 写入时间(含等待时间):