Flink如何应对背压问题

经常有人会问Flink如何处理背压问题。其实,答案很简单:Flink没用使用任何通用方案来解决这个问题,因为那根本不需要那样的方案。它利用自身作为一个纯数据流引擎的优势来优雅地响应背压问题。这篇文章,我们将介绍背压问题,然后我们将深挖Flink的运行时如何在task之间传输数据缓冲区内的数据以及流数据如何自然地两端降速来应对背压,最终将以一个小示例来演示它。

什么是背压

像Flink这样的流处理系统需要能够优雅地应对背压问题。背压通常产生于这样一种场景:当一个系统接收数据的速率高于它在一个瞬时脉冲内能处理的数据。许多日常问题都会导致背压。例如,垃圾回收卡顿可能会导致流入的数据快速堆积,或者一个数据源可能生产数据的速度过快。背压如果不能得到正确地处理,可能会导致资源被耗尽或者甚至出现更糟的情况导致数据丢失。

让我们来看一个简单的例子。假设存在一个数据流pipeline作为source,一个流处理job,以及一个sink以每秒500万条记录的速度处理数据,整个流处理程序处于稳定的状态。如下图所示(一个黑色的条状代表1百万个记录,该图是系统中其中1秒的快照):

在同一时间点,不管是流处理job还是sink,如果有1秒的卡顿,那么将导致至少500万条记录的积压。换句话说,source可能会产生一个脉冲,显示在一秒内数据的生产速度突然翻倍。

我们如何来应对类似这样的场景呢?当然,其中一种方案是删除这些元素。但数据丢失对许多流处理程序而言是不可接受的!这些应用要求exactly once的一致性。另一种方案是数据放在某个缓冲区内。缓冲区也需要被持久化,因为在失败的情况下,这些数据需要被重放 以防止数据丢失。理想情况下,这些数据应该被缓冲到某个持久化的channel里(例如,如果source本身提供持久化保证的情况下,可以是该source本身 – Apache Kafka是一个很不错的选择)。而理想的应对措施是:背压从sink到source的整个pipeline,同时对source进行限流来适配整个pipeline中最慢组件的速度,从而获得稳定状态:

Flink中的背压

Flink运行时的构造部件是operators以及streams。每一个operator消费一个中间/过渡状态的流,对它们进行转换,然后生产一个新的流。描述这种机制最好的类比是:Flink使用有效的分布式阻塞队列来作为有界的缓冲区。如同Java里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。

以下面这个示例(两个task组成的一个简单的flow)来看Flink如何应对背压:

1、记录“A”进入Flink,然后被Task 1处理

2、记录被序列化进缓冲区

3、缓冲区内的数据被移动到Task 2,task 2会从缓冲区内读取记录

这里有一个重要的事实:为了记录能被Flink处理,缓冲区必须是可用的

在Flink中这些分布式的队列被认为是逻辑流,而它们的有界容量可以通过每一个生产、消费流管理的缓冲池获得。缓冲池是缓冲区的集合,它们都可以在被消费完之后循环利用。这个观点很好理解:你从池里获取一个缓冲区,填进数据,然后在数据被消费后,将该缓冲区返还回缓冲池,之后你还可以再次使用它。

这些缓冲池的大小在运行时能动态变化。在不同的发送者/接收者存在不同的处理速度的情况下,网络栈里的内存缓冲区的数量(等于队列的容量)决定了系统能够提供的缓冲区的数量。Flink保证总是有足够的缓冲区提供给应用程序,但处理的速度是由用户的程序以及可用内存的数量决定的。内存越多,意味着系统可以轻松应对一定的瞬时背压(short periods,short GC)。越少的内存意味着需要对背压进行更多的“即时”响应(意思是,如果内存少缓冲区就容易被填满,那么需要立即作出响应,消费走数据才能应对这个问题)。

