Flink-Kafka Connector 是连接kafka 的连接器,负责对接kafka 的读写, 本篇主要介绍kafka consumer 的执行流程与核心设计。
逻辑执行流程
- 分配当前task消费的partition与起始的offset : 根据从状态中恢复的数据与客户端指定的消费模式, 采取的方式是状态中offset优先, 即从状态中能够找到对应的offset 就使用该offset , 否则就根据客户端指定的方式
- 从kafka 中不断拉取数据, 发送到下游,并且保存当前的offset
- 为了保证整个任务的全局一致性,需要将offset 提交到状态中
- 如果开启了分区发现模式,那么需要将检测到新的分区添加到消费线程中。
两个重要接口
Flink 保证全局数据一致性是通过全局状态快照checkpoint 完成的, 也就是周期性的执行checkpoint 将当前的任务状态保存起来, Flink 在整个checkpoint 的执行过程中提供了两个接口,方便用户去做一些自定义的操作, 例如操作状态、两阶段提交实现等等。
CheckpointedFunction接口
提供了initializeState方法与snapshotState方法,initializeState方法是在任务初始化时候执行,常见的就是获取的checkpoint 中的状态数据;snapshotState方法是在每次checkpoint触发都会执行,常见的就是将数据存放在状态对象中,以便能够被持久化。
CheckpointListener接口
提供了notifyCheckpointComplete方法与notifyCheckpointAborted方法,这两个方法都是在一次checkpoint 完成之后执行,那么有可能是通知成功回调(notifyCheckpointComplete)也有可能失败回调(notifyCheckpointAborted)。
具体实现
对于Flink 来说source端的标准对接接口是SourceFunction ,主要实现其run方法,在run 中执行数据的pull操作;另外为了保证整个状态的一致性,在checkpoint时需要记录checkpoint时的offset, 并且保证其失败重启时也能够从checkpoint 记录的offset开始消费, 因此同时实现了CheckpointedFunction接口与CheckpointListener接口,这两个接口提供了可操作状态的一些方法。
FlinkKafkaConsumerBase 实现SourceFunction、CheckpointedFunction、CheckpointListener接口的抽象类,包含了整个流程的核心方法,如下:
initializeState
从checkpoint 中 恢复最近一次或者是指定批次checkpoint 中offset, 并将其存放在TreeMap<KafkaTopicPartition,Long> 结的 restoredState 对象中
open
主要作用就是分配当前task消费的partitioin 的offset 位置
1. partition 分配策略:姑且认为是当前task的下标与 partition%numTask 相等就分配给当前task
2. offset 分配策略:有状态数据就使用状态数据的offset ; 没有就根据客户端指定的StartupMode作为消费起点
run
开始消费kafka 中数据, 通过 KafkaFetcher 完成 :
1. 启动了一个消费线程 KafkaConsumerThread 从kakfa 中拉取数据,将其存储到 Handover 的next 对象中
2. 循环从Handover 的next 中获取数据
3. 记录下当前的offset, 更新到subscribedPartitionStates 中去
createAndStartDiscoveryLoop
在run 方法中被调用, 开启了异步分区发现的线程discoveryLoopThread,会按照指定的时间间隔检查是否有新的分区(默认情况下不开启), 当发现有新的分区时会将其添加到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 线程检测到
snapshotState
将记录的subscribedPartitionStates 中消费进度数据写入到 unionOffsetStates 状态中与临时对象pendingOffsetsToCommit中
notifyCheckpointComplete
提交offset 至kafka中:将pendingOffsetsToCommit 中记录当前批次checkpoint 的offset 数据提交到kafka 中
核心流程
- KakfaConsumerThread 线程不断从Kafka 中消费数据
- 消费的数据存储handover 中
- kafkaFetch 不断从handover 获取数据进行处理
其他流程
- initializeState、snapshotState 这两个方法是实现了CheckpointedFunction接口里面的对应方法,CheckpointedFunction 接口是Flink 提供的两个hook, 任务初始化执行initializeState,用于从状态中恢复数据, 优于open先执行, 用于其恢复offset数据;snapshotState 每次触发checkpoint 时执行,提供用户操作hook, 用于将offset 数据保存在状态中。
- notifyCheckpointComplete 是实现了CheckpointListener 接口中的方法, checkpoint 完成之后的回调方法, 提交状态中的offset数据至kafka中。
offset 提交
对于整个offset的提交至kafka中, 类似于两阶段的提交过程:
- 第一阶段:执行checkpoint 时即调用snapshotState方法, offset 保存到状态中
- 第二阶段:checkpoint 执行完成时回调notifyCheckpointComplete方法,offset 提交到kafka中
对于第一阶段失败任务直接重启,从最近一次checkpoint记录的位点开始消费,对于第二阶段提交offset至kafka如果失败,并不会导致任务重启,只是做了日志记录,因为提交offset到kafka成功与否并不会影响任务的执行。
启动时offset指定
- 如果是从checkpoint 恢复,那么就会忽略客户端所指定的startMode , 也就是checkpoint 状态数据优先
总结
本篇主要介绍了FlinkKafkaConsumer的核心设计流程与实现,同时介绍了与checkpoint流程结合完成offset的管理。