Flink之状态之状态存储 state backends

流计算中可能有各种方式来保存状态:

  • 窗口操作
  • 使用 了KV操作的函数
  • 继承了CheckpointedFunction的函数

当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。

1.可用的状态持久化策略

Flink提供了三种持久化策略,如果没有显式指定,则默认使用MemoryStateBackend。

The MemoryStateBackend

将数据保存在java的堆里,kv状态或者window operator用hash table来保存values,triggers等等。

当进行checkpoints的时候,这种策略会对状态做快照,然后将快照作为checkpoint acknowledgement的一部分发送给JobManager,JM也将其保存在堆中。

MemoryStateBackend可以使用异步的方式进行快照,我们也鼓励使用异步的方式,避免阻塞,现在默认就是异步。如果不希望异步,可以在构造的时候传入false,如下:

new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);

限制:

  • 单次状态大小最大默认被限制为5MB,这个值可以通过构造函数来更改。
  • 无论单次状态大小最大被限制为多少,都不可用大过akka的frame大小。
  • 聚合的状态都会写入JM的内存。

适合:

  • 本地开发和调试。
  • 状态比较少的作业

The FsStateBackend

FsStateBackend 通过文件系统的URL来设置,比如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保持数据在TM的内存中,当做checkpointing的时候,会将状态快照写入文件,保存在文件系统或本地目录。少量的元数据会保存在JM的内存中。

默认使用异步的方式进行快照,同样,取消异步需要传递false:

 new FsStateBackend(path, false);

适用:

  • 状态比较大,窗口比较长,大的KV状态
  • 需要做HA的场景

The RocksDBStateBackend

RocksDBStateBackend 通过文件系统的URL来设置,例如“hdfs://namenode:40010/flink/checkpoints”或者“file:///data/flink/checkpoints”。

保存数据在一个叫做RocksDB的数据库中,这个数据库保存在TM的数据目录中。当做checkpointing时,整个数据库会被写入文件系统和目录。少量的元信息会保存在JM的内存中。

这种策略只支持异步快照。

限制:

  • 由于依赖于字节数组,支持的key和value的大小最大为2^31字节。对于使用Merge操作的状态,大小很可能就默默的超过了这个限制,下次获取就会失败。

适合:

  • 非常大的状态,长窗口,大的KV状态
  • 需要HA的场景

能够持有的状态的多少只取决于可使用的磁盘大小,这会允许使用非常大的状态,相比较FsStateBackend将状态保存在内存中。但这也同时意味着,这个策略的吞吐量会受限。

RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略。

2.配置状态持久化策略

如果你没有指定任何策略,默认使用JM作为存储策略。如果你想更改,可以在flink-conf.yaml中变更,存储策略也可以在作业中单独设定。

Setting the Per-job State Backend

可以在StreamExecutionEnvironment中指定:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Setting Default State Backend

默认的状态存储策略通过在flink-conf.yaml中通过state.backend来指定,有如下一些可选:

  • jobmanager (MemoryStateBackend)
  • filesystem (FsStateBackend)
  • rocksdb (RocksDBStateBackend)

也可以以全路径来指定,比如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory 来代替 RocksDBStateBackend,不过,何必了。

state.checkpoints.dir这个参数来指定所有的checkpoints数据和元数据存储的位置。示例如下:

# The backend that will be used to store operator state checkpoints

state.backend: filesystem

# Directory for storing checkpoints

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

原文地址:https://www.cnblogs.com/029zz010buct/p/9403283.html

时间: 2024-10-10 17:24:56

Flink之状态之状态存储 state backends的相关文章

Rhythmk 学习 Hibernate 02 - Hibernate 之 瞬时状态 离线状态 持久化状态 三状态

by:rhythmk.cnblogs.com 1.Hibernate 三种状态: 1.1.三种定义(个人理解,不一定准确):  瞬时状态(transient):    不被session接管,且不存在数据库中的对象的状态,类似于新New一个对象  离线状态 (detached):    数据库中存在而不被session接管  持久化状态(persistent): 对象被session管理且数据库中存在此对象 1.2. 状态之间转换关系图 2 .状态转换以及Hibernate数据库执行过程详解:

Android官方文章翻译之管理设备苏醒状态(Managing Device Awake State)