回到上面那个简单的示例:Task 1在其输出端被分配了一个缓冲池,Task 2在其输入端也有一个。如果当前有一个缓冲区可供序列化的“A”使用,我们就序列化它然后分配该缓冲区。

我们来看两种场景:

  • 本地传输:如果task1和task2都运行在同一个工作节点(TaskManager),缓冲区可以被直接共享给下一个task,一旦task 2消费了数据它会被回收。如果task 2比task 1慢,buffer会以比task 1填充的速度更慢的速度进行回收从而迫使task 1降速。
  • 远程传输:如果task 1和task 2运行在不同的工作节点上。一旦缓冲区内的数据被发送出去(TCP Channel),它就会被回收。在接收端,数据被拷贝到输入缓冲池的缓冲区中,如果没有缓冲区可用,从TCP连接中的数据读取动作将会被中断。输出端通常以watermark机制来保证不会有太多的数据在传输途中。如果有足够的数据已经进入可发送状态,会等到情况稳定到阈值以下才会进行发送。这可以保证没有太多的数据在路上。如果新的数据在消费端没有被消费(因为没有可用的缓冲区),这种情况会降低发送者发送数据的速度。

这个在固定大小的缓冲池之间的流示例,保证了Flink健壮的背压机制,从而使得task生产数据的速度跟消费的速度对等。

我们描述的这个方案可以从两个task之间的数据传输自然地扩展到更复杂的pipeline中,并保证背压在整个pipeline上扩散。

让我们来看一个简单的实验,它展示了Flink遇到背压问题后的表现。我们运行一个简单的生产者-消费者流拓扑,主要的功能是在本地的task之间传输数据,我们在task生产记录时改变它的速度。就本次测试而言,我们使用比默认配置更少的内存来使得背压问题得到凸显。我们为每个task配备两个大小为4096B(byte)的缓冲区。在通常的Flink部署场景中,task的缓冲区数量会比这更多,容量也会更大。另外,这个测试运行在单一的JVM中,但使用了完整的Flink功能栈。

下面这张图显示了:随着时间的改变,生产者(黄色线)和消费者(绿色线)基于所达到的最大吞吐(在单一JVM中每秒达到8百万条记录)的平均吞吐百分比。我们通过衡量task每5秒钟处理的记录数来衡量平均吞吐。

首先,我们运行生产者task到它最大生产速度的60%(我们通过Thread.sleep()来模拟降速)。消费者以同样的速度处理数据。然后,我们将消费task的速度降至其最高速度的30%。你就会看到背压问题产生了,正如我们所见,生产者的速度也自然降至其最高速度的30%。接着,我们对消费者停止人为降速,之后生产者和消费者task都达到了其最大的吞吐。接下来,我们再次将消费者的速度降至30%,pipeline给出了立即响应:生产者的速度也被自动降至30%。最后,我们再次停止限速,两个task也再次恢复100%的速度。这所有的迹象表明:生产者和消费者在pipeline中的处理都在跟随彼此的吞吐而进行适当的调整,这就是我们在流pipeline中描述的行为。

总结

Flink与持久化的source(例如kafka),能够为你提供即时的背压处理,而无需担心数据丢失。Flink不需要一个特殊的机制来处理背压,因为Flink中的数据传输相当于已经提供了应对背压的机制。因此,Flink所获得的最大吞吐量由其pipeline中最慢的部件决定。

本文翻译自:http://data-artisans.com/how-flink-handles-backpressure/



时间: 2024-10-20 05:38:34

Flink如何应对背压问题的相关文章

RxJava 2 0中backpressure 背压 概念的理解

英文原文:https://github.com/ReactiveX/RxJava/wiki/Backpressure Backpressure(背压.反压力) 在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息.那么随之而来的就是如何处理这些未处理的消息. 举个例子,使用zip操作符将两个无限大的Observable压缩在一起,其中一个被观察者发送消息的速度是另一个的两倍.一个比较不靠谱的做法就是把发送比较快的消息缓存起来,当比较慢的Obs

