在实时数仓分层中,Kafka是一种比较常见的中间存储层,而在分布式计算中由于硬件、软件等异常导致的任务重启是一种正常的现象,通过之前的Kafka-Consumer分析得知,offset 是跟随着checkpoint周期性的保存, 那么消息是有可能被重复消费的,而Kafka 作为输出端并不属于整个Flink任务状态的一部分,重复被消费的消息会重复的输出,因此为了保证输出到Kafka数据的一致性,Flink 在Kafka Sink端的事务语义。本篇主要介绍Kafka-Sink 的执行流程与核心设计。
Kafka 幂等与事务
幂等
在通常情况下,生产者发送数据可能由于网络等原因导致数据重复发送, 常见的解法就是幂等操作, 也就是执行多次相同的操作与其执行一次的影响结果是一样的。Kafka 不像MySQL/HBase 这样存储可以通过uniqueKey或者RowKey 机制来保证幂等, 为了实现幂等引入了两个概念producerId与sequenceNumber, 每一个producer 都会有一个由服务端生成的producerId与之对应,sequenceNumber 是partition级别的自增消息序列号,客户端每一条消息都会对应生成一个sequenceNumber,在服务端同样会保存该sequenceNumber, 只有当客户端消息的sequenceNumber 大于服务端存储的sequenceNumber 该消息才会被接受,通过这种方式保证消息的幂等性,从而保证数据的一致性。
但是对于幂等消息有个重要的问题:不能跨topic 、跨partition 保证数据一致性,如果producer 生产的消息横跨多个topic、partition, 可能会存在部分成功,部分失败的情况;另外幂等只是在单次producer 会话中, 如果pruducer 因为异常原因重启,仍然可能会导致数据重复发送。因此引入了事务解决该问题。
事务
事务要求遵循原子性,即要么成功要么失败,为了应对跨topic、跨partition问题,kafka引入了TransactionCoordinator 事务协调者,由该协调者协调事务的提交与回滚操作,同时引入了_transaction_state 日志来持久化事务信息(与事务相关的topic、partition、producer等), 其本质也是一个topic, TransactionCoordinator 通过_transaction_state 日志信息来恢复或者取消事务。
为了能够跨producer会话,提供了一个transactionId 的概念, 由客户端指定,能够保证producer重启时仍然能够找到对应的producerId (也就是你是你), 从而继续完成事务。transactionId与producerId 同样也会保存到__transaction_state 中。
逻辑执行流程
前面分析了kafka-producer 幂等与事务相关的原理, 其可以保证单producer在跨topic、partition下的数据一致性,但是在Flink中是一个分布式的计算环境,多并发下会有多个producer 生产数据, 那么需要保证的是多个producer下的数据一致性。
通过之前对Flink的了解,Flink提供了基于checkpoint 下的两阶段提交流程(flink exectly-once系列之两阶段提交概述) ,该流程可以保证全局一致性的事务, 那么只需要将KakfkaProducer 的两阶段提交与Flink checkpoint提交融合起来即可实现。接下来看具体的融合逻辑:
左侧为正常事务的提交(以客户端的视角)流程,右侧为checkpoint 略缩版流程, 那么现在需要将这两部分逻辑融合起来:
- 开启事务, 事务的开端,每一次checkpoint 都应该是一个新的事务,因此应该在开始checkpoint 的流程中执行
- 写入数据,对于Flink来说就是正常的数据处理流程
- 异常处理, 在分布式的环境中,硬件或软件导致的失败属于正常现象,因此为了做容错处理需要保存事务相关信息, 也就是需要将其保存到状态中,需要在保存状态的流程中执行
- 提交事务,待整个checkpoint 完成在checkpoint完成回调中执行提交事务
- 回滚事务, 如果出现异常情况,那么可能会存在未完成或者待提交的事务,这部分事务已经在异常处理流程中保存了起来,因此可以在状态恢复流程中执行
具体实现
Flink中将两阶段提交做了一个抽象 TwoPhaseCommitSinkFunction,其实现了CheckpointedFunction与CheckpointListener这两个与checkpoint流程相关的两个接口,提供了以下几个主要的抽象方法:
- beginTransaction:开启事务
- preCommit:预提交
- commit:提交
- recoverAndCommit :恢复并且提交事务
- abort:取消事务
- recoverAndAbort:恢复并且提交事务
让使用者只需要实现这几个方法即可。那么接下来看在flink 的执行流程去看是如何调用这几个方法的:
从上面分析来看整个流程是比较简单的, 重点就在于如何做异常处理,面对可能会出现异常的情况做好检查点以便恢复处理。而对于FlinkKafkaProducer 的实现只需要继承TwoPhaseCommitSinkFunction 类,并且重写上面提到的几个抽象方法即可:
总结
本篇主要从事务角度介绍了Kafka 事务实现与FlinkKafkaProducer事务的实现。