Flink - Working with State

All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators.
You can make every transformation (map, filter, etc.) stateful by using Flink’s state interface or by checkpointing instance fields of your function.
You can register any instance field as managed state by implementing an interface.
In this case, and also in the case of using Flink’s native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.

This section discusses how to manage state through Flink’s state interface. For such state, Flink automatically takes periodic snapshots and automatically restores the state after a failure.

 

Using the Key/Value State Interface

The Key/Value state interface provides access to different types of state that are all scoped to the key of the current input element.
This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).

Key/Value state can only be used on a KeyedStream.

Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:

  • ValueState<T>: This keeps a value that can be updated and retrieved (scoped to key of the input element, mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().
  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T), the Iterable can be retrieved using Iterable<T> get().
  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is the same as for ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

All types of state also have a method clear() that clears the state for the currently active key (i.e. the key of the input element).

The three types of state:

ValueState<T>: a single-value state, updated via update(T) and read via T value().

ListState<T>: a multi-value state, written via add(T) and read via Iterable<T> get().

ReducingState<T>: also takes multiple values, but keeps only the result of the reduction.

All state types also provide clear() to remove the state data for the current key.
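As an illustration of ListState (a minimal sketch, not from the original text — the descriptor constructor taking a Class and the null guard on get() are assumptions about this API version), here is a RichFlatMapFunction that buffers two elements per key before emitting them:

public class PairEmitter extends RichFlatMapFunction<Tuple2<Long, String>, String> {

    /** Handle to the per-key list of buffered elements. */
    private transient ListState<String> buffer;

    @Override
    public void open(Configuration config) {
        // unique state name plus the element type
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffer", String.class);
        buffer = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, String> input, Collector<String> out) throws Exception {
        buffer.add(input.f1);

        // copy what is currently buffered for this key (guarding against a
        // null iterable, which a backend might return when nothing is stored)
        List<String> elements = new ArrayList<>();
        Iterable<String> stored = buffer.get();
        if (stored != null) {
            for (String s : stored) {
                elements.add(s);
            }
        }

        // once two elements are buffered for this key, emit them and clear the state
        if (elements.size() >= 2) {
            out.collect(elements.get(0) + " " + elements.get(1));
            buffer.clear();
        }
    }
}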

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside these objects but might reside on disk or somewhere else.
The second thing to keep in mind is that the value you get from the state depends on the key of the input element.
So the value you get in one invocation of your user function can differ from the one you get in another invocation if the key of the element is different.

These state objects are only handles for interfacing with the state; the value you retrieve depends on the key of the input element. So the state value obtained in one invocation of the user function can differ from the one obtained in another invocation, because the elements' keys may differ.

 

To get a state handle you have to create a StateDescriptor. This holds the name of the state (as we will see later, you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create one of ValueStateDescriptor, ListStateDescriptor or ReducingStateDescriptor.

To obtain state you need a StateDescriptor; its name is used to reference the state, and if you define several states their names must be unique. Each type of state has a corresponding type of StateDescriptor.
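For example, a ReducingStateDescriptor also carries the ReduceFunction that folds added values into the aggregate. A minimal sketch (assuming the constructor that takes a name, a ReduceFunction and a Class — the state name "runningSum" is made up for illustration):

// descriptor for a per-key running sum
ReducingStateDescriptor<Long> sumDescriptor =
        new ReducingStateDescriptor<>(
                "runningSum",                       // unique state name
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long a, Long b) {
                        return a + b;
                    }
                },
                Long.class);                        // type of the values

// inside a rich function's open():
// ReducingState<Long> sum = getRuntimeContext().getReducingState(sumDescriptor);
// each subsequent sum.add(v) then folds v into the aggregate via the ReduceFunction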

 

State is accessed using the RuntimeContext, so it is only possible in rich functions.
Please see the documentation on rich functions for more information about that, but we will also see an example shortly.
The RuntimeContext that is available in a RichFunction has these methods for accessing state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)

 

State objects are obtained through the RuntimeContext, with a different accessor for each type of state. The key point: to use state you must use a rich function; an ordinary function has no way to get at it.

This is an example FlatMapFunction that shows how all of the parts fit together:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

 

Checkpointing Instance Fields

Instance fields can be checkpointed by using the Checkpointed interface.

Besides the Key/Value state interface, you can use the Checkpointed interface by implementing snapshotState(…) and restoreState(…).

When the user-defined function implements the Checkpointed interface, the snapshotState(…) and restoreState(…) methods will be executed to draw and restore function state.

In addition to that, user functions can also implement the CheckpointNotifier interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId) method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification.

You can also implement CheckpointNotifier; it is called back when a checkpoint completes, so you can react at that point — but delivery of the notification is not guaranteed.
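As an illustration (a hypothetical sketch, not from the original text — it assumes CheckpointNotifier exposes exactly the notifyCheckpointComplete(long) method described above), a function can record the last completed checkpoint and trigger external side effects, such as acknowledging messages, only once the corresponding state is durable:

public class NotifiedMapper extends RichMapFunction<Long, Long>
        implements CheckpointNotifier {

    // id of the last checkpoint known to be complete; volatile because the
    // notification may arrive on a different thread than map()
    private volatile long lastCompletedCheckpoint = -1L;

    @Override
    public Long map(Long value) {
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // everything up to checkpointId is durable at this point; a real job
        // might acknowledge messages to an external queue here (hypothetical)
        lastCompletedCheckpoint = checkpointId;
    }
}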

public class CountWindowAverage
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
        implements Checkpointed<Tuple2<Long, Long>> {

    private Tuple2<Long, Long> sum = null;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // update the count
        sum.f0 += 1;

        // add the second field of the input value
        sum.f1 += input.f1;

        // if the count reaches 2, emit the average and clear the state
        if (sum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
            sum = Tuple2.of(0L, 0L);
        }
    }

    @Override
    public void open(Configuration config) {
        if (sum == null) {
            // only recreate if null
            // restoreState will be called before open()
            // so this will already set the sum to the restored value
            sum = Tuple2.of(0L, 0L);
        }
    }

    // regularly persists state during normal operation
    @Override
    public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return sum;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Tuple2<Long, Long> state) {
        sum = state;
    }
}

 

Stateful Source Functions

Stateful sources require a bit more care compared to other operators.

In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

For a stateful source, the difference is that updates to the state and emission of output must be done while holding the checkpoint lock to guarantee exactly-once semantics — for example, to avoid the checkpointing thread and the source thread updating the offset at the same time.

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements Checkpointed<Long> {

    /**  current offset for exactly once semantics */
    private long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return offset;
    }

    @Override
    public void restoreState(Long state) {
        offset = state;
    }
}

 

State Checkpoints in Iterative Jobs

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight on the loop edges (and the state changes associated with them) will be lost during a failure.
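In the Java API this corresponds to the overload that also takes the checkpointing mode (a sketch — the availability of this three-argument overload is an assumption about the version at hand):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 1000 ms; the trailing `true` forces checkpointing even
// though the job contains iterations -- records in flight on loop edges
// (and their associated state changes) can still be lost on failure
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE, true);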

 

State Backends

Programs written in the Data Stream API often hold state in various forms:

  • Windows gather elements or aggregates until they are triggered
  • Transformation functions may use the key/value state interface to store values
  • Transformation functions may implement the Checkpointed interface to make their local variables fault tolerant

The main kinds of state: elements gathered inside windows; state created in transformation functions through the key/value state interface; and local variables of transformation functions made fault tolerant through the Checkpointed interface.

 

When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently.

How the state is represented internally, and how and where it is persisted upon checkpoints depends on the chosen State Backend.

The key point: how state is represented internally and where it is persisted depend on the chosen State Backend.

 

Available State Backends

Out of the box, Flink bundles these state backends:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

If nothing else is configured, the system will use the MemoryStateBackend.

There are currently three state backends; the default is the MemoryStateBackend.

 

The MemoryStateBackend

The MemoryStateBackend holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.

Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.

As its name suggests, the MemoryStateBackend stores state on the Java heap. Upon checkpoints, the state backend snapshots the state and ships it inside the checkpoint acknowledgement messages to the JobManager, which keeps it on its heap as well.
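Setting it explicitly looks as follows (a sketch; the no-argument constructor is an assumption — some versions also offer a constructor capping the size of each individual state):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep working state and checkpoint snapshots on the JVM heap; suitable for
// local development and for jobs that hold very little state
env.setStateBackend(new MemoryStateBackend());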

 

The FsStateBackend

The FsStateBackend is configured with a file system URL (type, address, path), such as for example “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.

The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory.

Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).

State snapshots are stored in the file system, while the JobManager's memory holds only minimal metadata.

 

The RocksDBStateBackend

The RocksDBStateBackend keeps in-flight state in an embedded RocksDB database on the TaskManager's local disk instead of in memory; upon checkpointing, the RocksDB database is snapshotted into a configured file system and directory, just as with the FsStateBackend.

NOTE: To use the RocksDBStateBackend you also have to add the correct maven dependency to your project:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

The backend is currently not part of the binary distribution. See the Flink documentation for an explanation of how to include it for cluster execution.
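Once the dependency is on the classpath, selecting the backend looks like the other backends (a sketch; the single-URI constructor is an assumption, and in some versions it declares a checked IOException):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// in-flight state lives in RocksDB on the TaskManager's local disk;
// checkpoints of the database are written to the given file system URI
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));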

 

Configuring a State Backend

State backends can be configured per job. In addition, you can define a default state backend to be used when the job does not explicitly define a state backend.

Setting the Per-job State Backend

The per-job state backend is set on the StreamExecutionEnvironment of the job, as shown in the example below:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

 

Setting Default State Backend

A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.

Possible values for the config entry are jobmanager (MemoryStateBackend), filesystem (FsStateBackend), or the fully qualified class name of the class that implements the state backend factory FsStateBackendFactory.

In the case where the default state backend is set to filesystem, the entry state.backend.fs.checkpointdir defines the directory where the checkpoint data will be stored.

A sample section in the configuration file could look as follows:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem

# Directory for storing checkpoints

state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints