1. Regular Joins (dual-stream join)
A regular join must retain state for both streams, keeping it continuously and never clearing it. Every record on each side remains visible to the other stream, so all data has to stay in state indefinitely; since state cannot be allowed to grow without bound, this join is only suitable for bounded streams, or should be combined with a state TTL. Its syntax looks much like offline batch SQL:
LEFT JOIN, RIGHT JOIN, FULL JOIN, INNER JOIN
CREATE TABLE NOC (
agent_id STRING,
codename STRING
)
WITH (
'connector' = 'kafka'
);
CREATE TABLE RealNames (
agent_id STRING,
name STRING
)
WITH (
'connector' = 'kafka'
);
SELECT
name,
codename
FROM NOC
INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;
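The unbounded state growth described above can be sketched in plain Python (a toy model, not Flink's actual implementation): each side of a regular join buffers every record it has ever seen, and each new arrival probes the other side's buffer.

```python
from collections import defaultdict

class RegularJoin:
    """Toy inner join over two unbounded streams.

    Both sides keep every record in state forever, which is why regular
    joins need bounded inputs or a state TTL.
    """

    def __init__(self):
        self.left_state = defaultdict(list)   # key -> all rows seen on the left
        self.right_state = defaultdict(list)  # key -> all rows seen on the right

    def on_left(self, key, row):
        self.left_state[key].append(row)                 # retained forever
        return [(row, r) for r in self.right_state[key]] # join with every match so far

    def on_right(self, key, row):
        self.right_state[key].append(row)
        return [(l, row) for l in self.left_state[key]]

join = RegularJoin()
join.on_left("007", {"codename": "Bond"})
out = join.on_right("007", {"name": "James"})
```

Nothing is ever evicted from `left_state` or `right_state`, which mirrors why the state TTL matters here.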
2. Interval Joins
An interval join adds a time-window constraint: for two streams to join, one stream's timestamp must fall within a bounded range of the other stream's timestamp, and the join keys must match. Because of this time bound, data that falls outside the range can be cleaned up, so there is no need to retain full state. Interval joins support both processing time and event time for defining the bound. With processing time, Flink uses system time internally to bound the interval and clean up state; with event time, it uses the watermark mechanism to do the same.
CREATE TABLE orders (
id INT,
order_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'kafka'
);
CREATE TABLE shipments (
id INT,
order_id INT,
shipment_time AS TIMESTAMPADD(DAY, CAST(FLOOR(RAND()*(1-5+1)) AS INT), CURRENT_TIMESTAMP)
)
WITH (
'connector' = 'kafka'
);
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;
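The pruning behavior that keeps interval-join state bounded can be sketched as follows (an illustrative model, not Flink internals): orders are buffered, a shipment only matches orders within the 3-day bound, and a watermark advance drops orders that can never join again.

```python
class IntervalJoin:
    """Toy interval join: a shipment joins an order whose time lies in
    [shipment_time - 3 days, shipment_time]. Orders older than the
    watermark minus the bound are pruned, so state stays bounded."""

    BOUND = 3  # days, matching INTERVAL '3' DAY in the query

    def __init__(self):
        self.orders = {}  # order_id -> order_time (in days)

    def on_order(self, order_id, order_time):
        self.orders[order_id] = order_time

    def on_shipment(self, order_id, shipment_time):
        t = self.orders.get(order_id)
        if t is not None and shipment_time - self.BOUND <= t <= shipment_time:
            return order_id, shipment_time - t  # (order_id, day_diff)
        return None

    def on_watermark(self, watermark):
        # an order older than watermark - BOUND can never satisfy the bound again
        self.orders = {k: t for k, t in self.orders.items()
                       if t >= watermark - self.BOUND}

j = IntervalJoin()
j.on_order(1, 10)
match = j.on_shipment(1, 12)  # within 3 days of the order
```

Advancing the watermark is what actually shrinks the state, which is the key difference from the regular join above.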
3. Temporal Table Join
In an interval join, both input streams must have a lower time bound, beyond which records become inaccessible. This does not suit many dimension-table join workloads, because a dimension table often has no time bound at all. To address this, Flink provides the Temporal Table Join. A temporal table join is similar to a hash join in that it divides its inputs into a build table and a probe table. The former is typically the changelog of a dimension table, the latter the business data stream, and in the typical case the probe side is far larger than the build side. In a temporal table join, the build table is a time-versioned view over an append-only stream, hence the name temporal table. A temporal table requires a primary key and a versioning field (usually the event-time column) so that it can reflect the table's contents at different points in time.
CREATE TEMPORARY TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
t.total,
c.currency_code,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;
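The versioned-view semantics of `FOR SYSTEM_TIME AS OF` can be sketched like this (a minimal model, not Flink's implementation): each key keeps a time-ordered list of versions, and a lookup at time t returns the latest version at or before t.

```python
import bisect

class TemporalTable:
    """Toy temporal table: per primary key, keep (version_time, value)
    pairs sorted by time; FOR SYSTEM_TIME AS OF t picks the latest
    version whose time is <= t."""

    def __init__(self):
        self.versions = {}  # key -> sorted list of (time, value)

    def upsert(self, key, time, value):
        self.versions.setdefault(key, []).append((time, value))
        self.versions[key].sort()

    def as_of(self, key, time):
        rows = self.versions.get(key, [])
        # rightmost version with version_time <= time
        i = bisect.bisect_right(rows, (time, float("inf")))
        return rows[i - 1][1] if i else None

rates = TemporalTable()
rates.upsert("USD", 100, 0.91)  # rate valid from t=100
rates.upsert("USD", 200, 0.95)  # new version at t=200
```

A transaction at t=150 therefore joins against the 0.91 rate, not the later 0.95 one, which is exactly what makes the join result reproducible.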
4. Lookup Joins (dimension-table join)
The JDBC connector can be used in a temporal join as a lookupable source (also called a dimension table); currently only the synchronous lookup mode is supported. By default the lookup cache is disabled; you can enable it by setting the lookup.cache.max-rows and lookup.cache.ttl options. The cache's purpose is to improve the performance of temporal joins against the JDBC connector: with it disabled, every request is sent to the external database.
When the lookup cache is enabled, each process (i.e. each TaskManager) maintains its own cache. Flink checks the cache first and only queries the external database on a miss, updating the cache with the returned row. When the cache reaches lookup.cache.max-rows, or a row has lived longer than lookup.cache.ttl, the oldest rows in the cache are expired. Cached records may therefore be stale; setting lookup.cache.ttl lower refreshes data sooner but increases the number of requests sent to the database, so you have to balance throughput against correctness.
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3)
,process_time AS PROCTIME()
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);
CREATE TEMPORARY TABLE mysql_behavior_conf (
id int
,code STRING
,map_val STRING
,update_time TIMESTAMP(3)
-- ,primary key (id) not enforced
-- ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'lookup_join_config'
,'username' = 'root'
,'password' = '123456'
,'lookup.cache.max-rows' = '1000'
,'lookup.cache.ttl' = '1 minute' -- cache TTL: a row is evicted once it expires, even if it is still being accessed
);
SELECT a.user_id, a.item_id, a.category_id, a.behavior, c.map_val, a.ts
FROM user_log a
left join mysql_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.behavior = c.code
where a.behavior is not null;
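The cache behavior described above — TTL expiry plus oldest-row eviction at max-rows — can be sketched in Python (a toy model in the spirit of lookup.cache.max-rows / lookup.cache.ttl, not the connector's actual code):

```python
import time
from collections import OrderedDict

class LookupCache:
    """Toy lookup cache: entries expire after ttl seconds, and the
    oldest entry is evicted when the cache exceeds max_rows."""

    def __init__(self, max_rows, ttl, db_lookup):
        self.max_rows, self.ttl, self.db_lookup = max_rows, ttl, db_lookup
        self.cache = OrderedDict()  # key -> (inserted_at, value)
        self.db_hits = 0            # how many requests reached the database

    def get(self, key, now=None):
        now = time.monotonic() if now is None else now
        entry = self.cache.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]                 # cache hit: no database call
        value = self.db_lookup(key)         # miss or expired: query the database
        self.db_hits += 1
        self.cache.pop(key, None)
        self.cache[key] = (now, value)
        if len(self.cache) > self.max_rows:
            self.cache.popitem(last=False)  # evict the oldest row
        return value

# hypothetical lookup: pretend the database uppercases the key
cache = LookupCache(max_rows=2, ttl=60, db_lookup=lambda k: k.upper())
first = cache.get("pv", now=0)    # miss: hits the database
second = cache.get("pv", now=10)  # hit: served from cache
third = cache.get("pv", now=90)   # TTL expired: hits the database again
```

The trade-off from the paragraph above is visible here: a shorter `ttl` re-runs `db_lookup` more often in exchange for fresher values.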
5. Lateral Table Join
A lateral table join is roughly the equivalent of SQL Server's CROSS APPLY, but functionally more powerful than CROSS APPLY.
1. Joining a table with a table
CREATE TABLE People (
id INT,
city STRING,
state STRING,
arrival_time TIMESTAMP(3),
WATERMARK FOR arrival_time AS arrival_time - INTERVAL '1' MINUTE
) WITH (
'connector' = 'kafka'
);
CREATE TEMPORARY VIEW CurrentPopulation AS
SELECT
city,
state,
COUNT(*) as population
FROM (
SELECT
city,
state,
ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
FROM People
)
WHERE rownum = 1
GROUP BY city, state;
SELECT
state,
city,
population
FROM
(SELECT DISTINCT state FROM CurrentPopulation) States,
LATERAL (
SELECT city, population
FROM CurrentPopulation
WHERE state = States.state
ORDER BY population DESC
LIMIT 2
);
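What the LATERAL subquery computes — the top 2 cities by population for each state — is a per-group top-N, sketched here in Python (illustrative only; the field names echo the query above):

```python
def top_n_per_group(rows, group_key, order_key, n):
    """Toy version of the LATERAL top-2 query: for each group, keep the
    n rows with the largest order_key value."""
    groups = {}
    for row in rows:
        groups.setdefault(row[group_key], []).append(row)
    return {
        g: sorted(members, key=lambda r: r[order_key], reverse=True)[:n]
        for g, members in groups.items()
    }

# hypothetical sample data in the shape of CurrentPopulation
cities = [
    {"state": "NY", "city": "New York", "population": 8_400_000},
    {"state": "NY", "city": "Buffalo", "population": 278_000},
    {"state": "NY", "city": "Rochester", "population": 211_000},
]
top2 = top_n_per_group(cities, "state", "population", 2)
```

The LATERAL keyword is what lets the inner subquery reference `States.state`, so it effectively runs once per group, like the loop over `groups` here.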
2. Joining a table with a table function
SELECT
data, name, age
FROM
userTab,
LATERAL TABLE(splitTVF(data)) AS T(name, age)
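`splitTVF` here is a user-defined table function: for each input row it emits zero or more output rows, which are then cross-joined with that input row. As a sketch (the `name:age;...` format is an assumption about what this hypothetical splitTVF parses, since the source does not define it):

```python
def split_tvf(data):
    """Hypothetical stand-in for the splitTVF table function:
    split a 'name:age;name:age' string into (name, age) rows."""
    for part in data.split(";"):
        name, age = part.split(":")
        yield name, int(age)

def lateral_table_join(rows):
    # cross-join each input row with every row its table function produces,
    # mirroring: FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)
    for row in rows:
        for name, age in split_tvf(row["data"]):
            yield row["data"], name, age

out = list(lateral_table_join([{"data": "bob:25;alice:30"}]))
```

One input row fans out into one output row per element the table function yields, which is what distinguishes this from an ordinary scalar function call.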