在上一篇《零基础学Flink:实时热销榜Top5(案例)》文档中我们介绍了如何计算实时热销榜。在案例的最后TopNHot类中,我们使用了状态类。
代码语言:javascript复制// 用于存储商品状态,待收齐同一个窗口的数据后,再触发 TopN 计算
private ListState<OrderView> orderState;
但是之前并未做详细介绍,那么我们今天就来详细说说Flink中的状态。
what 那什么是state
State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在聚合计算过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Flink中的State就是与时间相关的,任务内部数据(计算数据和元数据属性)的快照。
那么我们有哪些典型的应用场景呢?
- 去重,记录所有主键
- 窗口计算,已进入未触发的数据
- 深度学习,训练的模型及参数
- 需要访问的历史数据:例如昨天的历史数据
why 为什么需要state
与批计算相比,state属于流计算特有的,因为没有failover机制,批计算要么成功,要么失败重新计算。而流计算在大多数场景下,需要增量计算,这样就需要保存之前计算的状态,用于快速回复计算。其主要场景
- 作业的备份与回复
- 计算资源的横向扩展
- SLA的保证
- 数据不丢不重,恰好计算一次
how 如何使用state
基本类型划分
在Flink中,按照基本类型,对State做了以下两类的划分:
- Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用。我们可以从逻辑上理解这种状态是一个并行度操作实例和一种Key的对应, <parallel-operator-instance, key>。
- Operator State(或者non-keyed state),它是和Key无关的一种状态类型。相应地我们从逻辑上去理解这个概念,它相当于一个并行度实例,对应一份状态数据。因为这里没有涉及Key的概念,所以在并行度(扩/缩容)发生变化的时候,这里会有状态数据的重分布的处理。
下图是截取自孙梦瑶 的PPT,个人感觉还是总结的比较清楚的,感谢社区大牛们的贡献
组织形式划分
但是在这里还有一种按照组织形式的划分,也可以理解为按照runtime层面的划分,又可以分为一下两类:
- Managed State,这类State的内部结构完全由Flink runtime内部来控制,包括如何将它们编码写入到checkpoint中等等。
- Raw State,这类State就比较显得灵活一些,它们被保留在操作运行实例内部的数据结构中。从Flink系统角度来观察,在checkpoint时,它只知道的是这些状态数据是以连续字节的形式被写入checkpoint中。等待进行状态恢复时,又从字节数据反序列化为状态对象。
Managed State可以在所有的data stream相关方法中被使用,官方也是推荐优先使用这类State,因为它能被Flink runtime内部做自动重分布而且能被更好地进行内存管理。
如何保存state
Checkpoint
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下Flink Checkpoint机制,如官网下图所示:
checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
Savepoint
Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。
Flink程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创建Savepoint过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。
而且,强烈建议手动为每个Operator设置ID,即使未来Flink应用程序可能会改动很大,比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。
存储
至于如何存储,Flink定义了三种状态后端(State Backend),至于是那三种,以及应用场景,我们还是用一张图来说明吧
好了,flink状态和容错我们就先说到这。
参考连接
https://files.alicdn.com/tpsservice/1b9f5f0bda10883dce78496e6a5d648a.pdf
http://zhuanlan.51cto.com/art/201810/585018.htm
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
https://blog.csdn.net/androidlushangderen/article/details/86485850
http://shiyanjun.cn/archives/1855.html
https://juejin.im/post/5bf93517f265da611510760d