Flink中的状态与容错

1.概述

Flink支持有状态计算,根据支持得不同状态类型,分别有Keyed State和Operator State。针对状态数据得持久化,Flink提供了Checkpoint机制处理;针对状态数据,Flink提供了不同的状态管理器来管理状态数据,如MemoryStateBackend。

上面Flink的文章中,有引用word count的例子,但是都没有包含状态管理。也就是说,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。

从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。

所谓checkpoint,就是在某一时刻,将所有task的状态做一个快照(snapshot),然后存储到memory/file system/rocksdb等。Flink通过定期地做checkpoint来实现容错和恢复。

2.checkpoint的实现CheckpointedFunction

CheckpointedFunction的描述

/**
 * This is the core interface for <i>stateful transformation functions</i>, meaning functions
 * that maintain state across individual stream records.
 * While more lightweight interfaces exist as shortcuts for various types of state, this interface offer the
 * greatest flexibility in managing both <i>keyed state</i> and <i>operator state</i>.
 *
 * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight
 * ways to setup stateful functions typically used instead of the full fledged
 * abstraction represented by this interface.
 *
 * <h1>Initialization</h1>
 * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when
 * the parallel instance of the transformation function is created during distributed execution.
 * The method gives access to the {@link FunctionInitializationContext} which in turn gives access
 * to the to the {@link OperatorStateStore} and {@link KeyedStateStore}.
 *
 * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access to the data structures
 * in which state should be stored for Flink to transparently manage and checkpoint it, such as
 * {@link org.apache.flink.api.common.state.ValueState} or
 * {@link org.apache.flink.api.common.state.ListState}.
 *
 * <p><b>Note:</b> The {@code KeyedStateStore} can only be used when the transformation supports
 * <i>keyed state</i>, i.e., when it is applied on a keyed stream (after a {@code keyBy(...)}).
 *
 * <h1>Snapshot</h1>
 * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a
 * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically
 * make sure that the checkpointed data structures (obtained in the initialization phase) are up
 * to date for a snapshot to be taken. The given snapshot context gives access to the metadata
 * of the checkpoint.
 *
 * <p>In addition, functions can use this method as a hook to flush/commit/synchronize with
 * external systems.
 *
 * <h1>Example</h1>
 * The code example below illustrates how to use this interface for a function that keeps counts
 * of events per key and per parallel partition (parallel instance of the transformation function
 * during distributed execution).
 * The example also changes of parallelism, which affect the count-per-parallel-partition by
 * adding up the counters of partitions that get merged on scale-down. Note that this is a
 * toy example, but should illustrate the basic skeleton for a stateful function.
 *
 * <p><pre>{@code
 * public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
 *
 *     private ReducingState<Long> countPerKey;
 *     private ListState<Long> countPerPartition;
 *
 *     private long localCount;
 *
 *     public void initializeState(FunctionInitializationContext context) throws Exception {
 *         // get the state data structure for the per-key state
 *         countPerKey = context.getKeyedStateStore().getReducingState(
 *                 new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
 *
 *         // get the state data structure for the per-partition state
 *         countPerPartition = context.getOperatorStateStore().getOperatorState(
 *                 new ListStateDescriptor<>("perPartitionCount", Long.class));
 *
 *         // initialize the "local count variable" based on the operator state
 *         for (Long l : countPerPartition.get()) {
 *             localCount += l;
 *         }
 *     }
 *
 *     public void snapshotState(FunctionSnapshotContext context) throws Exception {
 *         // the keyed state is always up to date anyways
 *         // just bring the per-partition state in shape
 *         countPerPartition.clear();
 *         countPerPartition.add(localCount);
 *     }
 *
 *     public T map(T value) throws Exception {
 *         // update the states
 *         countPerKey.add(1L);
 *         localCount++;
 *
 *         return value;
 *     }
 * }
 * }</pre>
 *
 * <hr>
 *
 * <h1><a name="shortcuts">Shortcuts</a></h1>
 * There are various ways that transformation functions can use state without implementing the
 * full-fledged {@code CheckpointedFunction} interface:
 *
 * <h4>Operator State</h4>
 * Checkpointing some state that is part of the function object itself is possible in a simpler way
 * by directly implementing the {@link ListCheckpointed} interface.
 * That mechanism is similar to the previously used {@link Checkpointed} interface.
 *
 * <h4>Keyed State</h4>
 * Access to keyed state is possible via the {@link RuntimeContext}‘s methods:
 * <pre>{@code
 * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
 *
 *     private ValueState<Long> count;
 *
 *     public void open(Configuration cfg) throws Exception {
 *         count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class));
 *     }
 *
 *     public T map(T value) throws Exception {
 *         Long current = count.get();
 *         count.update(current == null ? 1L : current + 1);
 *
 *         return value;
 *     }
 * }
 * }</pre>
 *
 * @see ListCheckpointed
 * @see RuntimeContext
 */

