Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java)

false

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.0pt;
font-family:"Times New Roman",serif;}

容错(Fault Tolerance)

本文翻译自StreamGuide的Fault Tolerance

----------------------------------------------------------

Flink的容错机制会在错误出现时恢复程序并继续执行,这些容错机制包括设备硬件失效、网络失效、临时程序失效等等。

一、流容错

Flink使用检查点机制来在流Job失效后对其进行恢复。该检查点机制需要一个可以再次请求前面数据的persistent(或durable)的数据源(Apache Kafka便是如此一个数据源的示例)。

检查点机制将数据源和数据sink中的进展、窗口状态以及用户定义的状态(见于Working with state)一致地(consistently)存储起来以提供exactly once的处理语义。有关检查点存储位置(如JobManager、文件系统、数据库)依赖配置的state backend

文档streaming fault tolerance详细描述了Flink流容错机制中的技术。

我们可以通过StreamExecutionEnvironment调用enableCheckPointing(n)方法来启用检查点机制,其中参数n为检查点间隔,以毫秒计。

有关检查点机制的其他参数包括:

·       重试次数:setNumberOfExecutionRetries()方法定义了在失效后job会重新启动多少次。在检查点机制已启用但该值没有设置时,job通常会无限次重启。

·       恰好执行一次 VS. 至少执行一次:你可以向enableCheckPointing(n)方法传递一个mode,该mode包括两个保证级别。其中恰好执行一次适用于绝大多数应用,而至少执行一次则可能更适合一些对要求执行时间极短的应用(持续要求几毫秒)。

·       并行检查点数量:默认地,系统不会再一个检查点正在进行时触发另一个检查点,这保证整个执行拓扑不会花太多时间在检查点上而导致流数据处理停滞。Flink允许多个重叠检查点的情况存在,这对与在有一定延迟的流水线并行情景中(例如由于外部调用服务需要时间响应而导致延迟),仍然想要非常频繁地运行检查点来在失效后仅需要很少量重运行的需求十分有用。

·       检查点超时:定义一个超时时间,如果运行中的检查点到该事件点仍未完成,则将它中止。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are
discarded

env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same
time

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

1.1 数据源和Sink的容错保证

Flink仅在数据源参加了快照(snapshotting)机制时才可以保证在更新用户定义状态恰好执行一次。下表列出了对应绑定的connector的Flink更新状态的保证级别。有关各个connector的容错保证级别的细节,请见每个connnector的文档。


Source


Guarantees


Notes


Apache Kafka


exactly once


根据你使用的版本,选择合适的Kafka的connector


AWS Kinesis Streams


exactly once


RabbitMQ


at most once (v
0.10) / exactly once (v 1.0)


Twitter Streaming
API


at most once


Collections


exactly once


Files


exactly once


Sockets


at most once

为了保证端到端的恰好执行一次的数据传递(以及恰好执行一次的状态语义),数据sink需要参与检查点机制。下表Flink与绑定的sink的传递保证(假设是恰好执行一次状态更新):


Sink


Guarantees


Notes


HDFS rolling sink


exactly once


其实现依赖于Hadoop版本


Elasticsearch


at least once


Kafka producer


at least once


Cassandra sink


at least once / exactly
once


仅对于幂等的(idempotent)更新是恰好执行一次的保证


AWS Kinesis Streams


at least once


File sinks


at least once


Socket sinks


at least once


Standard output


at least once


Redis sink


at least once

二、重启策略

Flink支持不同的重启策略,它们控制着job在失效情况下如何重启。集群可以用一个默认重启策略来启动,该策略总是在没有job的重启策略定义时使用。当一个拥有重启策略的job提交之后,该策略将会重写集群的默认设置。

默认地重启策略是通过Flink的配置文件flink-conf.yaml设置的。配置参数restart-strategy定义了启用什么策略。在每次默认情景下,会使用不重启的策略。有关改配置支持什么值,请见下面的可用重启策略表格。

每个重启策略都自带它们的参数集合来控制它们的行为。这些值同样在配置文件中有所设置。每个重启策略的描述包含了更多有关对应配置值的信息。


重启策略


restart-strategy的值


Fixed delay


fixed-delay


Failure rate


failure-rate


No restart


none

除了定义一个默认的重启策略,我们也可以为每个Flink的job定义各自的重启策略。重启测略可以通过在ExecutionEnvironment中调用setRestartStrategy方法来设置。注意,该方法同样适用于StreamExecutionEnvironment

下例中展示了我们如何为我们的job的重启策略设置一个固定延迟,在该例中,失效发生时系统将尝试将job重启3次,并且每次重启尝试的间隔为10秒。

ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,
// number of restart attempts
  Time.of(10,
TimeUnit.SECONDS) //
delay

  ));

2.1 固定延迟重启策略

固定延迟重启策略会以一个给定的次数尝试重启job。如果超过了最大重试次数,该job将判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定的时间。

在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

restart-strategy : fixed-delay


配置参数


描述


默认值


restart-strategy.fixed-delay.attempts


Number
of restart attempts


1


restart-strategy.fixed-delay.delay


Delay
between two consecutive restart attempts


akka.ask.timeout

restart-strategy.fixed-delay.attempts:
3

restart-strategy.fixed-delay.delay:
10s

固定延迟策略同样可以使用代码设置:

ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,
// number of restart attempts
  Time.of(10,
TimeUnit.SECONDS) //
delay

  ));

2.1.1 重启尝试(Restart
Attemps)

