Flink 是 stateful 计算引擎,不同于 Storm。在 Storm 这类无状态计算引擎中,并行的任务实例(通常一个任务实例运行在一个线程中)是不存储计算状态的,即使有一些运行时的程序元信息也是放在了像 ZooKeeper 这种第三方的高可用分布式协调者介质中。怎么理解这里的“无状态”呢?可以理解为流中的每个元素流过每个任务实例时,任务实例不会将此次处理的一些信息带到下一次处理元素中,即任务实例所在的线程是不存在记忆的。Flink 则相反,但是为了实现 stateful 需要付出非常大的代价,尤其是在分布式环境中,还要保证状态的全局一致性。就是说分布式在各个并行度线程中的任务实例所保存的状态必须是针对某个一致的语义平面上建立的,否则就无法保证在分布式环境中遇到故障后重启时恢复状态后的程序一致性了。
数据投递策略 这种影响数据一致性的流元素投递策略有三种:
- at most once:数据处理最多一次,这种语义在异常情况下会有数据丢失;
- at least once:数据处理最少一次,这种语义在异常情况下会有数据重复;
- exactlyonce:数据精确处理一次,这种语义最复杂,要想完成这一目标需要在数据处理的各个环节做到保障。
拿 Kafka 来分析 Flink 的精确一次语义。Flink 是数据逐条处理的原生流处理引擎,每处理完一条数据提交 offset 不现实,所以,Flink 在实现 End-to-End Exactly-Once 语义时,Flink 采用 Two phase commit 来解决这个问题。
Two Phase Commit(2PC)
两阶段提交就是把提交操作分为两个步骤,一个是预提交阶段,一个是提交阶段,通常这里包含两个角色,一个是 master 为协调管理者,一个是slave 为执行者,提交过程如下:
- master 发出预提交命令给所有的 slave
- slave 执行预提交的命令,执行完后给 master 发送一个 ack 反馈信息
- 当 master 收到所有的 slave 的成功反馈信息,那么再次给 slave 发送提交信息 commit
- slave 执行 commit 操作
如果在上面的流程中,在流程2上出现问题,也就是预提交出现问题,那么 master 会收到 slave 的失败反馈,这个时候 master 会让 slave 进行回滚操作,保证数据的一致性,但是在流程4中出现问题,那么就会造成数据的不一致性,这个时候我们可以采用3阶段提交(3PC)技术或者其他的方式进行数据修正,来达到最终的一致性。
第一阶段(precommit):
第二阶段(commit分支):
第二阶段(abort分支):
从上述2PC过程可以看出,2PC需要数据存储外部系统具备事务能力,即可以对一次事务请求进行commit或者rollback操作。同样的,为了提供端到端Exactly-Once语义,除了Flink应用程序本身的状态,Flink写入的外部存储也需要满足这个语义。也就是说,这些外部系统必须提供提交或者回滚的方法,然后通过Flink的checkpoint来协调。
保存什么状态
Flink在checkpoint的时候需要保存的状态主要有两类:
- 内部状态:Flink state backends保存和管理的内容,例如,sum算子的计算当前结果值,内部状态在预提交阶段只需要写入state backend即可,保证在checkpoint commit时可以提交,或者在rollback时可以撤回即可。
- 外部状态:外部状态通常由需要写入的外部系统引入,例如Kafka,将数据写到Kafka,这就有了外部的状态。因此,为了提供Exactly-Once保证,外部系统必须提供事务支持,借此和两阶段提交协议交互。在预提交阶段,除了将状态写入到state backend之外,data sink必须预提交自己的外部事务。
例如,在以下这样一个Flink程序中:
- 从Kafka读取数据的data source(KafkaConsumer,在Flink中)
- 窗口聚合
- 将数据写回到Kafka的data sink(KafkaProducer,在Flink中)
在预提交阶段,需要完成动作如下:
当checkpoint屏障在所有operator中都传递了一遍,以及它触发的快照写入完成,预提交阶段结束。这个时候,快照成功结束,整个程序的状态,包括预提交的外部状态是一致的。下一步是通知所有operator,checkpoint已经成功了。这是两阶段提交中的提交阶段,Jobmanager为程序中的每一个operator发起checkpoint已经完成的回调。data source和window operator没有外部的状态,在提交阶段中,这些operator不会执行任何动作。data sink拥有外部状态,所以通过事务提交外部写入。
Flink 2PC
- Phase 1: Pre-commit
- Flink 的 JobManager 向 source 注入 checkpoint barrier 以开启这次 snapshot
- barrier 从 source 流向 sink
- 每个进行 snapshot 的算子成功 snapshot 后,都会向 JobManager 发送 ACK
- 当 sink 完成 snapshot 后, 向 JobManager 发送 ACK 的同时向 kafka 进行 pre-commit
- Phase 2: Commit分支
- 当 JobManager 接收到所有算子的 ACK 后,就会通知所有的算子这次 checkpoint 已经完成(同步提交上游Topic的offset)
- Sink接收到这个通知后, 就向 kafka 进行 commit, 正式把数据写入到kafka
- Phase 2: rollback分支
不同阶段 fail over 的 recovery 举措:
- 在pre-commit前fail over,系统恢复到最近的checkponit
- 在pre-commit后,commit前fail over,系统恢复到刚完成pre-commit时的状态
Barrier对齐
barrier 对齐发生在一个处理节点需要接收上游不同处理节点的数据,由于不同的上游节点数据处理速度不一致,那么就会导致下游节点接收到 barrier 的时间点也会不一致,这时候就需要使用 barrier 对齐机制:在同一checkpoint 中,先到达的 barrier 是否需要等待其他处理节点 barrier 达到后在发送后续数据,barrier将数据流分为前后两个 checkpoint(chk n,chk n 1) 的概念,如果不等待那么就会导致 chk n 的阶段处理了 chk n 1 阶段的数据,但是在 source 端所记录的消费偏移量又一致,如果 chk n 成功之后,后续的任务处理失败,任务重启会消费 chk n 1 阶段数据,就会到致数据重复消息,如果 barrier 等待就不会出现这样情况,因此 barrier 需要对齐那么就是实现 exectly once 语义,否则实现的是 at least once 语义。由于状态是属于 flink 内部存储,所以 flink 仅仅满足内部 exectly once 语义。
注意:所以,对于自带幂等的下游sink组件,天生可以将at least once转换为exactly once。
Asynchronous Barrier Snapshotting(ABS)
Flink 的 checkpoint 的过程依赖于异步屏障快照算法,该算法在《Lightweight Asynchronous Snapshots for Distributed Dataflows》这篇 paper 中被提出。这篇paper主要介绍了一个轻量级分布式异步快照,只需要保存较少的数据即可。并且将这种轻量级的算法在flink中实现,之后通过验证表明,这种算法对数据的处理和计算影响很小,而且拥有线性的可扩展性,并且在快照比较频繁的时候性能依旧良好。
Copy On Write(COW)
Flink的state backend利用写时复制机制,允许当做异步 snapshot 时,能够不受影响地继续处理流。
还利用了 Copy On Write 机制的有,例如, CopyOnWriteArrayList。在遍历List的时候,如果被修改了会抛出 java.util.ConcurrentModificationException 错误,这是因为一个线程在遍历list的时候,其他线程在修改list中元素。CopyOnWriteArrayList类最大的特点就是,在对其实例进行修改操作(add/remove等)会新建一个数据并修改,修改完毕之后,再将原来的引用指向新的数组。这样,修改过程没有修改原来的数组。也就没有了ConcurrentModificationException错误。
理解精确一次
Flink通过回退和重新Source数据,从故障中恢复,当理想情况下被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。
相反,这意味着每一个事件都会影响Flink管理的状态精确一次。
端到端精确一次
为了Source的每个事件都精确一次对Sink生效,必须满足以下条件:
- Source必须是可重放的
- Sink必须是事务性的(或幂等的)