2.1. 它的snapshotState调用过程如下:

核心类StreamTask

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task‘s operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */

2.2.它的initializeState调用过程如下:

3.checkpoint的状态管理器StateBackend

StateBackend

/**
 * A <b>State Backend</b> defines how the state of a streaming application is stored and
 * checkpointed. Different State Backends store their state in different fashions, and use
 * different data structures to hold the state of a running application.
 *
 * <p>For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend}
 * keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the
 * JobManager. The backend is lightweight and without additional dependencies, but not highly available
 * and supports only small state.
 *
 * <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend}
 * keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem
 * (typically a replicated highly-available filesystem, like <a href="https://hadoop.apache.org/">HDFS</a>,
 * <a href="https://ceph.com/">Ceph</a>, <a href="https://aws.amazon.com/documentation/s3/">S3</a>,
 * <a href="https://cloud.google.com/storage/">GCS</a>, etc).
 *
 * <p>The {@code RocksDBStateBackend} stores working state in <a href="http://rocksdb.org/">RocksDB</a>,
 * and checkpoints the state by default to a filesystem (similar to the {@code FsStateBackend}).
 *
 * <h2>Raw Bytes Storage and Backends</h2>
 *
 * The {@code StateBackend} creates services for <i>raw bytes storage</i> and for <i>keyed state</i>
 * and <i>operator state</i>.
 *
 * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
 * service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager
 * to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state
 * backends to store checkpointed state.
 *
 * <p>The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} created by this state
 * backend define how to hold the working state for keys and operators. They also define how to checkpoint
 * that state, frequently using the raw bytes storage (via the {@code CheckpointStreamFactory}).
 * However, it is also possible that for example a keyed state backend simply implements the bridge to
 * a key/value store, and that it does not need to store anything in the raw byte storage upon a
 * checkpoint.
 *
 * <h2>Serializability</h2>
 *
 * State Backends need to be {@link java.io.Serializable serializable}, because they distributed
 * across parallel processes (for distributed execution) together with the streaming application code.
 *
 * <p>Because of that, {@code StateBackend} implementations (typically subclasses
 * of {@link AbstractStateBackend}) are meant to be like <i>factories</i> that create the proper
 * states stores that provide access to the persistent storage and hold the keyed- and operator
 * state data structures. That way, the State Backend can be very lightweight (contain only
 * configurations) which makes it easier to be serializable.
 *
 * <h2>Thread Safety</h2>
 *
 * State backend implementations have to be thread-safe. Multiple threads may be creating
 * streams and keyed-/operator state backends concurrently.
 */

4.Savepoint

Savepoint是Checkpoint的一种特殊实现,底层也是使用Checkpoint的机制。Savepoint是用户以手工命令的方式触发Checkpoint并将结果持久化到指定的存储里,其主要目的是帮助用户在升级和维护集群过程中保存系统的状态数据,避免因停机或者升级邓正常终止应用的操作而导致系统无法恢复到原有的计算状态,而无法实现Exactly-Once的语义保证。