Flink认为job失败的重启次数是可以通过restart-strategy.fixed-delay.attempts配置的,默认值为1

2.1.2 重试延迟(Retry
Delays)

重试的执行可以配置为有延迟的。延迟重试意味着在一次执行失败后,重新执行不会立即启动,而是要等待一个延迟后再启动。

延迟重试对于程序与外部系统交互有一定帮助,例如连接或者待定的会话需要在到达超时时间之后才可以尝试重新执行。

该值的默认值为akka.ask.timeout

2.2 失败比率重启策略

失败比率重启策略会在job失败后重启它,但是当failure
rate(即平均每秒的失败次数)超出后,job将被判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定时间。

在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

Restart-strategy: failure-rate


配置参数


描述


默认值


restart-strategy.failure-rate.max-failures-per-interval


在判定一个job彻底失败前的给定时间内最大重启次数


1


restart-strategy.failure-rate.failure-rate-interval


测量failure rate的时间区间长度


1 minute


restart-strategy.failure-rate.delay


两次重启尝试之间等待的时间


akka.ask.timeout

restart-strategy.failure-rate.max-failures-per-interval:
3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

失败比率重启策略同样可以通过代码设置:

ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,
// max failures per interval
  Time.of(5,
TimeUnit.MINUTES), //time
interval for measuring failure rate

  Time.of(10,
TimeUnit.SECONDS) //
delay

  ));

2.3 不重启策略

在该策略中,job失效将直接判定为最终失效,不会尝试重启。

restart-strategy: none

不重启策略同样可以通过代码设置:

ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

时间: 2024-08-02 06:41:04

Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java)的相关文章

Flink Program Guide (2) -- DataStream API编程指导 -- For Java

v\:* {behavior:url(#default#VML);} o\:* {behavior:url(#default#VML);} w\:* {behavior:url(#default#VML);} .shape {behavior:url(#default#VML);} 张安 张安 2 1 2016-08-02T10:56:00Z 2016-08-02T10:56:00Z 1 2945 16790 139 39 19696 16.00 false false false false

Flink Program Guide (10) -- Savepoints (DataStream API编程指导 -- For Java)

Savepoint 本文翻译自文档Streaming Guide / Savepoints ------------------------------------------------------------- 使用DataStream API编写的程序可以从一个savepoint处恢复执行.savepoint可以同时更新你的程序和Flink集群而不丢失任何状态.该文档包括了从触发.存储以及销毁(dispose)savepoint的所有内容.有关Flink如何处理状态和失效的详细内容,请见文

Flink Program Guide (3) -- Event Time (DataStream API编程指导 -- For Java)

Event Time 本文翻译自DataStream API Docs v1.2的Event Time ------------------------------------------------------- 一.事件时间 / 处理时间 / 提取时间 Flink支持流程序不同的time概念. ·        Processing time:处理时间指执行对应Operation的设备的系统时间. 当一个流程序以处理时间运行,所有基于时间的operation(如time窗口)将使用运行对应O

Flink Program Guide (6) -- 窗口 (DataStream API编程指导 -- For Java)

窗口(Window) 本文翻译自文档Windows ----------------------------------- Flink使用窗口的概念,根据element的时间戳或者其他指标,将可能无限的DataStream分割为有限的数据切片(slice).我们在处理无限数据流以及进行聚合element的transformation时需要此种窗口分割. 注意:我们在此文档中讨论的大多是keyed windowing,即window是应用在KeyedStream上的.关键字下的窗口具有一定的优势,

Flink Program Guide (5) -- 预定义的Timestamp Extractor / Watermark Emitter (DataStream API编程指导 -- For Java)

本文翻译自Pre-defined Timestamp Extractors / Watermark Emitter ------------------------------------------------------------------------------------------ 正如timestamps and watermark handling中所述,Flink提供了抽象类来让开发者赋值自己的时间戳并发送他们自己的Watermark.更具体来说,开发者需要依照不同用例情况来

Flink Program Guide (9) -- StateBackend : Fault Tolerance(Basic API Concepts -- For Java)

State Backends 本文翻译自文档Streaming Guide / Fault Tolerance / StateBackend ----------------------------------------------------------------------------------------- 使用Data Stream API编写的程序通常以多种形式维护状态: ·  窗口将收集element或在它被触发后聚合element ·  Transformation方法可能会

Apache Flink fault tolerance源码剖析(一)

因某些童鞋的建议,从这篇文章开始结合源码谈谈Flink Fault Tolerance相关的话题.上篇官方介绍的翻译是理解这个话题的前提,所以如果你想更深入得了解Flink Fault Tolerance的机制,推荐先读一下前篇文章理解它的实现原理.当然原理归原理,原理体现在代码实现里并不是想象中的那么直观.这里的源码剖析也是我学习以及理解的过程. 作为源码解析Flink Fault Tolerance的首篇文章,我们先暂且不谈太有深度的东西,先来了解一下:Flink哪里涉及到检查点/快照机制来

Flink DataStream API Programming Guide

Example Program The following program is a complete, working example of streaming window word count application, that counts the words coming from a web socket in 5 second windows.   public class WindowWordCount { public static void main(String[] arg

Apache Flink fault tolerance源码剖析(四)

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器). 这篇文章会谈论一种特殊的检查点,Flink将之命名为--Savepoint(保存点). 因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现.但作为一个特性,值得花费一个篇幅来介绍. 检查点VS保存点 使用数据流API编写的程序可以从保存点来恢复执行.保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态. 保存点是人工触发