Hi~朋友,关注置顶防止错过消息
- 什么是有状态的计算
- 使用状态的场景
- 为什么需要状态管理
- 理想状态管理的特点
- Flink状态分类
- Managed State分类
- Keyed Stated特点
- Operator State特点
- Keyed Stated的具体分类
- 如何保存状态
- Checkpoint和Savepoint区别
- 状态保存在哪里
什么是有状态的计算?
有状态计算指的就是程序在计算过程中,需要将数据(状态)存储在本地存储或者外部存储中,以便下一次进行计算时获取使用,比如统计Nginx某个地址的调用次数,需要在每次计算时 不停的进行累加,并且将结果进行存储以便下次累加获取使用。
使用状态的场景
- 去重:上游系统数据会重复,落到下游系统时根据主键进行去重,需要将所有主键都记录下来,新的数据到来时需要判断主键是否已经存在
- 窗口计算:每分钟Nginx的访问次数,09:00~09:01这个窗口的数据需要先存入内存,等到09:01到来时将数据进行输出
- 机器学习/深度学习:训练的模型和当前模型的参数也是一种状态
- 访问历史数据:例如和昨天数据进行对比,如果每次从外部去读消耗资源比较大,所以可以把这些历史数据放入状态中做对比
为什么需要状态管理?
流式作业一般需要7*24小时不间断的运行,在宕机恢复时需要保证数据不丢失,在计算时要保证计算结果准确,数据不重复,恰好计算1次,为了达到上述这些目的,我们就需要对 程序运行过程中的状态进行管理。
理想状态管理的特点
- 易用:需要提供丰富的数据结构、多样的状态组织形式以及简洁的扩展接口
- 高效:实时作业需要需要更低的延迟,因此在状态保存和恢复时,需要保证处理速度;同时在进行横向扩展时不能影响作业本身的处理性能
- 可靠:状态需要可以被持久化,保证宕机后可以恢复
Flink状态分类
Managed State | RawState | |
---|---|---|
状态管理方式 | Flink Runtime自动管理:自动存储、自动恢复、内存优化 | 用户自己管理,需要自己序列化 |
状态数据结构 | 已知的数据结构:Value、List、Map等 | 字节数组byte[] |
推荐使用场景 | 大多数情况下可以使用 | 自定义Operator时使用 |
Managed State分类
Managed State主要分为两类:
- Keyed State
- Operator State
Keyed State特点
- 只能使用在KeyedStream算子中
- 一个Key对应一个State,一个Operator实例可以处理多个key,访问相应的多个State
- 随着并发改变,State会随着key在多个Operator实例间迁移
- 需要通过RuntimeContext访问,因此Operator必须是一个RickFunction
- 支持多样化的数据结构:ValueState、ListState、ReducingState、AggregatingState、MapState
Operator State特点
- 适用所有的算子,常用于source
- 一个Operator实例对应一个State
- 并发改变时,有两种重新分配方式可以选择:均匀分配或者合并后每个得到全量
- 实现CheckpointedFunction或ListCheckpointed接口
- 支持的数据结构:ListState
Keyed Stated具体分类
- ValueState:存储单个值
- MapState:数据类型为Map,在State上有put和remove等方法
- ListState:数据类型为List
- ReducingState:Reducing的add方法不是将当前元素追加到列表,而是直接更新进Reducing的结果中
- AggregatingState:AggregatingState和ReducingState的区别是在访问接口,Reducing的add和get的元素都是同一个类型,但是Aggregating输入的是IN,输出的是OUT
如何保存状态
保存状态依赖Checkpoint和Savepoint机制,Checkpoint是在程序运行过程中自动触发,Savepoint需要手动触发。
如果从Checkpoint进行恢复,需要保证数据源支持重发,同时Flink提供了两种一致性语义(恰好一次或者至少一次)。
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000L);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setMinPauseBetweenCheckpoints(500L);
checkpointConfig.setCheckpointTimeout(60000L);
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- enableCheckpointing:设置Checkpoint的间隔时间,单位ms
- setCheckpointingMode:设置Checkpoint的模式,如果设置了EXACTLY_ONCE,则需要保证Barries对齐,保证消息不会丢失也不会重复
- setMinPauseBetweenCheckpoints:设置两次Checkpoint中间的等待时间,通过这个可以防止Checkpoint太过频繁导致业务处理速度下降
- setCheckpointTimeout:设置Checkpoint的最大超时时间,上面代码表示如果Checkpoint超过1min,则超时失败
- setMaxConcurrentCheckpoints:表示同时有多少个Checkpoint在做快照
- setExternalizedCheckpointCleanup:用于设置任务在Cancel时是否需要保留当前的Checkpoint,RETAIN_ON_CANCELLATION当作业取消时保留作业的checkpoint,该情况下需要手动清除该作业 的Checkpoint,DELETE_ON_CANCELLATION作业取消时删除作业的Checkpoint,仅当作业失败时保存Checkpoint
Checkpoint和Savepoint区别
Checkpoint | Savepoint | |
---|---|---|
触发管理方式 | Flink自动触发管理 | 用户手动触发管理 |
用途 | Task发生异常时快速恢复 | 有计划地进行备份,作业停止后可以恢复,比如修改代码、调整并发 |
特点 | 轻量;自动从故障恢复;作业停止后默认清除 | 持久;标准格式存储,允许代码或配置发生改变;手动触发从Savepoint的恢复 |
状态保存在哪里?
状态保存有三种方式:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
MemoryStateBackend在Checkpoint是基于内存保存状态,该状态存储在TaskManager节点(执行节点)的内存中,因此会受到内存容量的限制(默认5M),同时还要受到akka.framesize的限制 (默认10M)。Checkpoint保存在JobManager内存中,因此总大小不能超过JobManager的内存,只推荐本次测试或无状态的作业使用。
FsStateBackend是基于文件系统保存状态的,状态依旧保存在TaskManager中,因此State不能超过单个TaskManager的内存容量,Checkpoint存储在外部文件系统中(比如HDFS或本地),打破了JobManager内存的限制, 但是总大小不能超过文件系统的容量,推荐状态小的作业使用。
RocksDBStateBackend,首先RocksDB是一个K-V的内存存储系统,当内存快满时,会写入到磁盘,RocksDB也是唯一支持增量Checkpoint的Backend,这说明用户不需要将所有状态都写入进去,可以 只将增量改变的状态写入即可。Checkpoint存储在外部文件系统,因此State不能超过单个TaskManager内存 磁盘总和,单key最大为2GB,总大小不超过文件系统的容量即可,推荐大状态作业使用。