/**
 * Savepoints are manually-triggered snapshots from which a program can be
 * resumed on submission.
 *
 * <p>In order to allow changes to the savepoint format between Flink versions,
 * we allow different savepoint implementations (see subclasses of this
 * interface).
 *
 * <p>Savepoints are serialized via a {@link SavepointSerializer}.
 */

5.Querable State

Queryable State,顾名思义,就是可查询的状态,表示这个状态,在流计算的过程中就可以被查询,而不像其他流计算框架,需要存储到外部系统中才能被查询。目前可查询的state主要针对partitionable state,如keyed state等。

简单来说,当用户在job中定义了queryable state之后,就可以在外部,通过QueryableStateClient,通过job id, state name, key来查询所对应的状态的实时的值。

5.1 QueryableStateClient

QueryableStateClient

/**
 * Client for querying Flink‘s managed state.
 *
 * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
 * The state instance created from this descriptor will be published for queries when it‘s
 * created on the Task Managers and the location will be reported to the Job Manager.
 *
 * <p>The client connects to a {@code Client Proxy} running on a given Task Manager. The
 * proxy is the entry point of the client to the Flink cluster. It forwards the requests
 * of the client to the Job Manager and the required Task Manager, and forwards the final
 * response back the client.
 *
 * <p>The proxy, initially resolves the location of the requested KvState via the JobManager. Resolved
 * locations are cached. When the server address of the requested KvState instance is determined, the
 * client sends out a request to the server. The returned final answer is then forwarded to the Client.
 */

其查询的实现

    /**
     * Returns a future holding the serialized request result.
     *
     * @param jobId                     JobID of the job the queryable state
     *                                  belongs to
     * @param queryableStateName        Name under which the state is queryable
     * @param keyHashCode               Integer hash code of the key (result of
     *                                  a call to {@link Object#hashCode()}
     * @param serializedKeyAndNamespace Serialized key and namespace to query
     *                                  KvState instance with
     * @return Future holding the serialized result
     */
    private CompletableFuture<KvStateResponse> getKvState(
            final JobID jobId,
            final String queryableStateName,
            final int keyHashCode,
            final byte[] serializedKeyAndNamespace) {
        LOG.debug("Sending State Request to {}.", remoteAddress);
        try {
            KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
            return client.sendRequest(remoteAddress, request);
        } catch (Exception e) {
            LOG.error("Unable to send KVStateRequest: ", e);
            return FutureUtils.getFailedFuture(e);
        }
    }

通过组装request,然后使用client发送请求

5.2 KvStateServer

KvStateServer

/**
 * An interface for the Queryable State Server running on each Task Manager in the cluster.
 * This server is responsible for serving requests coming from the {@link KvStateClientProxy
 * Queryable State Proxy} and requesting <b>locally</b> stored state.
 */

6. 总结

为什么要使用状态?

数据之间有关联,需要通过状态满足业务逻辑

为什么要管理状态?

实时计算作业需要7*24运行,需要应对不可靠因素带来的影响

如何选择状态的类型和存储方式?

分析自己的业务场景,比对各方案的利弊,选择合适的,够用即可

参考资料:

【1】https://yq.aliyun.com/articles/225623#

【2】https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/

【3】https://blog.csdn.net/alexdamiao/article/details/94043468

原文地址:https://www.cnblogs.com/davidwang456/p/11124698.html

时间: 2024-07-31 19:21:47

Flink中的状态与容错的相关文章

Flink 从0到1学习 —— Flink 中如何管理配置?

前言 如果你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据.但是在某些情况下,我们需要将配置数据发送到 Flink 集群并从中接收一些额外的数据. 在本文的第一部分中,我将描述如何将配置数据发送到 Flink 集群.我们需要配置很多东西:方法参数.配置文件.机器学习模型.Flink 提供了几种不同的方法,我们将介绍如何使用它们以及何时使用它们.在本文的第二部分中,我将描述如何从 Flink 集群中获取数据. 如何发送数据给 Ta

