【Flink】第二篇:维表Join之版本表

2022-03-31 10:58:24 浏览数 (1)

在数仓ETL中,事实表和维度表在维度码值之上做join、或者若干表之间进行join做数据打宽十分常见。数仓中的join本质上是以空间换时间,范式降低,以便后续olap数据分析之用。但是看似简单的join操作,一旦在Flink的流式语义中实现,做到实时Join就不是一件轻松的事了!

在Flink中,以1.12为蓝本(后续文章不做特殊说明均默认1.12),包含三类常见场景下的Join:

  1. Regular Join(常规双流Join)
  2. Interval Join(区间Join)
  3. Temporal Join(时态表Join):Lookup DB Join、版本表Join

以kafka-json事实表关联upsert-kafka版本表的Demo入手,对版本表Join的水位线机制作简要分析。

首先,澄清一下相关的概念:

动态表(Dynamic Table) / 时态表(Temporal Table):与表示批处理数据的静态表不同,动态表是随时间变化的。

Flink的Table API和SQL对这种随时间动态变化的表和批处理中的相对静止的表抽象出了一致的语义,也就是说Flink是以流式的动态表去理解相对静止的静态表,这也是Flink从它的根本的流式世界观去理解其他事物的一种体现。Flink的流批一体的语义使得我们可以像查询批处理中的静态表一样查询动态表。查询动态表将生成一个连续查询。一个连续查询永远不会终止,结果也会生成一个动态表。查询不断更新其结果表,以动态反映其输入表上的更改。这和高级关系数据库系统中的物化视图的概念十分类似:

物化视图(Materialized Views):物化视图被定义为一条SQL查询,就像常规虚拟视图一样。但是,不同的是,物化视图缓存查询结果,因此在访问视图时不需要对查询进行计算。缓存的一个常见难题是缓存过期。当这个查询视图的基本表被修改时,物化视图的某些值将会过期,此时便需要根据基本表的变化来对缓存的视图数据进行维护,以符合视图查询的SQL查询逻辑。

时态表又分为普通表和版本表:

版本:数据表中的每行数据都有其生命周期,例如,新插入一行数据,这行数据以此为生命周期的开始,更新这行数据,将开启这行数据下一个版本,生命周期也将从新计算,直至删除,生命周期结束。

版本表:如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。

普通表:如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

例如,有一张产品价格版本表,产品的价格是随时间实时变化的:

代码语言:javascript复制
(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ ===== 
 (INSERT)         00:01:00     p_001      scooter      11.11
 (INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
 (UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
 (UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99

产品 scooter 在时间点 00:01:00的初始价格是 11.11, 在 12:00:00 的时候涨价到了 12.99, 在 18:00:00 的时候这条产品价格记录被删除。

那么,在 10:00:00 对应的表版本如下所示:

代码语言:javascript复制
update_time  product_id product_name price
===========  ========== ============ ===== 
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11

13:00:00 对应的版本,表的内容如下所示:

代码语言:javascript复制
update_time  product_id product_name price
===========  ========== ============ ===== 
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99

可以看到,维护版本表需要两个关键信息:主键和时间,那么,本次Demo的版本表选择upsert-kafka定义如下:

代码语言:javascript复制
CREATE TABLE right_upsert (
    appl_seq STRING,
    uid STRING,
    state STRING,
    amount DECIMAL(16,2),
    op_ts TIMESTAMP(3),
    PRIMARY KEY(appl_seq) NOT ENFORCED, --主键
    WATERMARK FOR op_ts AS op_ts - INTERVAL '3' SECOND --水位线
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'XXX:9092,XXX:9092,XXX:9092',
    'topic' = 'left_kafka-join-03',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.group.id' = 'test-01'
)

为了简便,事实表(左表)也用同样的schema,并选用kafka-json:

代码语言:javascript复制
CREATE TABLE left_kafka (
    appl_seq STRING,
    uid STRING,
    state STRING,
    amount DECIMAL(16,2),
    op_ts TIMESTAMP(3),
    WATERMARK FOR op_ts AS op_ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'xxx:9092,xxx:9092,xxx:9092',
    'topic' = 'left_kafka-join-01',
    'value.format' = 'json',
    'properties.group.id' = 'test-01',
    'scan.startup.mode' = 'latest-offset'
)

另外,为了避免分区水位线倾斜问题,暂时我们将上游两张kafka分区以及程序并行度均设为1(后续专题分析此问题),并在程序中开启检查点,如下:

代码语言:javascript复制
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(1000L * 1 * 1, CheckpointingMode.EXACTLY_ONCE);
streamEnv.getCheckpointConfig().setCheckpointTimeout(1000L * 20);
streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L * 1);
streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.minutes(1)));

然后执行Left Join SQL:

代码语言:javascript复制
SELECT t1.appl_seq, cast(t1.op_ts as string), t2.appl_seq, cast(t2.op_ts as string)
FROM left_kafka AS t1 LEFT JOIN right_upsert FOR SYSTEM_TIME AS OF t1.op_ts AS t2
ON t1.appl_seq = t2.appl_seq

至此,我们便可以写测试数据发送到kafka进行Demo测试及验证。

测试结论:

对于相同主键的左右流数据,假设左右流时间属性为T左,T右,

  • 不设置水位线延迟的情况下,
  1. 左流选择join的是右流中T右1<=T左<=T右2,如果T右2还没来到,会缓存,以等待;
  2. 已经被触发写出的右流数据会被删除缓存,之后左流乱序迟到数据会被缓存,等待下一次触发Join时写出,但此前对应的右流相应版本的缓存已被删除所以NULL填补;
  • 设置水位线的情况下,
  1. 左流数据触发写出的时机是水位线延迟之后的版本到来时写出,未到来时缓存;
  2. 对于右流来说,同样,触发右流可以被join的时机是右流水位线延迟之后的右流版本被左流触发join
  3. 其他性质同不设置水位线延迟一样

0 人点赞