RxJava 2 0中backpressure 背压 概念的理解

英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure

Backpressure(背压、反压力)

在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。

举个例子,使用zip操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍。一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Observable发送消息的时候取出来并将他们结合在一起。这样做就使得rxjava变得笨重而且十分占用系统资源。

在rxjava中有多重控制流以及背压(backpressure)策略用来应对当一个快速发送消息的被观察者遇到一个处理消息缓慢的观察者。下面的解释将会向你展示你应当怎么设计属于你自己的被观察者和操作符去应对流量控制(flow control)。

Hot and cold Observables, and multicasted Observables

Observable 数据流有两种类型:hot 和 cold。这两种类型有很大的不同。本节介绍他们的区别,以及作为 Rx 开发者应该如何正确的使用他们。

Cold observables

只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。

我们经常用到的Observable.create 就是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。

Hot observables

Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是 鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。

了解更多Hot and cold Observables,参考:

http://blog.csdn.net/jdsjlzx/article/details/51839090

当一个cold observable是multicast(多路广播)(当转换完成时或者方法被调用)的时候,为了应对背压,应当把cold observable转换成hot observable。

cold observable 相当于响应式拉(就是observer处理完了一个事件就从observable拉取下一个事件),hot observable通常不能很好的处理响应式拉模型,但它却是处理流量控制问题的不二候选人,例如使用onBackpressureBuffer或者onBackpressureDrop 操作符,和其他操作符比如operators, throttling, buffers, or windows.

此段过于抽象,特提供原文如下,如有好的翻译建议请提出。

Cold Observables are ideal for the reactive pull model of backpressure described below. Hot Observables typically do not cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the onBackpressureBuffer or onBackpressureDrop operators, throttling, buffers, or windows.

能避免背压问题的运算符

防止过度创建observable的第一道防线就是使用普通数组去减少observable发送消息的数量,在这一节会使用一些操作符去应对突发的observable发送爆发性数据(一会没有,一会很多)就像下面的这张图片所示:

这些操作符可以通过微调参数确保slow-consuming观察者不被生产可观测的。

Throttling节流

操作符中比如 sample(?) 、 throttleLast(?)、 throttleFirst(?)、 throttleWithTimeout(?) 、 debounce(?) 允许你通过调节速率来改变Observable发射消息的速度。

以下图表展示如何使用这些操作符。

样本 (或 throttleLast)

sample 操作符定期收集observable发送的数据items,并发射出最后一个数据item。

Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
  • 1

上面代码解释,定期且一次收集5个item,发射出最后一个item。

官网解释:http://reactivex.io/documentation/operators/sample.html

throttleFirst

跟sample有点类似,但是并不是把观测到的最后一个item发送出去,而是把该时间段第一个item发送出去。

Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
  • 1

debounce (or throttleWithTimeout)

debounce操作符会只发送两个在规定间隔内的时间发送的序列的最后一个。

Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
  • 1

Buffers and windows 缓冲区和窗口

可以使用操作符比如buffer(?) 或者window(?) 收集过度生成消息的Observable的数据items,然后发射出较少使用的数据。缓慢的消费者可以决定是否处理每个集合中的某一个特定的项目,或处理集合中的某种组合,或为集合中的每一项预定计划工作,这都要视情况处理。

以下图表展示如何使用这些操作符。

buffer

你可以定期关闭并释放突发性的 Observable 缓冲区。

Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
  • 1

在突发期间你可以得到的想要的,并在缓冲区收集数据和最终在突发结束的时候释放缓存。使用debounce操作符释放缓存并关闭指示器buffer操作符。

此段超过本人翻译水平,特提供原文如下,如有好的翻译建议请提出。

Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the debounce operator to emit a buffer closing indicator to the buffer operator:

使用线程阻塞

处理过快生产item的其他策略就是使用线程阻塞,但是这么做违背了响应式设计和非阻塞模型设计,但是它的确是一个可行的选择。在rxJava中并没有操作符可以做到这一点。

如果observable发送消息,subscriber消耗消息都是在同一个线程这将很好的处理这个问题,但是你要知道,在rxJava中,很多时候生产者和消费者都不在同一个线程。

如何建立“响应式拉动(reactive pull)”backpressure

当subscribe订阅observable的时候可以通过调用subscribe.request(n),n是你想要的observable发送出来的量。

当在onNext()方法里处理完数据itme后,你能重新调用 request()方法,通知Observable发射数据items。下面是个例子。

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