这几个月一直在学习iOS开发,把Android放在一旁耽搁了很久,是时候温故而知新了. 说点篇外话,这几天在看Developer的官方文档,关于Material Design又了解了一下. Material早在2014年的GoogleIO大会就已经被谷歌推出了,但是市场上采用这种新的设计方案的APP确是少之又少,国外倒是跟进的很快,国内几乎没有任何进展,纵观国内各大公司的移动APP,很少会“与时俱进”的采用Google推出的新系统的设计方案. 不过这原因由来已久了,因为安卓系统的版本分布太过零散

select刷新后,保持选定状态,Cookies存储select选定状态信息

//cookies存储select选定值,防止刷新后没了 window.onload = function () { var cooki = document.cookie; if (cooki != "") { cooki = "{\"" + cooki + "\"}"; cooki = cooki.replace(/\s*/g, "").replace(/=/g, '":"').re

Ejb-有状态&无状态SessionBean

1.    Ejb的分类 首先,企业级Bean分为三类: SessionBean用于实现业务逻辑,它可以是有状态的,也可以是无状态的.每当客户端请求时,容器就会选择一个SessionBean来为客户端服务.SessionBean可以直接访问数据库,但更多时候,它会通过EntityBean实现数据访问. Entity Bean是域模型对象(用来表示真实世界的实体),用于实现O/R映射,负责将数据库中的表记录映射为内存中的Entity对象,事实上,创建一个Entity Bean对象相当于新建一条记录

[Java并发编程]-线程的六种状态及其状态转换

转载请注明:http://blog.csdn.net/UniKylin/article/details/45050823 1.线程自身信息 线程运行的过程会产生很多信息,这些信息都保存在Thread类中的成员变量里面,常见的有: a.线程的ID是唯一标识getId() b.线程的名称:getName(),如果不设置线程名称默认为"Thread-xx" c.线程的优先级:getPriority,线程优先级从1-10,其中数字越大表示优先级别越高,同时获得JVM调度执行的可能性越大,JDK

有状态无状态回话bean

1.有状态(Stateful) 可以在不同的方法调用间保持针对各个客户端的状态 与客户端的联系必须被维持,这样做开销要大一些 有状态也可以这样理解,它存在存储能力,也就是说至少有一个属性来标识它目前的状态,例如: 注意:有状态会话bean,每个用户有自己特有的一个实例,在用户的生存期内,bean保持了用户的信息,即“有状态”:一旦用户灭亡(调用结束或实例结束),bean的生命期也告结束.即每个用户最初都会得到一个初始的bean. 2.无状态(Stateless) 在不同方法调用间不保留任何状态

MATLAB学习(一)——状态好状态坏,自作自受

状态不好,学学MATLAB做做准备吧. 一.基本情况 1.1 书写 一行写不下? %可以加上三个小黑点(续行符)并按下回车键,然后接下去再写.例如 s=1-1/2+1/3-1/4+1/5-1/6+1/7-…- 1/8+1/9-1/10+1/11-1/12; 1.2 运算相关 不等于:~= 所以,非:~ 常用的矩阵运算: eye(size(A))   产生与A矩阵同阶的单位矩阵 zeros(m,n)         产生0矩阵 ones(m,n)         产生1矩阵 rand (m,n) 

hibernate实体对象的三种状态:自由状态,持久状态,游离状态.

自由态与游离态的区别: 当一个持久化对象,脱离开Hibernate的缓存管理后,它就处于游离状态,游离对象和自由对象的最大区别在于,游离对象在数据库中可能还存在一条与它 对应的记录,只是现在这个游离对象脱离了Hibernate的缓存管理,而自由对象不会在数据库中出现与它对应的数据记录 (1)自由状态(Transient) 特性: ①不在Session的缓存中,不与任何的Session实例相关联. ②在数据库中没有与之相对应的记录. (2)持久状态(Persistent) 特性 ①在Session

eigrp的查询和主动状态------active状态和sia状态

查询的步骤: 拓扑结构--A是BCDE的吓一跳路由器,B是CDE的后继路由器. 10.0的路由器现在断开了,A的10.0进入active状态,A会向BCDE发送query,CDE接到查询后,知道A不能够到底10.0的网络,立即把后继路由器B放到路由表中,给A回复replay,A解除对CDE的ACTIVE状态,因为B没有后继路由器,向CDE发送查询,CDE接到查询后,因为CDE都没有后继路由器了,所以向A发送查询,A接到查询后向CDE发送replay,告诉他们自己没有去10.0的查询,CDE重新收