RxJava 并发之意外情况处理

Rx 尽量避免状态泄露到数据流之外的场景。但是有些东西本身就带有状态。比如服务器可以上线和离线、手机可以访问Wifi、按钮被按下了等。在 Rx 中国,我们在一段时间内看到这些事件,并称之为窗口(window)。其他事件在这个窗口内发生可能需要特殊处理。例如,手机在使用移动收费上网的时候,会把网络请求优先级降低,来避免天价流量费的情况。

注意:上面的一段话估计是翻译的,有点语句不通。更多参考官网:

http://www.introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#SequencesOfCoincidence

窗口 Window

buffer 函数可以缓存多个数据并整体发射。 window 操作函数和 buffer 有一对一的关系。区别在于 window 不会整体返回缓存的数据。而是把缓存的数据当做一个新的 Observable 数据流来返回。这样当源 Observable 有数据发射了,这个数据就立刻发射到 window 返回的 Observable 里面了。

下图可以看到二者的区别:

window :

如果你还没有了解 buffer, 建议你到前面的章节下去看看 buffer。 buffer 和 window 的函数形式是一样的,功能也非常类似,并且易于理解。 buffer 都可以用 window 来实现其功能:

source.buffer(...)
// 和下面的是一样的功能
source.window(...).flatMap(w -> w.toList())

Window by count

窗口内可以限定数目。当窗口发射的数据达到了限定的数目,当前窗口的 Observable 就结束并开启一个新的窗口。

和buffer 一样, 使用 window(int count, int skip) 也可以跳过数据或者重叠使用数据。

Observable
    .merge(
        Observable.range(0, 5)
            .window(3,1))
    .subscribe(System.out::println);

结果:

0
1
1
2
2
2
3
3
3
4
4
4

可以看到当有数据重叠的时候, 多个 Observable 会返回同样的数据,可以把结果输出形式修改一下,方便查看:

Observable.range(0, 5)
    .window(3, 1)
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);

结果:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

这样就可以看到 window 和 buffer 是非常类似的了。

Window by time

同样也可以指定窗口的时间长度:

public final Observable<Observable<T>> window(long timespan, long timeshift, java.util.concurrent.TimeUnit unit)
Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .window(250, 100, TimeUnit.MILLISECONDS)
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);

结果:

[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

上面的示例中,每隔100ms开始一个新的窗口,每个窗口持续 250ms。 第一个窗口从 0ms 开始并捕获到数据 [0, 1](0 是在第100ms的时候发射的)。

Window with signal

同样也可以用另外一个信号 Observable当做窗口结束的信号。

信号 Observable 直接也可以相互传递事件。

下面是使用信号 Observable 实现的重叠窗口:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .window(
        Observable.interval(100, TimeUnit.MILLISECONDS),
        o -> Observable.timer(250, TimeUnit.MILLISECONDS))
    .flatMap(o -> o.toList())
    .subscribe(System.out::println);

结果:

[1, 2]
[2, 3]
[3, 4]
[4]
[]

注意上面的数字 0 没有捕获到,原因在于源 Observable 和 信号 Observable 都是在同一时刻发生的,但是在实际操作中并没有这种情况。所以当信号 Observable发射的时候, 数字 0 已经发射出去了。

Join

join 可以把两个数据流中的数据组合一起。 zip 函数根据数据发射的顺序来组合数据。 join 可以根据时间来组合。

public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join(
    Observable<TRight> right,
    Func1<T,Observable<TLeftDuration>> leftDurationSelector,
    Func1<TRight,Observable<TRightDuration>> rightDurationSelector,
    Func2<T,TRight,R> resultSelector)

join 组合的两个 Observable 被称之为 左和右。 上面的函数并不是静态的,调用该函数的 Observable就是 左 。参数中的 leftDurationSelector 和 rightDurationSelector 分别使用 左右发射的数据为参数,然后返回一个定义时间间隔的 Observable,和 window 的最后一个重载函数类似。这个时间间隔是用来选择里面发射的数据并组合一起。里面的数据会当做参数调用 resultSelector ,然后返回一个组合后的数据。然后组合后的数据由 join 返回的 Observable 发射出去。

join 比较难以理解以及强大之处就是如果选择组合的数据。当有数据在 源 Observable 中发射,就开始一个该数据的时间窗口。对应的时间间隔用来计时该数据的窗口何时结束。在时间窗口还没结束的时候,另外一个 Observable 发射的数据就和当前的数据组合一起。左右数据流的处理方式是一样的,所以为了简化介绍,我们假定只有一个 源 Observable 有时间窗口。

下面的示例中, 左Observable 数据流从来不结束而右Observable的时间窗口为 0.

Observable<String> left =
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i);
Observable<String> right =
        Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i);

