[Flink] Part 4: [Myth] Does decomposing Update into D- / I+ break update atomicity?

2022-03-31 10:59:05

Background:

Flink Table/SQL's interpretation of changelog semantics:

Changelog: Insert, Update, Delete
RowKind: Insert (I+), Update After (U+), Update Before (U-), Delete (D-)
Upsert: Upsert (I+), Delete (D-)
Retract: Add (I+), Retract (D-)
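
To make the RowKind encoding concrete, here is a small, hedged illustration using Flink's Row.ofKind (the column values are made up): one logical Update is materialized as two physical rows, an UPDATE_BEFORE carrying the old image and an UPDATE_AFTER carrying the new one.

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        // One logical UPDATE in changelog form: the old image as -U and the
        // new image as +U. (Values are illustrative only.)
        Row updateBefore = Row.ofKind(RowKind.UPDATE_BEFORE, "1", 1L);
        Row updateAfter = Row.ofKind(RowKind.UPDATE_AFTER, "1", 10000L);

        // Prints the short codes -U and +U together with the row contents.
        System.out.println(updateBefore.getKind().shortString() + " " + updateBefore);
        System.out.println(updateAfter.getKind().shortString() + " " + updateAfter);
    }
}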

For example, in canal-json, CanalJsonSerializationSchema#rowKind2String:

private StringData rowKind2String(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return OP_INSERT;
        case UPDATE_BEFORE:
        case DELETE:
            return OP_DELETE;
        default:
            throw new UnsupportedOperationException(
                    "Unsupported operation '"   rowKind   "' for row kind.");
    }
}

Analysis: When building a real-time data warehouse, we generally adopt a layered ODS -> DWD -> DWS architecture.

As a result, once the changelog ingested at the ODS layer is emitted downstream, the Update (U) semantics are decomposed into a Delete (D-) of the old row followed by an Insert (I+) of the updated row, written to Kafka as canal-json. The question is what happens when a downstream consumer has to persist this pair of records that originally expressed a single Update.

The myth: take MySQL as an example. By this reasoning there should be a window on the database side between finishing the Delete and executing the Insert; in the extreme case, a query for that row arriving inside this window would find nothing. That behavior would clearly be wrong.
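
To make the feared race concrete, here is a hypothetical sketch of a naive sink that replays the D-/I+ pair literally as two auto-committed JDBC statements (the table, columns and connection details are illustrative; as the rest of the article shows, this is not what Flink actually does):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class NaiveRetractReplay {
    public static void main(String[] args) throws Exception {
        // Illustrative connection settings only.
        try (Connection conn =
                DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pwd")) {
            conn.setAutoCommit(true);

            // Replay D-: delete the old version of the row.
            try (PreparedStatement del =
                    conn.prepareStatement("DELETE FROM test_canal WHERE appl_seq = ?")) {
                del.setString(1, "1");
                del.executeUpdate();
            }

            // <-- The gap: a concurrent SELECT ... WHERE appl_seq = '1' issued
            //     here would see no row at all, even though logically the row
            //     was only being updated.

            // Replay I+: insert the new version of the row.
            try (PreparedStatement ins =
                    conn.prepareStatement(
                            "INSERT INTO test_canal (appl_seq, state, amount) VALUES (?, ?, ?)")) {
                ins.setString(1, "1");
                ins.setString(2, "01");
                ins.setLong(3, 10000L);
                ins.executeUpdate();
            }
        }
    }
}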

The essence of the problem: the semantics of Update inherently consist of two basic operations:

  1. First locate the storage position of the old row (compare the old value);
  2. Then overwrite the old row with the new one (swap in the new value).

This requires the storage medium to execute the Update atomically. Such atomicity does not come for free, and much software pays an extra price for it; in Java, for example, CAS (Compare And Swap) via Unsafe#compareAndSwap can atomically modify a value on the Java heap.
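
As a minimal illustration of the compare-and-swap idea, here is a sketch using AtomicLong from java.util.concurrent (which is backed by the same kind of CPU CAS primitive that Unsafe exposes), rather than calling Unsafe directly:

import java.util.concurrent.atomic.AtomicLong;

public class CasDemo {
    public static void main(String[] args) {
        AtomicLong amount = new AtomicLong(1L);

        // Compare and swap as one indivisible step: the value is replaced only
        // if it still equals the expected old value, and no reader can ever
        // observe a "deleted but not yet re-inserted" intermediate state.
        long expectedOld = amount.get();
        boolean swapped = amount.compareAndSet(expectedOld, 10_000L);

        System.out.println("swapped=" + swapped + ", amount=" + amount.get());
    }
}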

Verifying the hypothesis:

1. Verification approach:

Job 1: source1 -> Kafka (ogg-json), sink1 -> Kafka (canal-json)

Job 2: source2 -> Kafka (canal-json), sink2 -> MySQL

source1 receives an Update record, which Flink decomposes into a D- and an I+ and emits to sink1; source2 consumes the D- and I+ from sink1 and sinks them into MySQL. We then enable MySQL's general query log and examine which SQL primitives MySQL actually executes for the statements Flink sends.

2. Test process:

source1 DDL:

CREATE TABLE source (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-source-01',
    'properties.bootstrap.servers' = '...',
    'properties.group.id' = 'test-01',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'ogg-json'
)

sink1 DDL:

CREATE TABLE sink (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-sink-01',
    'properties.bootstrap.servers' = '...',
    'value.format' = 'canal-json'
)

source2 DDL:

CREATE TABLE source (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'format-oggcanal-sink-01',
    'properties.bootstrap.servers' = '...',
    'properties.group.id' = 'test-01',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'canal-json'
)

sink2 DDL:

CREATE TABLE sink (
    appl_seq STRING,
    op_ts TIMESTAMP(3),
    state STRING,
    amount BIGINT,
    PRIMARY KEY(appl_seq) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '...',
    'table-name' = 'test_canal',
    'username' = '...',
    'password' = '...'
)
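
The article does not show how the two jobs are submitted; one possible way is the Table API, sketched below for Job 1 (Job 2 is analogous). The DDL is the source1/sink1 definition listed above, with a placeholder bootstrap-server address:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OggToCanalJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // source1: Kafka topic carrying ogg-json changelog records.
        tEnv.executeSql(
                "CREATE TABLE source ("
                        + "  appl_seq STRING, op_ts TIMESTAMP(3), state STRING, amount BIGINT,"
                        + "  PRIMARY KEY (appl_seq) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'format-oggcanal-source-01',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092'," // placeholder address
                        + "  'properties.group.id' = 'test-01',"
                        + "  'scan.startup.mode' = 'latest-offset',"
                        + "  'value.format' = 'ogg-json')");

        // sink1: Kafka topic that will receive the canal-json encoding.
        tEnv.executeSql(
                "CREATE TABLE sink ("
                        + "  appl_seq STRING, op_ts TIMESTAMP(3), state STRING, amount BIGINT,"
                        + "  PRIMARY KEY (appl_seq) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'format-oggcanal-sink-01',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092'," // placeholder address
                        + "  'value.format' = 'canal-json')");

        // The pipeline itself is a pass-through; the D-/I+ decomposition happens
        // in the canal-json serializer, not in this query.
        tEnv.executeSql("INSERT INTO sink SELECT * FROM source");
    }
}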

source1 test data (an INSERT, an UPDATE, and a DELETE):

{
  "after": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 1
  },
  "before": {},
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "I",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}

{
  "after": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 10000
  },
  "before": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 1
  },
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "U",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}

{
  "after": {
  },
  "before": {
    "appl_seq": "1",
    "op_ts": "2021-01-01 00:00:00.000000",
    "state": "01",
    "amount": 10000
  },
  "current_ts": "2021-01-01 00:00:00.000000",
  "op_ts": "2021-01-01 00:00:00.000000",
  "op_type": "D",
  "pos": "1",
  "primary_keys": ["appl_seq"],
  "table": "ODS.APPL"
}

The canal-json data written by sink1 (and consumed by source2):

INSERT:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"INSERT"}
UPDATE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":1}],"type":"DELETE"}
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"INSERT"}
DELETE:
{"data":[{"appl_seq":"1","op_ts":"2021-01-01 00:00:00","state":"01","amount":10000}],"type":"DELETE"}

Enabling the MySQL general query log:

-- Check whether the general log is enabled and where it is written
SHOW VARIABLES LIKE "general_log%";

-- Enable the general log
SET GLOBAL general_log = 'ON';
-- Set the log file path
SET GLOBAL general_log_file = '/data/apps/job-flink/mysql_general_log.log';

Searching mysql_general_log.log for the relevant keywords locates the SQL statements that were executed; the result is as follows:

-- I+ insert:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 1) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)

-- U update:
INSERT INTO `test_canal`(`appl_seq`, `op_ts`, `state`, `amount`) VALUES ('1', '2021-01-01 00:00:00', '01', 10000) ON DUPLICATE KEY UPDATE
`appl_seq`=VALUES(`appl_seq`), `op_ts`=VALUES(`op_ts`), `state`=VALUES(`state`), `amount`=VALUES(`amount`)

-- D- delete:
DELETE FROM `test_canal` WHERE `appl_seq` = '1'

Here we find that Flink completes the upsert with ON DUPLICATE KEY UPDATE, rather than executing, as guessed earlier, the two canal-json records of the decomposed Update: a D- followed by an I+!

The question then becomes: how does Flink manage to reverse a transformation we assumed to be entropy-increasing (lossy, in information-theory terms), that is, to recover D-, I+ -> Update from the Update -> D-, I+ decomposition?

The next step is to go into the source code and try to resolve this myth.

3. Source code analysis

The jdbc-connector source package:

Since Flink wraps this logic into JDBC SQL execution, statement instances must be created somewhere; in the statement package there are only three simple classes:

Trying to find out who uses these classes gives us the lead closest to where the SQL logic is assembled:

Sure enough, it is JdbcBatchingOutputFormat.java, and three methods in this class stand out:

open:

/**
 * Connects to the target database and initializes the prepared statement.
 *
 * @param taskNumber The number of the parallel instance.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);
    jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
    if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
        this.scheduler =
                Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (JdbcBatchingOutputFormat.this) {
                                if (!closed) {
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        flushException = e;
                                    }
                                }
                            }
                        },
                        executionOptions.getBatchIntervalMs(),
                        executionOptions.getBatchIntervalMs(),
                        TimeUnit.MILLISECONDS);
    }
}

Analysis: flush() is scheduled periodically on a scheduler thread pool; the pool contains only one thread, which keeps the flushes ordered.

flush() mainly calls attemptFlush, and attemptFlush is just a single line of code that calls JdbcBatchStatementExecutor#executeBatch. As for which JdbcBatchStatementExecutor implementation's executeBatch is actually invoked, you can either debug with breakpoints or go through all the implementing subclasses; the Javadoc of TableBufferReducedStatementExecutor reads:

/**
 * Currently, this statement executor is only used for table/sql to buffer insert/update/delete
 * events, and reduce them in buffer before submit to external database.
 */

Clearly, our job must be calling this class's executeBatch.

Its executeBatch:

@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}

Analysis: reduceBuffer buffers the sink records that arrive between two runs of the scheduled flush task, and it is a Map:

// the mapping is [KEY, <+/-, VALUE>]
private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<>();

The buffered records are reduced (merged) by primary key, so the D- is overwritten by the subsequent I+, and in the end only the I+ is executed, via ON DUPLICATE KEY UPDATE. This explains how the downstream Flink job can undo the seemingly irreversible decomposition Update -> D-, I+, and why update atomicity is preserved!
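
A stripped-down sketch of that reduce-by-key idea in plain Java (no Flink types; the class and field names are made up for illustration): because the buffer is keyed by primary key, the I+ that follows a D- for the same key simply replaces it, so only the final upsert ever reaches the database.

import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

public class ReduceBufferSketch {
    // primary key -> (true = upsert this value, false = delete this key)
    private final Map<String, SimpleEntry<Boolean, Long>> reduceBuffer = new HashMap<>();

    void addToBatch(String key, boolean upsert, Long amount) {
        // A later change for the same key overwrites the earlier one.
        reduceBuffer.put(key, new SimpleEntry<>(upsert, amount));
    }

    void executeBatch() {
        for (Map.Entry<String, SimpleEntry<Boolean, Long>> e : reduceBuffer.entrySet()) {
            if (e.getValue().getKey()) {
                System.out.println("UPSERT key=" + e.getKey() + ", amount=" + e.getValue().getValue());
            } else {
                System.out.println("DELETE key=" + e.getKey());
            }
        }
        reduceBuffer.clear();
    }

    public static void main(String[] args) {
        ReduceBufferSketch buffer = new ReduceBufferSketch();
        // The D-/I+ pair produced by one logical UPDATE on primary key "1":
        buffer.addToBatch("1", false, 1L);     // D-: old row
        buffer.addToBatch("1", true, 10_000L); // I+: new row, overwrites the D-
        buffer.executeBatch();                 // prints a single UPSERT and no DELETE
    }
}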