Flink中task之间的数据交换机制

Flink中的数据交换构建在如下两条设计原则之上: 数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce. 数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的.这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输. 数据传输包含多个对象,它们是: JobManager master节点,用于响应任务调度.恢复.协作,以及通过ExecutionGraph数据结

Flink中案例学习--State与CheckPoint

一.State 在Flink中,按照基本类型,对State做了以下两类的划分: Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用.我们可以从逻辑上理解这种状态是一个并行度操作实例和一种Key的对应, <parallel-operator-instance, key>.Operator State(或者non-keyed state),它是和Key无关的一种状态类型.相应地我们从逻辑上去理解这个概念,它相当于一个并行度实例,对应一份状态数据

部署OpenStack问题汇总(四)--openstack中nova-compute状态status显示为&#39;XXX&#39;的问题

第一次部署openstack的时候就遇见了这个问题,当时的版本是havana, 现在部署essex的时候又遇到了这个问题,经过一番折腾,解决了这个问题,记录下来,以免以后忘记. =========================================================== 1.查看/var/log/nova/nova-compute.log文件其中出现了这样的情况: Domain not found: no domain with matching name 'insta

Thread类中各种状态的区别以及使用

wait()是等待某个object:1.执行object的synchronized方法:2.执行一段synchronized代码,而且是基于object做的同步:3.如果object是class类,可以执行它的synchronized static方法 notify().notifyAll():能够唤醒等待的线程 interrupt():中断一个线程,用得比较少 join().join(long millis):当这个线程调用了join()方法后它才执行后面的线程,保证了线程之间有顺序的执行.如

SVN 在 Xcode中的状态说明

最近同事总是问我关于SVN状态的问题,‘C’是什么意思啦?‘A’是什么意思啦?等等一系列问题. 为了方便以后查阅,以及新同事的快速融入,特在此记录一下^_^. 当然了大家也可以google一下,一搜一大把. SVN Status 字符含义如下: ' '  - NO Modifications. 没有改动 'A' - Added. 新增加的文件 'C' - Conflicted. 文件内容与更新得到的内容发生了冲突(团队开发时会遇到) 'D' - Deleted. 已经被删除的文件 'I'  -

一篇博客分清shell中的状态返回值-return-break-continue-exit

一篇博客分清shell中的状态返回值-return-break-continue-exit 一.break.continue.exit.return的区别和对比 条件与循环控制及程序返回值命令知识表 命令 说明 break n 如果省略n,则表示跳出整个循环,n表示跳出循环的层数 continue n 如果省略n,则表示跳出本次循环,忽略本次循环剩余代码,进入循环的下一次循环.n表示退到第n层继续循环 exit n 表示退出当前shell程序,n为上一次程序执行的状态返回值,n也可以省略,在下一

程序中保存状态的方式之Cookies

程序中保存状态的方式之 Cookies,之前写过一篇关于ViewState的.现在继续总结Cookies方式的 新建的测试页面login <%@ Page Language="C#" AutoEventWireup="true" CodeFile="Login.aspx.cs" Inherits="Login" %> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML

在C 函数中保存状态:registry、reference和upvalues

在C函数中保存状态:registry.reference和upvalues C函数能够通过堆栈来和Lua交换数据,但有时候C函数须要在函数体的作用域之外保存某些Lua数据.那么我们想到全局变量或static变量,这样做的缺点是:(1)为Lua设计C函数库时,导致不可重入.(2)不是全部的Lua值都能非常好的保存到C变量中.那么可不能够将值保存在Lua全局变量里面呢,能够,Lua就提供了一个独立的被称为registry的表,可是Lua代码本身不能訪问它. 1.registry全局注冊表 解释:一个