RxJava 并发之数据流发射太快如何办

Backpressure

Rx 中的数据流是从一个地方发射到另外一个地方。每个地方处理数据的速度是不一样的。如果生产者发射数据的速度比消费者处理的快会出现什么情况?在同步操作中,这不是个问题,例如:

// Produce
Observable<Integer> producer = Observable.create(o -> {
    o.onNext(1);
    o.onNext(2);
    o.onCompleted();
});
// Consume
producer.subscribe(i -> {
    try {
        Thread.sleep(1000);
        System.out.println(i);
    } catch (Exception e) { }
});

虽然上面的消费者处理数据的速度慢,但是由于是同步调用的,所以当 o.onNext(1) 执行后,一直阻塞到消费者处理完才执行 o.onNext(2)。

但是生产者和消费者异步处理的情况很常见。如果是在异步的情况下会出现什么情况呢?

在传统的 pull 模型中,当消费者请求数据的时候,如果生产者比较慢,则消费者会阻塞等待。如果生产者比较快,则生产者会等待消费者处理完后再生产新的数据。

而 Rx 为 push 模型。 在 Rx 中,只要生产者数据好了就发射出去了。如果生产者比较慢,则消费者就会等待新的数据到来。如果生产者快,则就会有很多数据发射给消费者,而不管消费者当前有没有能力处理数据。这样会导致一个问题,例如:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果:

0
1
rx.exceptions.MissingBackpressureException

上面的 MissingBackpressureException 告诉我们,生产者太快了,我们的操作函数无法处理这种情况。

消费者的补救措施

有些操作函数可以减少发送给消费者的数据。

过滤数据

sample 操作函数可以指定生产者发射数据的最大速度,多余的数据被丢弃了。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .sample(100, TimeUnit.MILLISECONDS)
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果:

82
182
283
...

throttle 和 Debounce 也能实现类似的效果。

Collect

如果你不想丢弃数据,则当消费者忙的时候可以使用 buffer 和 window 操作函数来收集数据。如果批量处理数据速度比较快,则可以使用这种方式。

Observable.interval(10, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .buffer(100, TimeUnit.MILLISECONDS)
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果:

[0, 1, 2, 3, 4, 5, 6, 7]
[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
...

Reactive pull

上面的方式有时候可以解决问题,但是并不是 Rx 中最好的处理方式。有时候在 生产者这里处理可能是最好的情况。Backpressure 是一种用来在生产者端降低发射速度的方式。

RxJava 实现了一种通过 Subscriber 来通知 Observable 发射数据的方式。Subscriber 有个函数 request(n),调用该函数用来通知 Observable 现在 Subscriber 准备接受下面 n 个数据了。在 Subscriber 的 onStart 函数里面调用 request 函数则就开启了reactive pull backpressure。这并不是传统的 pull 模型,并不会阻塞调用。只是 Subscriber 通知 Observable 当前 Subscriber 的处理能力。 通过调用 request 可以发射更多的数据。

class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
        ...
    }

    @Override
    public void onError(Throwable e) {
        ...
    }

    @Override
    public void onNext(T n) {
        ...
        request(1);
    }
}

在 onStart 函数中调用 request(1) 开启了 backpressure 模式,告诉 Observable 一次只发射一个数据。在 onNext 里面处理完该数据后,可以请求下一个数据。通过 quest(Long.MAX_VALUE) 可以取消 backpressure 模式。

doOnRequested

在副作用一节讨论 doOn_ 函数的时候,我们没有讨论 doOnRequested 这个函数:

public final Observable<T> doOnRequest(Action1<java.lang.Long> onRequest)

当 Subscriber 请求更多的时候的时候, doOnRequest 就会被调用。参数中的值为请求的数量。

当前 doOnRequest 还是一个 beta 测试版本的 api。 所以在开发过程中尽量避免使用。下面来演示一下这个 api:

Observable.range(0, 3)
    .doOnRequest(i -> System.out.println("Requested " + i))
    .subscribe(System.out::println);

结果:

Requested 9223372036854775807
0
1
2

可以看到 subscriber 在开始的时候,请求了最大数量的数据。这意味着没有使用 backpressure 模型。只有当一个 Subscriber 实现了 backpressure 的时候,Subscribe 才能使用该功能。下面是一个在外部实现 控制backpressure 的示例:

public class ControlledPullSubscriber<T> extends Subscriber<T> {

    private final Action1<T> onNextAction;
    private final Action1<Throwable> onErrorAction;
    private final Action0 onCompletedAction;

    public ControlledPullSubscriber(
            Action1<T> onNextAction,
            Action1<Throwable> onErrorAction,
            Action0 onCompletedAction) {
        this.onNextAction = onNextAction;
        this.onErrorAction = onErrorAction;
        this.onCompletedAction = onCompletedAction;
    }

