本文主要讨论两个问题:
- Flink两阶段提交(2PC)的缺陷?
- 为什么上游Flink程序明明开启了checkpoint,下游Kafka消费者还可以实时消费上游Sink的kafka消息,好像没有发生因为上游checkpoint而可能存在的延迟消费现象?
问题一
Flink两阶段提交(2PC)的缺陷?
在上一篇文章「checkpoint【1】」中,我们讨论过在2PC过程的每个阶段出现故障时Flink的处理方式:
- Phase 1: Pre-commit 预提交阶段
- Flink 的 JobManager 向 source 注入 checkpoint barrier 以开启这次 snapshot
- barrier 从 source 流向 sink
- 每个进行 snapshot 的算子成功 snapshot 后,都会向 JobManager 发送 ACK
- 当 sink 完成 snapshot 后, 向 JobManager 发送 ACK 的同时向 kafka 进行 pre-commit
- Phase 2: Commit分支
- 当 JobManager 接收到所有算子的 ACK 后,就会通知所有的算子这次 checkpoint 已经完成(同步提交上游Topic的offset)
- Sink接收到这个通知后, 就向 kafka 进行 commit, 正式把数据写入到kafka
- Phase 2: rollback分支
不同阶段 fail over 的 recovery 举措:
- 在pre-commit前fail over,系统恢复到最近的checkponit
- 在pre-commit后,commit前fail over,系统恢复到刚完成pre-commit时的状态
这里少一种情况:
在JobManager发送给所有算子commit操作后,因为网络等原因fail over,只有一部分算子接受到了commit请求。本质上因为,Flink算子并没有对commit的成功与否和JobManager进行ACK。
2PC的并不是完美的,2PC并不是完美的,他存在着同步阻塞问题、单点故障问题、无法100%保证数据一致性等问题。上述bug也正式2PC算法的缺陷之一:无法100%保证数据一致性。
3PC在2PC基础上加入了一些补偿机制,例如,如果参与者没有收到协调者的消息时,他不会一直阻塞,过一段时间之后,他会自动执行事务。这种策略就可以解决因为网络异常,各算子一致阻塞等待JobManager的第二阶段的commit/abort通知。但是,一般情况下我们并不会对Flink进行这种级别的二次开发。那在实际情况中我们如何应对这种可能会引起数据不一致的情况呢?
那么,Flink是如何通知到我们这种情况的?如果commit失败了(比如网络中断引起的故障),整个flink程序也因此失败,它会根据用户的重启策略重启,可能还会有一个尝试性的提交。这个过程非常严苛,因为如果提交没有最终生效,会导致数据丢失。
Kafka SQL/Table UML
官方给出的自定义Flink SQL/Table Source/Sink的UML关系图如下,
Kafka SQL/Table的核心类有:
KafkaDynamicTableFactory
KafkaDynamicSource
KafkaDynamicSink
FlinkKafkaConsumer
FlinkKafkaProducer
FlinkKafkaProducer中枚举了数据投递的语义:Semantic.EXACTLY_ONCE,Flink生产者将在Kafka事务中写入所有消息,该事务将在检查点上提交给Kafka。在这种模式下, FlinkKafkaProducer建立一个FlinkKafkaInternalProducer池。在每个检查点之间创建一个Kafka事务,该事务在notifyCheckpointComplete(long)上notifyCheckpointComplete(long)。如果检查点完成通知延迟运行,则FlinkKafkaProducer可能会耗尽池中的FlinkKafkaInternalProducer。在这种情况下,任何后续的snapshotState(FunctionSnapshotContext)请求都将失败,并且FlinkKafkaProducer将继续使用前一个检查点的FlinkKafkaInternalProducer。为了减少检查点失败的机会,有四个选项:
- 减少最大并发检查点数
- 使检查点更可靠(以便更快完成)
- 增加检查点之间的延迟
- 增加FlinkKafkaInternalProducer池的大小
从源码角度解读2PC
1. 2PC -> beginTransaction
FlinkKafkaProducer.KafkaTransactionState beginTransaction()
对于每个检查点,我们都会创建一个新的FlinkKafkaInternalProducer以便新事务不会与在先前检查点期间创建的事务发生冲突( producer.initTransactions()确保我们获得了新的producerId和epoch计数器)
FlinkKafkaInternalProducer是一个FlinkSink衔接KafkaProducer的类,代理KafkaProducer,并且implements Producer 默认初始化5个,本质是一个核心线程默认5个的线程池
代码语言:javascript复制FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
2. 2PC -> preCommit
代码语言:javascript复制void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction)
调用:
flush(transaction);
调用:kafkaClient#flush
kafkaProducer.flush();
3. 2PC -> commit
代码语言:javascript复制void commit(FlinkKafkaProducer.KafkaTransactionState transaction)
调用:FlinkKafkaInternalProducer
transaction.producer.commitTransaction();
调用:kafkaClient#commitTransaction
kafkaProducer.commitTransaction();
4. 2PC -> abort
代码语言:javascript复制void abort(FlinkKafkaProducer.KafkaTransactionState transaction)
调用:FlinkKafkaInternalProducer
transaction.producer.abortTransaction();
调用:kafkaClient#commitTransaction
kafkaProducer.abortTransaction();
5. TwoPhaseCommitSinkFunction
代码语言:javascript复制TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener
// 预提交先前创建的事务。预提交必须采取所有必要步骤,为将来可能发生的提交准备事务。在此之后,事务可能仍会中止,但是基础实现必须确保对已
// 预先提交的事务的提交调用将始终成功。通常,实现涉及刷新数据
preCommit
// 提交预先提交的交易。如果此方法失败,则将重新启动Flink应用程序,并为同一事务再次调用recoverAndCommit(Object) 。
commit
// 失败会调用recoverAndAbort
abort
6. 总结
Flink的2PC实现:抽象类TwoPhaseCommitSinkFunction有4个方法:
1. beginTransaction()
开启事务.创建一个临时文件.后续把原要写入到外部系统的数据写入到这个临时文件
2. preCommit()
flush并close这个文件,之后便不再往其中写数据.同时开启一个新的事务供下个checkponit使用
3. commit()
把pre-committed的临时文件移动到指定目录
4. abort()
删除掉pre-committed的临时文件
问题二
没有延迟的下游kafka消费者现象
刚开始用Flink SQL做Flink-Kafka端到端exactly once测试时,很疑惑一个问题:上游Flink SQL Sink到Kafka某个topic,然后在console中实时消费这个topic的数据,在程序中明明设置了exactly-once,为什么console中会实时消费数据,而不是像预想的那样成批的消费checkpoint(n,n 1)之间的数据。
思路:
1. 直接在上述源码分析中的FlinkKafkaProducer打断点调试,因为这里是Flink SQL实现Sink Kafka必由之路。
发现这里语义居然不是exactly-once,而是at least-once(默认),分析可能是设置方式不对,之前我是在Flink Stream API中设置了语义,
代码语言:javascript复制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
env.enableCheckpointing(1000L * 5, CheckpointingMode.EXACTLY_ONCE);
后查阅资料发现在Fink SQL设置exactly-once是在table DDL中设置
代码语言:javascript复制// 定义 Kafka sink 的语义。
// 有效值为 'at-least-once','exactly-once' 和 'none'。
sink.semantic = exactly-once
2. 在DDL中设置exactly-once语义后,现象还是和原来一样。
断点调式源码的commit方法,发现确实存在commit这个动作,但是在解开断点之前,console居然已经消费到了消息!那么查阅资料为什么会消费到上游kafka还没有commit的消息,结果是kafka也有自己的事务隔离级别。如果先使得下游不能消费上游还未提交的消息效果,需要在下游的kafka消费端设置事务隔离级别:
将所有从 Kafka 中消费记录的应用中的 isolation.level 配置项设置成实际所需的值(read_committed 或 read_uncommitted,后者为默认值)。
3. 需要在下游消费端设置事务的隔离级别为:read_committed。
困惑、初心与曙光
为什么checkpoint、「精确一次」?
故障冗余(数据一致性)
为什么流式计算?
低延迟、实时性(性能、可用性)
为什么不一条数据一次checkpoint?
checkpoint代价太大(目前的软硬件技术性能瓶颈)
最终,又走到了问题漩涡的中心,又是CAP类似的平衡问题……
但是,有没有其他路可走?
at least-once 幂等的sink端
= 没有延迟的exactly-once