你可以通过一个神奇数字request, request(Long.MAX_VALUE),禁用反应拉背力和要求Observable按照自己的步伐发射数据。request(0)是一个合法的调用,但没有奏效。请求值小于零的请求会导致抛出一个异常。

Reactive pull backpressure isn’t magic

backpressure 不会使得过度生产的observable的问题消失,这只是提供了一种更好的解决问题的方法。 让我们更仔细的研究刚刚说到的zip操作符的问题。

这里有两个observable,a和b,b发射item比a更加的频繁,当你想zip这两个observable的时候,你需要把a发送出来的第n个和b发送出来的第n个对象处理,然而由于b发送出来的速率更快,这时候b已经发送出了n+1~n+m个消息了,这时候你要想要把a的n+1~n+m个消息结合的话,就必须持有b已经发送出来的n+1~n+m消息,同时,这意味着缓存的数量在不断的增长。

当然你可以给b添加操作符throttling,但是这意味着你将丢失某些从b发送出来的项,你真正想要做的其实就是告诉b:“b你需要慢下来,但是你要保持你给我的数据是完整的”。

响应式拉(reective pull)模型可以当你做到这一点,subscriber从observable那里拉取数据,这比较通常在observable那里推送数据这种模式形成鲜明的对比。

在rxJava中,zip操作符正是使用了这种技巧。它给每个源observable维护了一个小的缓存池,当它的缓存池满了以后,它将不会从源observable那里拉取item。每当zip发送一个item的时候,他从它的缓存池里面移除相应的项,并从源observable那里拉取下一个项。

在rxJava中,很多操作符都使用了这种模式(响应式拉),但是有的操作符并没有使用这种模式,因为他们也许执行的操作跟源observable处于相同的进程。在这种情况下,由于消耗事件会阻塞本进程,所以这一项的工作完成后,才有机会收到下一项。还有另外一种情况,backpressure也是不适合的,因为他们有指定的其他方式去处理流量控制,这些特殊的情况在rxJava的java文档里面都会有详细说明为毛。

但是,observable a和b必须正确的响应request()方法,如果一个observable还没有被支持响应式拉(并不是每个observable都会支持),你可以采取以下其中一种操作都可以达到backpressure的行为:

onBackpressurebuffer

给observable发送出来的数据持有一个缓存,当request方法被调用的时候,给下层流发送一个item。

这个操作符还有一个实验性的版本允许去设置这个缓存池的大小,但当缓存池满了以后将会终止执行并抛出异常。

onBackpressureDrop

命令observable丢弃后来的事件,直到subscriber再次调用request(n)方法的时候,就发送给它的subscriber调用时间以后的n个事件。

onBackpressureBlock (实验性的, not in RxJava 1.0)

源Observable的线程操作直到Subscriber发出请求,然后只要有挂起的请求就结束线程。

如果你不允许这些操作符操作不支持背压的Observable,或者Subscriber或一些操作符尝试申请活性拉反压力,你会遇到一个MissingBackpressureException,你将被告知通过onError()进行回调。


Flowable与Observable

最后,为了大家更好的理解backpressure概念,这里补充说一下Flowable。

Observable在RxJava2.0中新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。

举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

而在2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题。所谓背压,即生产者的速度大于消费者的速度带来的问题,比如在Android中常见的点击事件,点击过快则经常会造成点击两次的效果。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。

再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow

原文地址:https://www.cnblogs.com/skiwnchhw/p/10349260.html

时间: 2024-08-27 17:42:50

RxJava 2 0中backpressure 背压 概念的理解的相关文章

iOS项目中工作空间Workspaces概念的理解

我在之前的一篇博客中讨论过Eclipse和Android Studio的工作空间问题,并做了一定的区别.其实只要理解并会使用前两者,Xcode中的工作空间也就不难理解了.我们通过实际的操作步骤来理解Xcode的工作空间. (1)在Xcode中点击:File-->New-->Workspace.   在Save as中输入工作空间的名称.并选择存放的目录: . (2)去刚才的目录下查看,发现多了一个MyApplication.xcworkspace文件.该文件就是工作空间.先关闭Xcode,双击

面向对象中的一些概念的理解

今天回顾的内容,可能相对于前面的内容来说在理解上需要花费一点功夫,但面向对象也是属于打基础的部分,学好虽然不能够加分,但是没有掌握到这方面的知识,在后续的基础学习中,会有一些问题.那,我们开始回顾今天的内容吧!! 1. 构造方法 特点:与类同名而没有返回值类型 作用:创建对象 package cn.tedu.object; public class PersonDemo { public static void main(String[] args) { //构造方法 --- 创建对象的 //调

关于数字信号处理中的一些概念的理解

1,卷积:卷积的时域解释可类比为摔跤后疼痛感的持续,不同时刻的输入x(m)都对输出有影响,影响的大小取决于m时刻后的影响因子h(n-m),则此时(n时刻)的输出受m时刻的影响为x(m)*h(n-m),再考虑其他时刻的影响,则卷积公式得出. 从频域理解的话就是系统输出的傅里叶变换=输入的傅里叶变换*频率响应因子. 2,傅里叶变换:个人理解所谓的傅里叶变换就是通过数学上的累加将时间因子消去只留下频率因子的结果. 3,数字频率,模拟频率,采样频率的关系:w=2pi*f/fs;其中w是数字频率,f是模拟

OSPF部分概念个人理解

以下为个人在学习OSPF过程中对这些概念的理解,有不同理解大家互相沟通交流! 1.邻居关系和邻接关系有什么区别? 邻接关系一定是邻居关系,但邻居关系不一定是邻接关系. BMA环境下(在以太网环境.同一个广播域内),两个DRother路由器之间的状态即为邻居状态(two-way) DR.BDR之间:DR和BDR与DRother之间建立的均为邻接关系(Full). 2.OSPF支持的网络类型有哪些? P2P(点到点网络):eg:PPP.HDLC P2MP(非广播点到多点网络):eg:不完全连接的帧中

RxJava 2.0还没熟悉,RxJava 3.0说来就来了!(基础篇)

前言 每个Android开发者,都是爱RxJava的,简洁线程切换和多网络请求合并,再配合Retrofit,简直是APP开发的福音.不知不觉,RxJava一路走来,已经更新到第三大版本了.不像RxJava 2对RxJava 1那么残忍,RxJava 3对RxJava 2的兼容性还是挺好的,目前并没有做出很大的更改.RxJava2到2020年12月31号不再提供支持,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加. 同时,希望通过本文,能知道垃圾箱颜色分类. 作为尝鲜,赶紧品尝吧.

Android RxJava使用介绍(一)概念

RxJava到底是什么?使用RxJava到底有什么好处呢?其实RxJava是ReactiveX中使用Java语言实现的版本,目前ReactiveX已经实现的语言版本有: Java: RxJava JavaScript: RxJS C#: Rx.NET C#(Unity): UniRx Scala: RxScala Clojure: RxClojure C++: RxCpp Ruby: Rx.rb Python: RxPY Groovy: RxGroovy JRuby:RxJRuby Kotlin

Apache Storm 1.1.0 中文文档 | ApacheCN

前言  Apache Storm 是一个免费的,开源的,分布式的实时计算系统. 官方文档: http://storm.apache.org 中文文档: http://storm.apachecn.org ApacheCN 最近组织了翻译 Storm 1.1.0 中文文档 的活动,整体 翻译进度 为 96%. 感谢大家参与到该活动中来 感谢无私奉献的 贡献者,才有了这份 Storm 1.1.0 中文文档 感谢一路有你的陪伴,我们才可以做的更好,走的更快,走的更远,我们一直在努力 ... 网页地址:

在ASP.NET Core 2.0中使用CookieAuthentication

在ASP.NET Core中关于Security有两个容易混淆的概念一个是Authentication(认证),一个是Authorization(授权).而前者是确定用户是谁的过程,后者是围绕着他们允许做什么,今天的主题就是关于在ASP.NET Core 2.0中如何使用CookieAuthentication认证. 在ASP.NET Core 2.0中使用CookieAuthentication跟在1.0中有些不同,需要在ConfigureServices和Configure中分别设置,前者我

说说ASP.Net Core 2.0中的Razor Page

随着.net core2.0的发布,我们可以创建2.0的web应用了.2.0中新东西的出现,会让我们忘记老的东西,他就是Razor Page.下面的这篇博客将会介绍ASP.Net Core 2.0中的Razor Page. 在ASP.Net Core 2.0新特点之一就是支持Razor Page.今天的Razor Page是ASP.Net Core MVC中的一个子集.ASP.Net Core MVC 支持Razor Page意味着Razor Page应用从技术上来说就是MVC应用,同时Razo