Flink State
2023-09-06 14:24:38 2 举报
Flink算子状态
作者其他创作
大纲/内容
离线:任务失败 重新把昨天的 获取依赖RDD的血缘 将上一步结果重新计算即可实时:如果还像离线一样 任务失败 重新把昨天的计算 太慢了 没有时效性
HashMapStateBackend--默认整合了内存和文件 每个状态默认5M 通过构造函数可以设置 内存不够 溢出的写到本地磁盘状态规模超出内存空间时 读写效率明显降低
setUpdateType:状态时间戳的更新时机 是一个枚举Disabled:禁用TTLOnCreateAndWrite:每次写入都会更新State最后访问时间OnReadAndWrite:每次读都会更新State最后访问时间
重载父类的对应的方法
有三个重载方法对应父类也是重载的三种
1.Flink运行时 统一管理 序列化 故障恢复等都实现好了只需要调用接口2.配置了容错机制后 自动持久化 自动恢复3.应用发送横向扩展 状态自动重组分配到所有子任务实列上4.提供:Value State 值状态 ListState 列表状态Map State 映射状态 AggregateState 聚合状态5.内部支持各种数据类型
一个算子很多并行度 一个并行度对应一个槽不同的槽 计算资源时物理隔离的状态在并行任务间是无法共享 状态只针对当前子任务的实列有效
按照计算隔离 可以用于所有算子一个并行度 = 一个状态 = 一个槽
StateTtlConfig各参数详解
扩缩容范围并发改变 reblancelistState均匀分配BroadcastState会把状态拷贝到全部新任务上实现CheckPointFunction接口或 ListCheckPoint接口数据结构ListState BroadCastState
环境.setStateBackend(new HashMapStateBackend)
基于RocksDB存储类似hbase的内存存储库 kv形式存储
实现需要在创建对象前StateTtlConfig.newBuilder()设置
如果算子需要历史状态 先将历史状态获取到然后跟数据进行计算逻辑 更新历史状态 返回计算结果执行一次环境 只会获取一次历史状态 但会不断更新状态
内存:environment.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage())文件 外部系统:environment.getCheckpointConfig().setCheckpointStorage(路径)
状态TTL
1.配置环境2.获取数据源3.自定义类实现 mapfunciton 和 checkpointFunction接口4.重写 map snapshotstate快照 initializestate序列状态5.定义属性 标记 和 状态集合 调用一次map 标记自增6.initializestate方法 实例化一个描述器 ListStateDescriptor7.通过上下文调用.getOperatorStore 在调用.getListState()将描述器传进去 赋值给状态集合8.snapshotState快照 先将状态集合清理 在add添加属性9.环境对象调用.enableCheckPoint(5000) 五秒开启一个检查点10.环境对象调用.getcheckPointConfig.setCheckpointStorage(File:///)11.通过initializeState方法中的上下文.isRestored() 如果有恢复的数据遍历历史集合 将历史数据拿出来 赋值 然后在自增12.获取历史数据之前得先开启配置 newConfiguartion()将配置对象.setString(\"execution.savepoint.path\
setTtl:状态过期的时间设置了TTL 上次访问时间 + TTL小于当前时间 表明状态过期
cleanupStrategies:过期状态如何清理FULL_STATE_SCAN_SNAPSHOT:过期值只有在显式读出时才会被删除INCREMENTAL_CLEANUP:增量地触发对某些状态项的清理ROCKSDB_COMPACTION_FILTER:定期使用异步压缩来合并状态的更新和减少储存
状态类型
扩缩容范围key group是重写分配的最小单元并发改变时 以keygroup为单位将键值分配给不同任务实现Rich Function通过getrumtimeContext
返回类型
基于文件FsStateBackend
Managed State / 托管状态Flink管理
来回切换
对状态进行存活管理淘汰机制:基于存活时间存活时长的计时器可以在数据被读 写时重置存活管理粒度是到元素级
根据当前数据和历史数据进行计算需要保存历史窗口计算的结果历史数据-->statesum
数据结构ValueState :保存可以更新和检索的值 kv形式 通过k更新(update) 通过v检索span style=\"font-size: inherit;\
环境.setStateBackend(new EmbeddedRocksDBStateBackend)
调用
isRestored()-->状态是从上一次执行的快照还原的,则返回 truegetRestoredCheckePointId()-->状态是从上一次执行的快照还原的 返回检查点IDgetOperatorStateStore()-->返回允许向后端注册操作员状态的接口getKeyedStateStore()-->返回允许向后端注册带密钥状态的接口
状态后端
开辟一块内存 自己管理实现状态的序列化 和 故障恢复Flink只会把自定义的当字节数组存储 没有其它的任何自动操作缺点:自己管理 太麻烦
1.13以后统一了格式
本地管理
算子状态/ Operator
托管方式
按照key隔离 只能用于keyby算子相同key对应一个状态
根据当前数据直接转换结果计算时不需要历史数据map filter flatMap
为什么流式需要状态?
1.new Configuartion配置2.配置对象.setString(\"execution.savepoint.path\
1.13以前
HDFS
jobManager 内存
setStateVisibility:状态 已过期未清理 能否返回 枚举NeverReturnExpired:不返回ReturnExpiredIfNotCleanedUp:可以返回
State
Row State / 原始状态自定义
键分区状态 / KeyBy
接口
有状态
基于内存 MemoryStateBackend
参数类
远端备份
setTtlTimeCharacteristic:StateTTL 所适用的时间模式ProcessTime:处理时间
构造器
无状态
基于RocksDB存储类似hbase的内存存储库 kv形式存储
状态本质是数据用来维护状态 本地访问 和 远端备份的
0 条评论
下一页