【Flink】第十六篇:源码角度分析 sink 端的数据一致性

2022-03-31 11:09:27 浏览数 (1)

【Flink】第四篇:【迷思】对update语义拆解D-、I 后造成update原子性丢失

【Flink】第五篇:checkpoint【1】

【Flink】第六篇:记一次Flink状态(State Size)增大不收敛,最终引起OOM问题排查

【Flink】第八篇:Flink 内存管理

【Flink】第九篇:Flink SQL 性能优化实战

【Flink】第十篇:join 之 regular join

【Flink】第十三篇:JVM思维导图

【Flink】第十四篇:LSM-Tree一般性总结

【Flink】第十五篇:Redis Connector 数据保序思考

接上篇对于Redis Connector的保序的思考后,在自研Connector中,关于数据一致性还有一个重要的点需要考虑,即如何保证数据投递的语义:

  • 精确一次
  • 至少一次
  • 至多一次

当然不需要全实现,而是根据场景及外部存储的特点,实现满足需求的数据投递语义能保证数据一致性的目的即可。

对于以上三种语义,精确一次是最严格的,所以精进一法,则万法皆通。

对于精确一次的语义可以先参考我之前的两篇文章有理论上的储备,接着我将再从现有的典型connector着手即可窥探其中奥义。

先将重要的结论贴出来:

flink的数据一致性需要source和sink满足以下两个条件:

  1. 你的 sources 必须是可回溯重放的,并且
  2. 你的 sinks 必须是事务性的(或幂等的)

体现在kafka connector中即,

  • source需要实现checkpointfunction,以便在failover时重放回溯上一个成功的checkpoint记录的offset以后的消息。
  • sink也需要实现checkpointfunction,以便在failover时从上一个成功的checkpoint记录的offset以后的消息开始初始化消费状态,当然由于kafka支持事务,所以实现2PC这个接口即可,因为这个接口也继承了checkpointfunction。

这是支持事务的真正端到端精确一次的kafka connector,如果2PC期间失败了,事务是没有提交成功的,所以也等于是没有被下游消费的。

但是还有很多外部存储系统不支持事务的特性,或者说支持事务特性的话效率太低,那么,这个时候就需要将外部存储系统设计成幂等的了!(我们平时使用kafka connector也不会使用它的精确一次,因为在上一个CK成功之前,下游是不会收到这个CK期间处理的数据的,这种为了数据一致性带来的性能损耗是难以忍受的,本质上这也是一种酸碱平衡的问题)

那么,众多的connector是怎么做的呢?我将从jdbc connector的源码中找到答案!

JDBC Connector

从META-INF.services这个SPI文件找到JdbcDynamicTableFactory。

因为我们需要看的是sink端,所以找到createDynamicTableSink方法,取其返回值JdbcDynamicTableSink即为DynamicTableSink的实现类:

再找到提供运行时执行类的方法getSinkRuntimeProvider:

他通过建造者模式build了一个JdbcBatchingOutputFormat:

再看JdbcBatchingOutputFormat的UML结构,这个类即为Flink的RichSink富函数的实现类,所以我们一定能从中找到相关的open、close等打开关闭和执行数据库存数的逻辑:

我们先来看看open的逻辑:

主要逻辑是:启动了一个调度线程池,corePoolSize为1。线程池中调度的线程执行的逻辑是线程安全的执行JdbcBatchingOutputFormat这个当前实例的flush操作。

注意,在flush过程中的异常会设置给以下这个成员变量:

代码语言:javascript复制
private transient volatile Exception flushException;

然后再来看看flush()的逻辑:

  1. 先进行一次checkFlushException()
  2. 然后执行尝试flush操作(attemptFlush),并行有最大尝试次数
  3. 如果超过最大尝试次数或者与数据库连接获取失败都会抛IOException

而尝试刷写方法attemptFlush很简单,就是调用代理的一个Executor实例执行executeBatch:

说完了在调度线程池中定时执行的flush逻辑,再来看看它又是如何受理每条RowData数据到来的逻辑的:

  1. 也是先进行一次checkFlushException()
  2. 然后执行addToBatch方法,并将batch的数据量计数(注意这个方法是线程安全的,所以不需要使用线程安全的JUC中的原子变量进行计数)
  3. 如果超过设置的刷写阈值,执行flush

而addToBatch的逻辑也很简单,同样也是调用代理的一个Executor实例执行addToBatch:

综合以上分析,得出以下结论:

  • 开启调度线程池定时flush数据到数据库,当batchsize达到刷写阈值也会去flush。
  • 具体执行数据库持久化的操作都是用代理的一个executor去执行
  • 并且在整个过程中首先检查是否有异常,一旦遇到异常,抛出RuntimeException结束掉当前线程。(很重要,在后面再解释为什么这么做!)

再来看看这个执行者executor做了些什么,JdbcBatchStatementExecutor?

可以看到,这个解扣定义的功能是实现:

代码语言:javascript复制
Executes the given JDBC statement in batch for the accumulated records

即将batch提交给远端数据库。

具体逻辑拿实现类TableBufferReducedStatementExecutor一探究竟:

首先,重点注意这个reduceBuffer,它存储的是经过压缩的changelog类型的有主键的RowData数据:

代码语言:javascript复制
the mapping is [KEY, < /-, VALUE>]

为什么说是经过压缩呢,Map这种数据结构天然是将key的value压缩,取最后存入的那一条的,所以,当对于一个主键的连续修改只会执行这个batch里最后的那一条修改操作,这样也降低了对数据库的压力。

那么,他是如何将RowData的RowKind进行映射的呢?

可以看到,在添加到batch的时候已经进行了映射,而且这居然是一种upsert的映射,可我们的关系型数据库并不是这样的操作啊,会出错的吧?!

我们继续往下看,

在执行batch中的DML的逻辑中用两个代理类分别执行upsert和delete操作。

找打这两个代理类并行分析后发现,是根据具体的数据库方言,将更新操作包装成了幂等的数据库操作!

例如,MySQLDialect

可以看到ON DUPLICATE KEY UPDATE,这个正是Mysql实现幂等的一种或方案!

简单地说,ON DUPLICATE KEY UPDATE 可以达到以下目的:

向数据库中插入一条记录:若该数据的主键值已在表中存在,则执行更新操作, 即UPDATE 后面的操作。否则插入一条新的记录。

综合以上对Flink jdbc Connector的分析,

1. 根据不同类型数据库,用具体数据库的SQL方言实现幂等方案

2. 在持久化DML到远端数据库过程中有任何异常,在符合设定阈值情况下立即抛出RuntimeException结束掉当前线程

那么为什么要有2.呢?

一旦持久化数据到远端数据库发生异常,如果我们不结束掉当前线程,那么checkpoint就会顺利执行下去(前提是我们不选择实现2PC的逻辑)。结果就是此次CK期间处理的数据在持久化过程中出现了问题,CK还顺利完成了,造成最终的数据不一致!

但是,如果我们妥善的处理这种持久化异常,并且将其暴露出反映给Flink的CK机制,此次CK失败后,就会从上一次成功的CK重新消费并重新持久化这次失败CK期间处理的数据,结果就是数据被再次持久化。并且由于下游是幂等的,所以无论上一次失败的CK是否提交了完整或者是部分数据,结果就是最终都会达到数据一致!

总结

0 人点赞