    public ControlledPullSubscriber(
            Action1<T> onNextAction,
            Action1<Throwable> onErrorAction) {
        this(onNextAction, onErrorAction, () -> {});
    }

    public ControlledPullSubscriber(Action1<T> onNextAction) {
        this(onNextAction, e -> {}, () -> {});
    }

    @Override
    public void onStart() {
      request(0);
    }

    @Override
    public void onCompleted() {
        onCompletedAction.call();
    }

    @Override
    public void onError(Throwable e) {
        onErrorAction.call(e);
    }

    @Override
    public void onNext(T t) {
        onNextAction.call(t);
    }

    public void requestMore(int n) {
        request(n);
    }
}

上面的实现中,如果不主动调用 requestMore 函数,则 Observable 是不会发射数据的。

ControlledPullSubscriber<Integer> puller =
        new ControlledPullSubscriber<Integer>(System.out::println);

Observable.range(0, 3)
    .doOnRequest(i -> System.out.println("Requested " + i))
    .subscribe(puller);

puller.requestMore(2);
puller.requestMore(1);

结果:

Requested 0
Requested 2
0
1
Requested 1
2

ControlledPullSubscriber 在onStart 中告诉 Observable 先不要发射数据。然后我们分别请求 2个数据和1 个数据。

Rx 操作函数内部使用队列和缓冲来实现 backpressure ,从而避免保存无限量的数据。大量数据的缓冲应该使用专门的操作函数来处理,例如:cache、buffer 等。 zip 函数就是一个示例,第一个 Observable 可能在第二个 Observable 发射数据之前就发射了一个或者多个数据。所以 zip 需要一个较小的缓冲来匹配两个 Observable,从而避免操作失败。因此, zip 内部使用了一个 128 个数据的小缓冲。

Observable.range(0, 300)
    .doOnRequest(i -> System.out.println("Requested " + i))
    .zipWith(
            Observable.range(10, 300),
            (i1, i2) -> i1 + " - " + i2)
    .take(300)
    .subscribe();

结果:

Requested 128
Requested 90
Requested 90
Requested 90

zip 操作函数一开始请求足够(128)的数据来填充缓冲并处理这些数据。这里 zip 操作函数具体缓冲的数据并不是主要的。读者应该记住,在 Rx 中不管开发者有没有主动启用该功能,有些操作函数内部会使用该功能。这样可以保证 Rx 数据流更加稳定可扩展。

Backpressure 策略

很多 Rx 操作函数内部都使用了 backpressure 从而避免过多的数据填满内部的队列。这样处理慢的消费者就会把这种情况传递给前面的消费者,前面的消费者开始缓冲数据直到他也缓存满为止再告诉他前面的消费者。Backpressure 并没有消除这种情况。只是让错误延迟发生,我们还是需要处理这种情况。

Rx 中有操作函数可以用来处理这种消费者处理不过来的情况。

onBackpressureBuffer

onBackpressureBuffer 会缓存所有当前无法消费的数据,直到 Observer 可以处理为止。

你可以指定缓冲的数量,如果缓冲满了则会导致数据流失败。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println
    );

结果:

0
1
2
3
4
5
6
7
8
9
10
11
rx.exceptions.MissingBackpressureException: Overflowed buffer of 1000

上面的示例,生产者比消费者快 100 倍。使用 1000个缓冲来处理这种消费者比较慢的情况。当消费者消费 11个数据的时候,缓冲区满了,生产者生产了 1100个数据。数据流就抛出异常了。

onBackpressureDrop

如果消费者无法处理数据,则 onBackpressureDrop 就把该数据丢弃了。

Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

结果:

0
1
2
...
126
127
12861
12862
...

这个示例中,前面 128 个数据正常的被处理的,这是应为 observeOn 在切换线程的时候, 使用了一个 128 个数据的小缓冲。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=981

时间: 2024-07-29 07:58:44

RxJava 并发之数据流发射太快如何办的相关文章

冥想:节奏太快,给心灵一点歇息的时间

http://www.nowamagic.net/librarys/eight/posts/2461开始冥想是半年前,因为一个朋友很相信这方面东西,受他影响,后来慢慢尝试,感觉效果非常的好,喜欢上了冥想,也感受它带来的好的影响.之前到现在都没有看过任何过于冥想的书,所以只能凭自己半年来的经验小分享下... 为什么要冥想呢?首先,我觉得冥想带来的好处是,让自己的生活节奏慢下来,缓解生活的压力,多一些思考的时间.现在人的生活节奏太快,不停的往上赶,没有有效放松和释放压力的方式,累了想到的放松方式就是

在嵌入式学习过程中的困惑——有人说软件技术变化太快,现在学的东西过两年就要完全淘汰,是吗?