left
    .join(
        right,
        i -> Observable.never(),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l,r) -> l + " - " + r
    )
    .take(10)
    .subscribe(System.out::println);

结果:

L0 - R0
L1 - R0
L0 - R1
L1 - R1
L2 - R1
L3 - R1
L0 - R2
L1 - R2
L2 - R2
L3 - R2

由于左边的 Observable 时间窗口是永久,这意味着左边每个发射的数据都会和右边的 数据组合。 当右边数据发射的比左边的慢一倍。所以当左边的数据发射了两个对应右边的同一个数据。然后右边发射下一个数据就开启了右边的一个新的时间窗口,然后左右的数据会从开始的数据和右边的新窗口中的数据组合。

下面示例把左右源 Observable 发射的间隔都设置为 100ms,然后把左时间窗口设置为 150ms:

Observable<String> left =
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i);
Observable<String> right =
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i);

left
    .join(
        right,
        i -> Observable.timer(150, TimeUnit.MILLISECONDS),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l,r) -> l + " - " + r
    )
    .take(10)
    .subscribe(System.out::println);

结果:

L0 - R0
L0 - R1
L1 - R1
L1 - R2
L2 - R2
L2 - R3
L3 - R3
L3 - R4
L4 - R4
L4 - R5

左右同时发射数据,所以左右同时开始第一个时间窗口,然后组合的数据为 “L0 – R0”。然后 左边的时间窗口继续,而右边发射新的数据 R1 则右边的数据R1和左边的 L0 组合 “L0 – R1”,然后过了 50ms 后, 左边的时间窗口结束了,开启下一个时间窗口,结果为 “L1 – R1”。 一直重复下去。

两个数据流都有时间窗口。每个数据流中的每个值按照如下方式组合:

  • 如果旧的数据时间窗口还没有结束,则和另外一个数据流中的每个旧的数据组合
  • 如果当前数据的时间窗口还没有结束,则和另外一个数据流中的每个新的数据组合。

groupJoin

只要检测到一个组合数据,join 就用两个数据调用 resultSelector 并发射返回的数据。 而 groupJoin 又有不同的功能:

public final <T2,D1,D2,R> Observable<R> groupJoin(
    Observable<T2> right,
    Func1<? super T,? extends Observable<D1>> leftDuration,
    Func1<? super T2,? extends Observable<D2>> rightDuration,
    Func2<? super T,? super Observable<T2>,? extends R> resultSelector)

除了 resultSelector 以外,其他参数和 join 函数的参数是一样的。这个 resultSelector 从左边的数据流中获取一个数据并从右边数据流中获取一个 Observable。这个 Observable 会发射和左边数据配对的所有数据。groupJoin 中的配对和 join 一样是对称的,但是结果是不一样的。可以把 resultSelect 实现为一个 GroupedObservable, 左边的数据当做 key,而把右边的数据发射出去。

还使用第一个 join的示例,左边的数据流的时间窗口重来不关闭:

Observable<String> left =
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(i -> "L" + i)
            .take(6);
Observable<String> right =
        Observable.interval(200, TimeUnit.MILLISECONDS)
            .map(i -> "R" + i)
            .take(3);

left
    .groupJoin(
        right,
        i -> Observable.never(),
        i -> Observable.timer(0, TimeUnit.MILLISECONDS),
        (l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list))
    )
    .subscribe();

结果:

L0: [R0, R1, R2]
L1: [R0, R1, R2]
L2: [R1, R2]
L3: [R1, R2]
L4: [R2]
L5: [R2]

上面的 示例和 jion 中的示例数据配对是一样的,只是 resultSelector 不一样导致输出的结果不一样。

使用 groupJoin 和 flatMap 可以实现 jion的 功能:

.join(
    right,
    leftDuration
    rightDuration,
    (l,r) -> joinResultSelector(l,r)
)
// 和下面的一样
.groupJoin(
    right,
    leftDuration
    rightDuration,
    (l, rs) -> rs.map(r -> joinResultSelector(l,r))
)
.flatMap(i -> i)

通过 join 和 groupBy 也可以实现 groupJoin。在示例代码中有这个实现,感兴趣的可以去看看。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=980

时间: 2024-11-05 22:52:38

RxJava 并发之意外情况处理的相关文章

Matlab 读取excel文件提示服务器出现意外情况或无法读取问题解决

