Flink - state

 

public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

Flink divides state into three kinds.

As the class above shows, StreamTaskState wraps all three: operator state, function state, and key/value state.

1. KVState

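This is the most basic kind of state.

It is modeled as a pair of abstractions, KvState and KvStateSnapshot.

The two interfaces convert into each other: KvState.snapshot() produces a KvStateSnapshot, and KvStateSnapshot.restoreState() produces a KvState.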

/**
 * Key/Value state implementation for user-defined state. The state is backed by a state
 * backend, which typically follows one of the following patterns: Either the state is stored
 * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
 * state backend into some store (during checkpoints), or the key/value state is in fact backed
 * by an external key/value store as the state backend, and checkpoints merely record the
 * metadata of what is considered part of the checkpoint.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State} this {@code KvState} holds.
 * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
 * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
 */
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {

    /**
     * Sets the current key, which will be used when using the state access methods.
     *
     * @param key The key.
     */
    void setCurrentKey(K key);

    /**
     * Sets the current namespace, which will be used when using the state access methods.
     *
     * @param namespace The namespace.
     */
    void setCurrentNamespace(N namespace);

    /**
     * Creates a snapshot of this state.
     *
     * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
     * @param timestamp The timestamp of the checkpoint.
     * @return A snapshot handle for this key/value state.
     *
     * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
     *                   can react to failed snapshots.
     */
    KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;

    /**
     * Disposes the key/value state, releasing all occupied resources.
     */
    void dispose();
}

The definition is fairly simple. The key method is snapshot, which produces a KvStateSnapshot.

public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        extends StateObject {

    /**
     * Loads the key/value state back from this snapshot.
     *
     * @param stateBackend The state backend that created this snapshot and can restore the key/value state
     *                     from this snapshot.
     * @param keySerializer The serializer for the keys.
     * @param classLoader The class loader for user-defined types.
     *
     * @return An instance of the key/value state loaded from this snapshot.
     *
     * @throws Exception Exceptions can occur during the state loading and are forwarded.
     */
    KvState<K, N, S, SD, Backend> restoreState(
        Backend stateBackend,
        TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception;
}

KvStateSnapshot is the counterpart of KvState; its key method is restoreState.
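The round trip between the two abstractions can be illustrated with a minimal sketch. The types below (SimpleKvState, SimpleKvSnapshot, MapKvState, KvSnapshotSketch) are hypothetical stand-ins written for this illustration, not Flink's interfaces: they drop the namespace, descriptor, and backend type parameters, and the snapshot simply copies a map rather than going through a state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for KvState (no namespace, descriptor, or backend).
interface SimpleKvState<K, V> {
    void setCurrentKey(K key);
    V value();
    void update(V value);
    SimpleKvSnapshot<K, V> snapshot(long checkpointId);
}

// Hypothetical, simplified stand-in for KvStateSnapshot.
interface SimpleKvSnapshot<K, V> {
    SimpleKvState<K, V> restoreState();
}

final class MapKvState<K, V> implements SimpleKvState<K, V> {
    private final Map<K, V> entries;
    private K currentKey;

    MapKvState(Map<K, V> entries) {
        this.entries = entries;
    }

    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    public V value() {
        return entries.get(currentKey);
    }

    public void update(V value) {
        entries.put(currentKey, value);
    }

    public SimpleKvSnapshot<K, V> snapshot(long checkpointId) {
        // Copy the map so that updates made after the checkpoint do not leak into the snapshot.
        // checkpointId is unused here; a real backend would use it to name the stored data.
        final Map<K, V> copy = new HashMap<>(entries);
        return () -> new MapKvState<>(new HashMap<>(copy));
    }
}

final class KvSnapshotSketch {
    static String roundTrip() {
        SimpleKvState<String, Integer> state = new MapKvState<>(new HashMap<>());
        state.setCurrentKey("a");
        state.update(1);
        SimpleKvSnapshot<String, Integer> snapshot = state.snapshot(1L);
        state.update(2); // happens after the checkpoint

        SimpleKvState<String, Integer> restored = snapshot.restoreState();
        restored.setCurrentKey("a");
        return state.value() + "/" + restored.value();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // live value / restored value
    }
}
```

The live state keeps moving after the checkpoint, while the restored state reflects the value at snapshot time.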

Take a concrete implementation, FsState, as an example.

public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

AbstractFsState extends AbstractHeapState because FsState also keeps its working state cached on the heap; it only writes to a file when a snapshot is taken.

So let's look at AbstractHeapState first.

/**
 * Base class for partitioned {@link ListState} implementations that are backed by a regular
 * heap hash map. The concrete implementations define how the state is checkpointed.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <SV> The type of the values in the state.
 * @param <S> The type of State
 * @param <SD> The type of StateDescriptor for the State S
 * @param <Backend> The type of the backend that snapshots this key/value state.
 */
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        implements KvState<K, N, S, SD, Backend>, State {

    /** Map containing the actual key/value pairs */
    protected final HashMap<N, Map<K, SV>> state; // note the extra namespace level, which makes key collisions less likely

    /** Serializer for the state value. The state value could be a List<V>, for example. */
    protected final TypeSerializer<SV> stateSerializer;

    /** The serializer for the keys */
    protected final TypeSerializer<K> keySerializer;

    /** The serializer for the namespace */
    protected final TypeSerializer<N> namespaceSerializer;

    /** This holds the name of the state and can create an initial default value for the state. */
    protected final SD stateDesc; // the StateDescriptor holds information about the state, such as its default value

    /** The current key, which the next value methods will refer to */
    protected K currentKey;

    /** The current namespace, which the access methods will refer to. */
    protected N currentNamespace = null;

    /** Cache the state map for the current key. */
    protected Map<K, SV> currentNSState;

    /**
     * Creates a new empty key/value state.
     *
     * @param keySerializer The serializer for the keys.
     * @param namespaceSerializer The serializer for the namespace.
     * @param stateDesc The state identifier for the state. This contains name
     *                           and can create a default state value.
     */
    protected AbstractHeapState(TypeSerializer<K> keySerializer,
        TypeSerializer<N> namespaceSerializer,
        TypeSerializer<SV> stateSerializer,
        SD stateDesc) {
        this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
    }
 
AbstractFsState
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

    /** The file system state backend backing snapshots of this state */
    private final FsStateBackend backend;

    public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);

    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {

        try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {

            // serialize the state to the output stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
            outView.writeInt(state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                N namespace = namespaceState.getKey();
                namespaceSerializer.serialize(namespace, outView);
                outView.writeInt(namespaceState.getValue().size());
                for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), outView);
                    stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            outView.flush(); // the actual contents are flushed to the file

            // create a handle to the state
            return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the path
        }
    }
}

 

KV state itself comes in several flavors: ValueState, ListState, ReducingState, and FoldingState.

For simplicity, start with ValueState.

public class FsValueState<K, N, V>
    extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
    implements ValueState<V> {

    @Override
    public V value() {
        if (currentNSState == null) {
            currentNSState = state.get(currentNamespace); // first look up the key/value map of the current namespace
        }
        if (currentNSState != null) {
            V value = currentNSState.get(currentKey);
            return value != null ? value : stateDesc.getDefaultValue(); // read the value; if it is null, fall back to the default from stateDesc
        }
        return stateDesc.getDefaultValue();
    }

    @Override
    public void update(V value) {
        if (currentKey == null) {
            throw new RuntimeException("No key available.");
        }

        if (value == null) {
            clear();
            return;
        }

        if (currentNSState == null) {
            currentNSState = new HashMap<>();
            state.put(currentNamespace, currentNSState);
        }

        currentNSState.put(currentKey, value); // update the value
    }

    @Override
    public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
        return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); // create the snapshot from the file path
    }
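The access semantics of value() and update() (default value when nothing is stored, a null update treated as a clear, and values isolated per namespace) can be tried out with a small stand-alone sketch. SimpleValueState and ValueStateSketch are hypothetical classes written for this illustration; unlike FsValueState they skip the cached currentNSState field and look the namespace map up on every call, and the clear() body is an assumption since the original clear() is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a heap-backed value state: namespace -> key -> value.
final class SimpleValueState<K, N, V> {
    private final Map<N, Map<K, V>> state = new HashMap<>();
    private final V defaultValue;
    private K currentKey;
    private N currentNamespace;

    SimpleValueState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        currentNamespace = namespace;
    }

    V value() {
        Map<K, V> namespaceState = state.get(currentNamespace);
        if (namespaceState == null) {
            return defaultValue;
        }
        V value = namespaceState.get(currentKey);
        return value != null ? value : defaultValue;
    }

    void update(V value) {
        if (currentKey == null) {
            throw new IllegalStateException("No key available.");
        }
        if (value == null) {
            clear(); // a null update removes the entry
            return;
        }
        state.computeIfAbsent(currentNamespace, ns -> new HashMap<>()).put(currentKey, value);
    }

    // Assumed behavior: drop the entry, and drop the namespace map once it is empty.
    void clear() {
        Map<K, V> namespaceState = state.get(currentNamespace);
        if (namespaceState != null) {
            namespaceState.remove(currentKey);
            if (namespaceState.isEmpty()) {
                state.remove(currentNamespace);
            }
        }
    }
}

final class ValueStateSketch {
    static String demo() {
        SimpleValueState<String, String, Integer> counter = new SimpleValueState<>(0);
        counter.setCurrentNamespace("window-1");
        counter.setCurrentKey("user-a");
        int before = counter.value();        // nothing stored yet, so the default
        counter.update(5);

        counter.setCurrentNamespace("window-2");
        int otherNamespace = counter.value(); // same key, different namespace

        counter.setCurrentNamespace("window-1");
        int stored = counter.value();
        counter.update(null);                 // clears the entry
        int afterClear = counter.value();

        return before + "," + otherNamespace + "," + stored + "," + afterClear;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The same key yields different values in different namespaces, which is what lets one state object serve many windows for the same key.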

 

Next, look at the snapshot side, AbstractFsStateSnapshot.

public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {

    public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);

    @Override
    public KvState<K, N, S, SD, FsStateBackend> restoreState(
        FsStateBackend stateBackend,
        final TypeSerializer<K> keySerializer,
        ClassLoader classLoader) throws Exception {

        // state restore
        ensureNotClosed();

        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
            // make sure the in-progress restore from the handle can be closed
            registerCloseable(inStream);

            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

            final int numKeys = inView.readInt();
            HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

            for (int i = 0; i < numKeys; i++) {
                N namespace = namespaceSerializer.deserialize(inView);
                final int numValues = inView.readInt();
                Map<K, SV> namespaceMap = new HashMap<>(numValues);
                stateMap.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    K key = keySerializer.deserialize(inView);
                    SV value = stateSerializer.deserialize(inView);
                    namespaceMap.put(key, value);
                }
            }

            return createFsState(stateBackend, stateMap);
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
}
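Taken together, snapshot() and restoreState() define a simple length-prefixed layout: the number of namespaces, then for each namespace its serialized name, its entry count, and the key/value pairs. Note that the variable called numKeys in restoreState actually holds the namespace count. The sketch below reproduces that layout with plain java.io streams; NamespacedStateCodec is a hypothetical class, and String/Long with writeUTF/writeLong stand in for the TypeSerializer instances, so the bytes are not compatible with Flink's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical codec mirroring the layout used by AbstractFsState / AbstractFsStateSnapshot.
final class NamespacedStateCodec {

    // Layout: namespace count, then per namespace: name, entry count, key/value pairs.
    static byte[] write(Map<String, Map<String, Long>> state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Map<String, Long>> namespaceState : state.entrySet()) {
                out.writeUTF(namespaceState.getKey());
                out.writeInt(namespaceState.getValue().size());
                for (Map.Entry<String, Long> entry : namespaceState.getValue().entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            }
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static Map<String, Map<String, Long>> read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numEntries = in.readInt();
                Map<String, Long> entries = new HashMap<>(numEntries);
                state.put(namespace, entries);
                for (int j = 0; j < numEntries; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    entries.put(key, value);
                }
            }
            return state;
        }
    }

    static boolean roundTripMatches() {
        Map<String, Map<String, Long>> state = new HashMap<>();
        state.computeIfAbsent("window-1", ns -> new HashMap<>()).put("user-a", 3L);
        state.computeIfAbsent("window-1", ns -> new HashMap<>()).put("user-b", 7L);
        state.computeIfAbsent("window-2", ns -> new HashMap<>()).put("user-a", 1L);
        try {
            return read(write(state)).equals(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripMatches());
    }
}
```

Because the format carries no field tags, the reader depends entirely on matching the writer's order, which is why the snapshot and restore code have to be kept in step.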

 

The Snapshot class implemented inside FsValueState:
public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
    private static final long serialVersionUID = 1L;

    public Snapshot(TypeSerializer<K> keySerializer,
        TypeSerializer<N> namespaceSerializer,
        TypeSerializer<V> stateSerializer,
        ValueStateDescriptor<V> stateDescs,
        Path filePath) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
    }

    @Override
    public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
        return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
    }
}

 

2. FunctionState

Compared with KvState, StateHandle is a more general abstraction.

/**
 * StateHandle is a general handle interface meant to abstract operator state fetching.
 * A StateHandle implementation can for example include the state itself in cases where the state
 * is lightweight or fetching it lazily from some external storage when the state is too large.
 */
public interface StateHandle<T> extends StateObject {

    /**
     * This retrieves and return the state represented by the handle.
     *
     * @param userCodeClassLoader Class loader for deserializing user code specific classes
     *
     * @return The state represented by the handle.
     * @throws java.lang.Exception Thrown, if the state cannot be fetched.
     */
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}
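The Javadoc above mentions two styles of handle: one that carries lightweight state directly, and one that fetches larger state lazily from external storage. A minimal sketch of both follows. SimpleStateHandle, InMemoryHandle, FileHandle, and StateHandleSketch are hypothetical names for this illustration; the sketch omits the ClassLoader parameter of the real getState and uses Java serialization to a temporary file as the "external storage".

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, simplified stand-in for StateHandle (no ClassLoader argument).
interface SimpleStateHandle<T> {
    T getState() throws Exception;
}

// Lightweight state: the handle simply carries the value.
final class InMemoryHandle<T> implements SimpleStateHandle<T> {
    private final T state;

    InMemoryHandle(T state) {
        this.state = state;
    }

    public T getState() {
        return state;
    }
}

// Larger state: the handle only remembers where the data lives and loads it on demand.
final class FileHandle<T extends Serializable> implements SimpleStateHandle<T> {
    private final Path path;

    FileHandle(T state, Path path) throws IOException {
        this.path = path;
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(path))) {
            out.writeObject(state);
        }
    }

    @SuppressWarnings("unchecked")
    public T getState() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(path))) {
            return (T) in.readObject();
        }
    }
}

final class StateHandleSketch {
    static boolean demo() {
        try {
            Path file = Files.createTempFile("state-handle", ".bin");
            try {
                SimpleStateHandle<String> small = new InMemoryHandle<>("counter=3");
                SimpleStateHandle<String> large = new FileHandle<>("counter=3", file);
                // Callers see the same interface regardless of where the state lives.
                return small.getState().equals(large.getState());
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```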

 

3. OperatorState; a typical example is the state of WindowOperator

OperatorState also uses StateHandle as its snapshot abstraction.

 

Now let's see how each of these three kinds of state is snapshotted.

AbstractStreamOperator: looking at its checkpoint-related methods, you can see that it only snapshots KvState.
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    if (stateBackend != null) {
        stateBackend.notifyOfCompletedCheckpoint(checkpointId);
    }
}

 

AbstractUdfStreamOperator
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT>

This class extends AbstractStreamOperator. Here are its checkpoint-related methods:

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); // first run super.snapshotOperatorState, i.e. snapshot the KV state

    if (userFunction instanceof Checkpointed) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable udfState;
        try {
            udfState = chkFunction.snapshotState(checkpointId, timestamp); // snapshot the function's state
        }
        catch (Exception e) {
            throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
        }

        if (udfState != null) {
            try {
                AbstractStateBackend stateBackend = getStateBackend();
                StateHandle<Serializable> handle =
                        stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); // have the state backend store the state and return a handle to it
                state.setFunctionState(handle);
            }
            catch (Exception e) {
                throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                        + e.getMessage(), e);
            }
        }
    }

    return state;
}

@Override
public void restoreState(StreamTaskState state) throws Exception {
    super.restoreState(state);

    StateHandle<Serializable> stateHandle =  state.getFunctionState();

    if (userFunction instanceof Checkpointed && stateHandle != null) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable functionState = stateHandle.getState(getUserCodeClassloader());
        if (functionState != null) {
            try {
                chkFunction.restoreState(functionState);
            }
            catch (Exception e) {
                throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }
        }
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    super.notifyOfCompletedCheckpoint(checkpointId);

    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}

As shown, this operator snapshots both the KV state and the state of the user function inside the UDF.
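The layering pattern (call super first, then add this level's own state to the same container) can be sketched without any Flink dependencies. TaskStateSketch, BaseOperatorSketch, UdfOperatorSketch, and LayeredSnapshotSketch are hypothetical stand-ins for StreamTaskState, AbstractStreamOperator, and AbstractUdfStreamOperator; plain maps and strings replace the snapshot handles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamTaskState: one slot per kind of state.
final class TaskStateSketch {
    Map<String, Integer> kvStates;
    String functionState;
}

// Hypothetical stand-in for AbstractStreamOperator: only knows about KV state.
class BaseOperatorSketch {
    protected final Map<String, Integer> kv = new HashMap<>();

    TaskStateSketch snapshot() {
        TaskStateSketch state = new TaskStateSketch();
        state.kvStates = new HashMap<>(kv);
        return state;
    }

    void restore(TaskStateSketch state) {
        kv.clear();
        if (state.kvStates != null) {
            kv.putAll(state.kvStates);
        }
    }
}

// Hypothetical stand-in for AbstractUdfStreamOperator: adds the user function's state.
class UdfOperatorSketch extends BaseOperatorSketch {
    String functionValue; // stands in for state held by the user function

    @Override
    TaskStateSketch snapshot() {
        TaskStateSketch state = super.snapshot(); // KV state first
        if (functionValue != null) {
            state.functionState = functionValue;  // then the function state
        }
        return state;
    }

    @Override
    void restore(TaskStateSketch state) {
        super.restore(state);
        if (state.functionState != null) {
            functionValue = state.functionState;
        }
    }
}

final class LayeredSnapshotSketch {
    static String demo() {
        UdfOperatorSketch operator = new UdfOperatorSketch();
        operator.kv.put("user-a", 3);
        operator.functionValue = "offset=42";
        TaskStateSketch snapshot = operator.snapshot();

        UdfOperatorSketch recovered = new UdfOperatorSketch();
        recovered.restore(snapshot);
        return recovered.kv.get("user-a") + "|" + recovered.functionValue;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each subclass only needs to know about its own slot in the container, so a further subclass can add a third slot in the same way.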

 

WindowOperator, a typical example of operator state
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {

    if (mergingWindowsByKey != null) {
        TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
        ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
        for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
            setKeyContext(key.getKey());
            ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
            mergeState.clear();
            key.getValue().persist(mergeState);
        }
    }

    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

    AbstractStateBackend.CheckpointStateOutputView out =
        getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

    snapshotTimers(out);

    taskState.setOperatorState(out.closeAndGetHandle());

    return taskState;
}

@Override
public void restoreState(StreamTaskState taskState) throws Exception {
    super.restoreState(taskState);

    final ClassLoader userClassloader = getUserCodeClassloader();

    @SuppressWarnings("unchecked")
    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
    DataInputView in = inputState.getState(userClassloader);

    restoreTimers(in);
}
Time: 2024-12-13 19:05:26