Flink Program Guide (9) -- StateBackend : Fault Tolerance(Basic API Concepts -- For Java)

State Backends 本文翻译自文档Streaming Guide / Fault Tolerance / StateBackend ----------------------------------------------------------------------------------------- 使用Data Stream API编写的程序通常以多种形式维护状态: ·  窗口将收集element或在它被触发后聚合element ·  Transformation方法可能会

Apache Flink fault tolerance源码剖析完结篇

这篇文章是对Flinkfault tolerance的一个总结.虽然还有些细节没有涉及到,但是基本的实现要点在这个系列中都已提及. 回顾这个系列,每篇文章都至少涉及一个知识点.我们来挨个总结一下. 恢复机制实现 Flink中通常需要进行状态恢复的对象是operator以及function.它们通过不同的方式来达到状态快照以及状态恢复的能力.其中function通过实现Checkpointed的接口,而operator通过实现StreamOpeator接口.这两个接口的行为是类似的. 当然对于数据

Flink内存管理源码解读之基础数据结构

概述 在分布式实时计算领域,如何让框架/引擎足够高效地在内存中存取.处理海量数据是一个非常棘手的问题.在应对这一问题上Flink无疑是做得非常杰出的,Flink的自主内存管理设计也许比它自身的知名度更高一些.正好最近在研读Flink的源码,所以开两篇文章来谈谈Flink的内存管理设计. Flink的内存管理的亮点体现在作为以Java为主的(部分功能用Scala实现,也是一种遵循JVM规范并依赖JVM解释执行的函数式编程语言)的程序却自主实现内存的管理而不完全依赖于JVM的内存管理机制.它的优势在

Flink内存管理源码解读之内存管理器

回顾 上一篇文章我们谈了Flink自主内存管理的一些基础的数据结构.那篇中主要讲了数据结构的定义,这篇我们来看看那些数据结构的使用,以及内存的管理设计. 概述 这篇文章我们主要探讨Flink的内存管理类MemoryManager涉及到对内存的分配.回收,以及针对预分配内存而提供的memory segment pool.还有支持跨越多个memory segment数据访问的page view. 本文探讨的类主要位于pageckage : org.apache.flink.runtime.memor

Flink中task之间的数据交换机制

Flink中的数据交换构建在如下两条设计原则之上: 数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce. 数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的.这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输. 数据传输包含多个对象,它们是: JobManager master节点,用于响应任务调度.恢复.协作,以及通过ExecutionGraph数据结

Flink中的Time与Window

一.Time 在Flink的流式处理中,会涉及到时间的不同概念 Event Time:是事件创建的时间.它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳 Ingestion Time:是数据进入Flink的时间 Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time. 例如一条日志进入Flink的时间为2017-11-12 10:00:0

Flink系列之流式

本文仅是自己看书.学习过程中的个人总结,刚接触流式,视野面比较窄,不喜勿喷,欢迎评论交流. 1.为什么是流式? 为什么是流式而不是流式系统这样的词语?流式系统在我的印象中是相对批处理系统而言的,用来处理流数据,实现数据处理功能的一个系统,而流式一词提醒我要以数据产生的方式去看待数据和以及处理过程,即在现实生活中,数据是以流的形式不断产生的,处理的过程应贴近数据产生的方式. 2.流与批 在处理数据时,对数据而言有:无界和有界之分.无界可以理解为不知道数据产生的停止时间,在数学上可以用前闭后开( [

Flink中的状态与容错

1.概述 Flink支持有状态计算,根据支持得不同状态类型,分别有Keyed State和Operator State.针对状态数据得持久化,Flink提供了Checkpoint机制处理:针对状态数据,Flink提供了不同的状态管理器来管理状态数据,如MemoryStateBackend. 上面Flink的文章中,有引用word count的例子,但是都没有包含状态管理.也就是说,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算. 从容错和消息处理的语义