有人说软件技术变化太快,现在学的东西过两年就要完全淘汰,是吗? 你知道这话是什么人说的吗?必然是已经被淘汰的人说的.比如Delphi.BCB.PB这些开发工具和语言,都曾经很是风光了一阵,但现在已经完全被Java和.NET取代了.那么Java和.NET会不会被取代呢,也许在相当长的时间内还不会. 这些被淘汰的程序员有一些共同的特点:只会用鼠标拖拽控件,离开IDE就不知道如何工作,学点儿花拳绣腿的功夫就想吃一辈子,对学习新技术不感兴趣,做一天和尚撞一天钟.现在请回头看看1,在整个软件技术领域,这些

玩法变化太快?今天我来告诉你App推广怎么玩

无论你是刚刚进入移动互联网推广的小菜鸟,还是已经在这个行业摸爬滚打数年的老兵.无论哪个阶段你都会碰到各种各样的棘手的问题.从一开始的APP怎么做分发?到最后的一个月上千万的预算怎么去花.这些问题无时无刻的都困扰着移动互联网市场从业者的脑边.更重要的是App推广的玩法变得太快,从一开始的应用商店到积分墙再到现在的信息流广告,短短一两年你会发现以前的经验现在已经用不上了."不进则退"是移动互联网推广从业者必须深刻认识到的一个问题. APP推广到底是干什么的?是应用商店的渠道推广?还是媒介广

RxJava 并发之线程调度

由于 Rx 目标是用在异步系统上并且 Rx 支持多线程处理,所以很多 Rx 开发者认为默认情况下 Rx 就是多线程的. 其实实际情况不是这样的,Rx 默认是单线程的. 除非你明确的指定线程,否则所有 onNext/onError/onCompleted 以及各个操作函数的调用都是在同一个线程中完成的.例如下面的示例: final BehaviorSubject<Integer> subject = BehaviorSubject.create(); subject.subscribe(i -&

RxJava 并发之意外情况处理

Rx 尽量避免状态泄露到数据流之外的场景.但是有些东西本身就带有状态.比如服务器可以上线和离线.手机可以访问Wifi.按钮被按下了等.在 Rx 中国,我们在一段时间内看到这些事件,并称之为窗口(window).其他事件在这个窗口内发生可能需要特殊处理.例如,手机在使用移动收费上网的时候,会把网络请求优先级降低,来避免天价流量费的情况. 注意:上面的一段话估计是翻译的,有点语句不通.更多参考官网: http://www.introtorx.com/Content/v1.0.10621.0/17_S

互联网Facebook使用人数增长太快

——中国仍被本土社交网络主导,Google+是所有海外社交网络中排名第一的服务,Facebook位居第二. ——固然Tumblr和Pinterest好像实现了最大增长,但在使用频率上好像并没有那么高. ——跟着我们不断通过移动设备使用Facebook和Twitter等服务,这两款产品的移动广告营收都已经超过桌面广告,而移动设备也成为社交网络领域的重要动力.但总体而言,其使用量尚未超过台式机和笔记本.尽管在大屏幕上使用社交网络的比例正在收缩,但仍有六成用户通过台式机或笔记本使用这类服务,移动平台的

ReactNative如何在JS中引用原生自定义控件(rn变化太快,网上很多教程有坑,这个我研究后可用,特意分享)

直接写一个Demo例子,有相关功底的肯定明白,会对特别的地方进行提醒,本文基于https://blog.csdn.net/lintcgirl/article/details/53489490,但是按此链接文章不可用. 首先是JAVA部分: 1 import com.facebook.react.ReactActivity; 2 3 public class MainActivity extends ReactActivity { 4 5 /** 6 * Returns the name of t

发现一个国内牛逼的maven仓库,速度真的太快了

前天网上下了一个项目,在公司还好,网络比较流畅,很快就把依赖下好了:回家的时候,想耍耍,结果下了一天也没把依赖下好,速度是几k每秒,甚至一k每秒,哎~心都碎了,网上一搜,结果发现了一个惊天的用nexus搭建的maven私服,阿里云的,那下载速度真是杠杠的: 配置很简单,修改下conf下的settings.xml文件,添加如下镜像配置: <mirrors> <mirror> <id>alimaven</id> <name>aliyun maven&

c# Random太快产生的随机数会重复

c# Random快速连续产生相同随机数的解决方案 Random类是一个产生伪随机数字的类,它的构造函数有两种,一个是直接New Random(),另外一个是New Random(Int32),前者是根据触发那刻的系统时间做为种子,来产生一个随机数字,后者可以自己设定触发的种子,一般都是用UnCheck((Int)DateTime.Now.Ticks)做为参数种子,因此如果计算机运行速度很快,如果触发Randm函数间隔时间很短,就有可能造成产生一样的随机数,因为伪随机的数字,在Random的内部