flink exactly-once系列目录:
一、两阶段提交概述
二、两阶段提交实现分析
三、StreamingFileSink源码分析
四、事务性输出实现
五、最终一致性实现
在【两阶段提交概述】中介绍了两阶段提交的基本思路以及如何根据checkpoint机制来实现两阶段提交思路,flink给出来两阶段提交抽象实现TwoPhaseCommitSinkFunction与具体实现FlinkKafkaProducer011。
一、TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction是一个抽象类,继承RichSinkFunction,实现CheckpointedFunction与CheckpointListener接口。抽象出了以下四个方法:
- beginTransaction, 开启一个事务,获得一个句柄
- preCommit,执行预提交
- commit ,执行提交
- abort,放弃一个事务
使用这四个方法然后结合checkpoint 过程提供的hook,来实现两阶段提交过程,看下其具体调用流程:
a. initializeState 状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用;
b. snapshotState 与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;
c. notifyCheckpointComplete checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。
在上面的流程中,任何一个步骤都有可能会失败,如果在预提交阶段失败,任务会失败重启回到最近一次的checkpoint成功状态,预提交的事务自然会因为事务超时而放弃;如果在预提交之后提交之前也就是完成checkpoint 但是还没触发notifyCheckpointComplete动作,这个这个过程中失败,那么就会从这次成功的checkpoint中恢复,会执行initializeState中的逻辑保证数据的一致性;如果在commit之后下次checkpoint之前失败,也就是在执行notifyCheckpointComplete之后失败,那么任务重启会继续提交之前已经提交过的事务,因此事务的提交需要保证重复提交不会影响数据的一致性。整个流程分析下来,除了需要保证事务重复提交保证数据的一致性外,还需要保证事务句柄能够被持久化容错,以便失败后重启恢复,接下来看下输出kafka 是如何保证数据一致性的。
二、FlinkKafkaProducer011
kafka从0.11版本开始提供了幂等与事务的特性,保证了数据的一致性,具体可以参考https://www.infoq.cn/article/kafka-analysis-part-8这篇文章,幂等通过producerId与SequenceNumber 来保证,但是幂等只能保证对单个分区操作的数据一致性,事务通过transactionId、producerId、epoch三个元素来保证,transactionId由客户端指定,producerId内部实现但是对用户透明、epoch表示对相同transactionId 不同producer的区分。FlinkKafkaProducer011继承TwoPhaseCommitSinkFunction抽象类,将kafka事务机制与checkpoint结合,如下图:
kafka的事务机制基本流程是先开启一个事务,然后发送数据,最后提交,将开启事务过程放在initializeState与snapshotState中,发送数据放在invoke中,flush 将所有缓存数据刷新到kafka ,相当于预提交操作,在snapshotState中执行,commitTransaction 提交操作放在notifyCheckpointComplete中执行。上面任何一个流程都有可能出现异常导致任务失败,对于kafka事务提交机制也是使用两阶段提交的模式,根据上一篇的分析,那么可能出现的问题就是在第二阶段,可能会出现部分提交成功部分提交失败导致数据不一致,如果能获取之前提交失败kafka 的transactionId、producerId、epoch这三个元素那么就可以在任务重启继续提交之前失败的事务,在flink 正好可以使用状态将这个三个元素进行容错,使重启之后可恢复。 在FlinkKafkaProducer011中使用KafkaTransactionState对象作为事务的句柄,保存着transactionId、producerId、epoch容错元素与FlinkKafkaProducer对象,FlinkKafkaProducer是transient类型的,不需要进行持久化,通过t-p-e就可以确定一个FlinkKafkaProducer。
理解以上流程就很好理解代码实现了,下面看几个重要的方法:
1. 开始事务,获得一个新的事务句柄
2. 预提交,执行flush操作
3. 提交,执行commitTransaction操作
4. 出现异常,任务重启放弃事务
三、两阶段提交实现总结
1. 外部存储需要满足事务特性
2. 外部存储需提供事务句柄,可持久化、可重新提交
3. 由于这种两阶段提交模式与checkpoint绑定在一起,checkpoint是周期性的执行,那么checkpoint周期的长短则会影响下游数据的延时性,需要根据实际使用情况来调整。