1.问题描述: 该错误通常发生在应用函数读取excel文件(后缀xls或xlsx)时. 应用xlsread函数读取提示服务器出现意外情况: 应用importdata读取时提示can‘t open file: 其他格式如txt文档则可以正常读取. 2.问题解决 该问题与Matlab无关,为Excel端出现了问题.解决方案如下: 步骤:Excel选项——加载项——Com加载项 :去掉VisualStudion相关勾选. 去掉勾选的时候还可能出现“无法更改 中注册的office 加载项的连接状态”的提

如何写一个正经的音乐播放器 四 意外情况

四,意外情况的控制. 在音频播放时候,容易遇到一些意外情况,这时候,我们就要处理这些意外情况,这时候,我们需要针对不同的意外情况进行处理.大概可以分成两种情况. 1,失去audio_focus的控制. 造成我们的播放器失去焦点的情况很多,主要是其他声音请求了焦点,例如说,其他音乐播放器开始播放音乐,突然来电,短息等. 以上的焦点失去,都可以用AudioManager.OnAudioFocusChangeListener中的回调来处理.先取得AudioManager AudioManager au

字符串作为函数模版实参的意外情况

有时,当把c风格的不同字符串去实例化函数模版的同一个模版参数时,在实参演绎的过程中经常会发生 意想不到的事情,那就是编译失败,并报错类型不匹配. 正如下面的例子一样: #include<iostream> using namespace std; /* *匹配测试 */ template<typename T> int ref_fun(T & t1,T & t2) { return strlen(t1) - strlen(t2); } template<typ

数组作为方法参数时的一些意外情况

数组作为方法参数时的一些意外情况 今天在刷题时,使用数组作为方法的参数,出现了一些意外情况. 1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 6 namespace MyArray 7 { 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 int[] nums = { 1, 2,

字符串作为函数模版实參的意外情况

有时,当把c风格的不同字符串去实例化函数模版的同一个模版參数时,在实參演绎的过程中常常会发生 意想不到的事情,那就是编译失败,并报错类型不匹配. 正如以下的样例一样: #include<iostream> using namespace std; /* *匹配測试 */ template<typename T> int ref_fun(T & t1,T & t2) { return strlen(t1) - strlen(t2); } template<typ

TCP三次握手、四次挥手出现意外情况时,为保证稳定,是如何处理的?

一. 序当我们聊到 TCP 协议的时候,聊的最多的就是三次握手与四次挥手.但是大部分资料和文章,写的都是正常的情况下的流程.但是你有没有想过,三次握手或者四次挥手时,如果发生异常了,是如何处理的?又是由谁来处理? TCP 作为一个靠谱的协议,在传输数据的前后,需要在双端之间建立连接,并在双端各自维护连接的状态.TCP 并没有什么特别之处,在面对多变的网络情况,也只能通过不断的重传和各种算法来保证可靠性. 建立连接前,TCP 会通过三次握手来保证双端状态正确,然后就可以正常传输数据了.当数据传输完

RxJava 并发之线程调度

由于 Rx 目标是用在异步系统上并且 Rx 支持多线程处理,所以很多 Rx 开发者认为默认情况下 Rx 就是多线程的. 其实实际情况不是这样的,Rx 默认是单线程的. 除非你明确的指定线程,否则所有 onNext/onError/onCompleted 以及各个操作函数的调用都是在同一个线程中完成的.例如下面的示例: final BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.subscribe(i -&

RxJava 并发之数据流发射太快如何办

Backpressure Rx 中的数据流是从一个地方发射到另外一个地方.每个地方处理数据的速度是不一样的.如果生产者发射数据的速度比消费者处理的快会出现什么情况?在同步操作中,这不是个问题,例如: // Produce Observable<Integer> producer = Observable.create(o -> { o.onNext(1); o.onNext(2); o.onCompleted(); }); // Consume producer.subscribe(i

军规3 关注多任务和意外情况处理

在购物的APP中填写信息,比如说收货地址的时候,忘记了具体地址,然后切换出去到"印象比较"之类的记录APP中查找地址,复制下来,再切换回购物的APP时会发现,刚才填写的信息都不见了,还得手动再输一遍,这样就会觉得APP的功能和体验很差. 这其实就是没有处理好多任务时APP的表现. 在使用智能机的时候,经常会同时运行多个程序,这就要求测试人员考虑到APP被别的程序或者用户切换到后台,需要进行什么操作. 3.1 第一个场景 典型场景就是使用APP过程中接听一个来电,APP该如何处理. 不同