Background:
How Flink Table/SQL interprets changelog semantics:
Changelog | Insert | Update | Delete
---|---|---|---
RowKind | Insert (I+) | Update Before (U-) + Update After (U+) | Delete (D-)
Upsert | Upsert (I+) | Upsert (I+) | Delete (D-)
Retract | Add (I+) | Retract (D-) + Add (I+) | Retract (D-)
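These short forms map onto Flink's org.apache.flink.types.RowKind enum (Flink itself prints the sign first, e.g. +I rather than I+). A minimal way to see them, assuming flink-core is on the classpath:

```java
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        // Prints: INSERT=+I UPDATE_BEFORE=-U UPDATE_AFTER=+U DELETE=-D
        for (RowKind kind : RowKind.values()) {
            System.out.print(kind.name() + "=" + kind.shortString() + " ");
        }
    }
}
```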
For example, in canal-json, CanalJsonSerializationSchema#rowKind2String:

```java
private StringData rowKind2String(RowKind rowKind) {
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
return OP_INSERT;
case UPDATE_BEFORE:
case DELETE:
return OP_DELETE;
default:
throw new UnsupportedOperationException(
"Unsupported operation '" + rowKind + "' for row kind.");
}
}
```
Analysis: When building a real-time data warehouse, we generally adopt the layered ODS -> DWD -> DWS architecture shown in the figure below.
As a result, once the changelog ingested at the ODS layer is forwarded downstream, the U semantics get decomposed: first a Delete (D-) of the old row, then an Insert (I+) of the updated row, both written to Kafka in canal-json. The downstream then has to persist this pair of records whose original semantics were a single Update.
The puzzle: Take MySQL as an example. Following this hypothetical logic, there would be a gap on the database side between finishing the Delete and executing the Insert. In the extreme case, a query for this row arriving inside that gap would find nothing. Clearly, that behavior would be wrong.
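In SQL terms, the feared naive replay would look like this (table name and values are illustrative, borrowed from the test below):

```sql
-- Replaying U as two separate statements:
DELETE FROM test_canal WHERE appl_seq = '1';
-- <- a concurrent SELECT arriving exactly here finds no row,
--    something a real atomic UPDATE would never allow
INSERT INTO test_canal (appl_seq, op_ts, state, amount)
VALUES ('1', '2021-01-01 00:00:00', '01', 10000);
```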
The essence of the problem: Update semantics fundamentally consist of two basic operations:
- first locate where the old row is stored (compare the old value);
- then overwrite the old row with the new one (swap in the new value).
This requires the storage layer to execute Update atomically, and such atomicity of updates is not a given; a lot of software pays an extra price for it. In Java, for example, CAS (Compare And Swap) via Unsafe#compareAndSwapInt and friends can atomically modify a value on the Java heap, as illustrated below.
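As a minimal, Flink-independent illustration, the same compare-and-swap primitive is available through the public java.util.concurrent.atomic API:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    public static void main(String[] args) {
        AtomicInteger amount = new AtomicInteger(1);
        // Compare-old and swap-new happen as one indivisible step:
        // the write succeeds only if the current value is still 1.
        boolean updated = amount.compareAndSet(1, 10000);
        System.out.println(updated + ", amount=" + amount.get()); // true, amount=10000
    }
}
```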
Verifying the hypothesis:
I. Approach:
Job 1: source1 -> kafka ogg-json, sink1 -> kafka canal-json
Job 2: source2 -> kafka canal-json, sink2 -> mysql
source1 receives an Update record, which Flink decomposes into two records, D- and I+, emitted to sink1; source2 consumes sink1's D- and I+ and sinks them to MySQL. We then enable MySQL's general query log and analyze which SQL primitives MySQL actually executed for the statements Flink issued.
II. Test procedure:
source1 ddl:
```sql
CREATE TABLE source (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-source-01',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'test-01',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'ogg-json'
)
```
sink1 ddl:
```sql
CREATE TABLE sink (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-sink-01',
'properties.bootstrap.servers' = '...',
'value.format' = 'canal-json'
)
```
source2 ddl:
```sql
CREATE TABLE source (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'format-oggcanal-sink-01',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'test-01',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'canal-json'
)
```
sink2 ddl:
```sql
CREATE TABLE sink (
appl_seq STRING,
op_ts TIMESTAMP(3),
state STRING,
amount BIGINT,
PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = '...',
'table-name' = 'test_canal',
'username' = '...',
'password' = '...'
)
```
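With the four DDLs in place, each job body reduces to a passthrough query. The exact statements are not shown in the original, but the setup implies something like:

```sql
-- Job 1: read ogg-json, let Flink re-encode the changelog as canal-json
-- (Job 2 is the same statement over its own source/sink pair)
INSERT INTO sink SELECT * FROM source;
```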
source1 test data: INSERT, UPDATE, DELETE:
```json
{
"after": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 1
},
"before": {
},
"current_ts": "2021-01-01 00:00:00.000000",
"op_ts": "2021-01-01 00:00:00.000000",
"op_type": "I",
"pos": "1",
"primary_keys": [
"appl_seq"
],
"table": "ODS.APPL"
}
{
"after": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 10000
},
"before": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 1
},
"current_ts": "2021-01-01 00:00:00.000000",
"op_ts": "2021-01-01 00:00:00.000000",
"op_type": "U",
"pos": "1",
"primary_keys": ["appl_seq"],
"table": "ODS.APPL"
}
{
"after": {
},
"before": {
"appl_seq": "1",
"op_ts": "2021-01-01 00:00:00.000000",
"state": "01",
"amount": 10000
},
"current_ts": "2021-01-01 00:00:00.000000",
"op_ts": "2021-01-01 00:00:00.000000",
"op_type": "D",
"pos": "1",
"primary_keys": ["appl_seq"],
"table": "ODS.APPL"
}
```
canal-json data in sink1 and sink2:

```
INSERT:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"INSERT"}
UPDATE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"DELETE"}
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"INSERT"}
DELETE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"DELETE"}
```
How to enable the MySQL general log:

```sql
-- check whether the general log is enabled, and its file path
SHOW VARIABLES LIKE "general_log%";
-- turn on the general log
SET GLOBAL general_log = 'ON';
-- set the log file path
SET GLOBAL general_log_file = '/data/apps/job-flink/mysql_general_log.log';
```
Search for keywords in mysql_general_log.log to locate the SQL operations executed; the results are as follows:

```sql
-- I+ insert:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 1) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)
-- U+ update:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 10000) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)
-- D- delete:
DELETE FROM `test_canal` WHERE `appl_seq` = '1'
```
Here we find that Flink completed an upsert with ON DUPLICATE KEY, instead of, as guessed earlier, executing the two canal-json records the Update was split into: D- -> I+!
The question then becomes: how does Flink manage to reverse a transformation we assumed would be entropy-increasing (lossy, in the information-theoretic sense)? That is, how does it recover D-, I+ -> Update after the decomposition Update -> D-, I+?
The next step is to dig into the source code and try to resolve this puzzle.
III. Source code analysis
The jdbc-connector source package:
When Flink wraps its logic into executable JDBC SQL, it must create statement instances; in the statement package we find only three simple classes.
Next, look for whoever uses these classes, which is necessarily the lead closest to where the SQL logic is assembled:
Sure enough, it is JdbcBatchingOutputFormat.java, and three methods stand out in this class:
open:
```java
/**
* Connects to the target database and initializes the prepared statement.
*
* @param taskNumber The number of the parallel instance.
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (JdbcBatchingOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
},
executionOptions.getBatchIntervalMs(),
executionOptions.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
}
```
Analysis: a scheduler thread pool periodically invokes flush(); the pool contains a single thread, which guarantees ordering.
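A quick sketch of that ordering guarantee (a hypothetical demo, not Flink code): with one worker thread, scheduled runs execute strictly one after another and can never interleave.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadFlushDemo {
    public static void main(String[] args) throws InterruptedException {
        // One worker thread: each scheduled "flush" finishes before the next starts.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println("flush on " + Thread.currentThread().getName()),
                0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(500);
        scheduler.shutdown();
    }
}
```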
flush() mainly calls attemptFlush, and attemptFlush is just one line: a call to JdbcBatchStatementExecutor#executeBatch. To determine which JdbcBatchStatementExecutor implementation's executeBatch actually runs, you can set a breakpoint and debug, or inspect all the implementing subclasses. The Javadoc of TableBufferReducedStatementExecutor reads:
```java
/**
* Currently, this statement executor is only used for table/sql to buffer insert/update/delete
* events, and reduce them in buffer before submit to external database.
 */
```
Evidently, our job is calling this class's executeBatch.
Its executeBatch:
```java
@Override
public void executeBatch() throws SQLException {
for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
if (entry.getValue().f0) {
upsertExecutor.addToBatch(entry.getValue().f1);
} else {
// delete by key
deleteExecutor.addToBatch(entry.getKey());
}
}
upsertExecutor.executeBatch();
deleteExecutor.executeBatch();
reduceBuffer.clear();
}
```
Analysis: reduceBuffer buffers the sink records that arrive between two runs of the scheduled flush task, and it is a Map:

```java
// the mapping is [KEY, <+/-, VALUE>]
private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();
```
It reduces (merges) entries by primary KEY, so the D- gets overwritten by the I+, and in the end a single I+ is executed via ON DUPLICATE KEY. This explains how a Flink downstream can undo the seemingly irreversible decomposition Update -> D-, I+, and it also resolves the atomicity problem!
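To make the reduction concrete, here is a minimal, self-contained sketch of the idea (hypothetical names, simplified types; the real logic lives in TableBufferReducedStatementExecutor):

```java
import java.util.HashMap;
import java.util.Map;

public class ReduceBufferDemo {
    // Simplified stand-in for Flink's [KEY, <+/-, VALUE>] mapping:
    // true = upsert (I+/U+), false = delete (D-/U-).
    record Change(boolean isUpsert, String value) {}

    public static void main(String[] args) {
        Map<String, Change> reduceBuffer = new HashMap<>();

        // A single Update arrives as D- followed by I+ for the same key "1":
        reduceBuffer.put("1", new Change(false, null));          // D- of the old row
        reduceBuffer.put("1", new Change(true, "amount=10000")); // I+ overwrites the D-

        // At flush time only the latest change per key survives:
        reduceBuffer.forEach((key, change) -> {
            if (change.isUpsert()) {
                // -> one INSERT ... ON DUPLICATE KEY UPDATE ..., a single atomic statement
                System.out.println("UPSERT key=" + key + " " + change.value());
            } else {
                System.out.println("DELETE key=" + key);
            }
        });
    }
}
```

Only the last change per key survives the buffer, so the D-/I+ pair produced from one Update collapses back into a single atomic upsert.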