Flink SQL中的Join操作

2022-02-24 13:44:52 浏览数 (1)

Flink SQL 支持对动态表进行复杂灵活的连接操作。 有几种不同类型的连接来解决可能需要的各种语义查询。

默认情况下,连接顺序未优化。 表按照在 FROM 子句中指定的顺序连接。 您可以调整连接查询的性能,首先列出更新频率最低的表,最后列出更新频率最高的表。 确保以不产生交叉连接(笛卡尔积)的顺序指定表,交叉连接不受支持并且会导致查询失败。

常规Joins

常规联接是最通用的联接类型,其中任何新记录或对联接任一侧的更改都是可见的,并且会影响整个联接结果。 例如,如果左侧有一条新记录,则当产品 id 相等时,它将与右侧的所有先前和将来的记录连接。

代码语言:javascript复制
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

对于流式查询,常规连接的语法是最灵活的,并且允许任何类型的更新(插入、更新、删除)输入表。 但是,此操作具有重要的操作含义:它需要将连接输入的双方永远保持在 Flink 状态。 因此,计算查询结果所需的状态可能会无限增长,具体取决于所有输入表的不同输入行数和中间连接结果。 您可以提供具有适当状态生存时间 (TTL) 的查询配置,以防止状态大小过大。 请注意,这可能会影响查询结果的正确性。 有关详细信息,请参阅查询配置。

对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。 请提供具有有效保留间隔的查询配置,以防止状态大小过大。 有关详细信息,请参阅查询配置。

INNER Equi-JOIN

返回受连接条件限制的简单笛卡尔积。 目前,仅支持等值连接,即具有至少一个具有等式谓词的合取条件的连接。 不支持任意交叉或 theta 连接。

代码语言:javascript复制
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

OUTER Equi-JOIN

返回合格笛卡尔积中的所有行(即所有通过其连接条件的组合行),加上外部表中连接条件与其他表的任何行都不匹配的每一行的一个副本。 Flink 支持 LEFT、RIGHT 和 FULL 外连接。 目前,仅支持等值连接,即,与至少一个具有等式谓词的合取条件连接。 不支持任意交叉或 theta 连接。

代码语言:javascript复制
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

区间 Joins

返回受连接条件和时间约束限制的简单笛卡尔积。 间隔连接至少需要一个等连接谓词和一个限制双方时间的连接条件。 两个适当的范围谓词可以定义这样的条件(<、<=、>=、>)、BETWEEN 谓词或比较两个输入的相同类型的时间属性(即处理时间或事件时间)的单个等式谓词 表。

例如,如果订单在收到订单四小时后发货,则此查询将连接所有订单及其相应的发货。

代码语言:javascript复制
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

以下谓词是有效区间连接条件的示例:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime INTERVAL '5' SECOND

对于流式查询,与常规联接相比,间隔联接仅支持具有时间属性的仅追加表。 由于时间属性是准单调递增的,因此 Flink 可以从其状态中移除旧值而不影响结果的正确性。

基于时间的JOIN

基于事件时间的JOIN

基于时间的JOIN允许对版本化表进行连接。 这意味着可以通过更改元数据来丰富表并在某个时间点检索其值。

时间连接采用任意表(左输入/探测站点)并将每一行与版本化表(右输入/构建端)中相应行的相关版本相关联。 Flink 使用 SQL:2011 标准的 FOR SYSTEM_TIME AS OF 的 SQL 语法来执行这个操作。 时间连接的语法如下;

代码语言:javascript复制
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

使用事件时间属性(即行时间属性),可以检索过去某个时间点的键值。 这允许在一个共同的时间点连接两个表。 版本化表将存储自上次水印以来的所有版本(按时间标识)。

例如,假设我们有一个订单表,每个订单都有不同货币的价格。 为了将该表正确规范化为单一货币,例如美元,每个订单都需要与下订单时的正确货币兑换率相连接。

代码语言:javascript复制
-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- Define a versioned table of currency rates. 
-- This could be from a change-data-capture
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table. 
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time,
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  =========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00

注意:event-time temporal join 是由左右两边的 watermark 触发的; 请确保连接的两边都正确设置了水印。 注意:事件时间时态联接需要时态联接条件的等价条件中包含的主键,例如表currency_rates的主键currency_rates.currency要约束在条件orders.currency = currency_rates.currency中。

与常规连接相比,尽管构建端发生了变化,但之前的时态表结果不会受到影响。 与区间连接相比,时态表连接没有定义记录将在其中连接的时间窗口。 来自探测端的记录总是在时间属性指定的时间与构建端的版本连接。 因此,构建端的行可能是任意旧的。 随着时间的推移,不再需要的记录版本(对于给定的主键)将从状态中删除。

基于处理时间的JOIN

处理时间时态表连接使用处理时间属性将行与外部版本化表中键的最新版本相关联。

根据定义,使用处理时间属性,连接将始终返回给定键的最新值。 可以将查找表视为一个简单的 HashMap<K, V> ,它存储来自构建端的所有记录。 这种连接的强大之处在于,当无法将表具体化为 Flink 中的动态表时,它允许 Flink 直接针对外部系统工作。

以下处理时时态表联接示例显示了应与表 LatestRates 联接的仅追加表订单。 LatestRates 是一个以最新速率具体化的维度表(例如 HBase 表)。 在时间 10:15、10:30、10:52,LatestRates 的内容如下所示:

代码语言:javascript复制
10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

LastestRates 在时间 10:15 和 10:30 的内容是相等的。 欧元汇率在 10:52 从 114 变为 116。

Orders 是一个仅附加表,表示给定金额和给定货币的付款。 例如,在 10:15,有一个金额为 2 欧元的订单。

代码语言:javascript复制
SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

鉴于这些表格,我们想计算所有转换为通用货币的订单。

代码语言:javascript复制
amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

借助基于时间的表连接,我们可以在 SQL 中将这样的查询表示为:

代码语言:javascript复制
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

探测端的每条记录都将与构建端表的当前版本连接。 在我们的示例中,查询使用处理时间概念,因此在执行操作时,新附加的订单将始终与最新版本的 LatestRates 连接。

结果对于处理时间是不确定的。 处理时时间连接最常用于通过外部表(即维度表)丰富流。

与常规连接相比,尽管构建端发生了变化,但之前的时态表结果不会受到影响。 与区间连接相比,临时表连接没有定义记录连接的时间窗口,即旧行不存储在状态中。

Lookup Join

查找连接通常用于使用从外部系统查询的数据来丰富表。 联接要求一个表具有处理时间属性,而另一个表由查找源连接器支持。

查找连接使用上面的处理时间连接语法和由查找源连接器支持的正确表。

以下示例显示了指定查找联接的语法。

代码语言:javascript复制
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

在上面的示例中,Orders 表中包含来自 MySQL 数据库中的 Customers 表的数据。 带有后续处理时间属性的 FOR SYSTEM_TIME AS OF 子句确保 Orders 表的每一行在连接运算符处理 Orders 行的时间点与那些匹配连接谓词的客户行连接。 它还可以防止在将来更新连接的客户行时更新连接结果。 查找连接还需要一个强制相等连接谓词,在上面的示例中为 o.customer_id = c.id。

数组的扩张

为给定数组中的每个元素返回一个新行。 尚不支持取消嵌套 WITH ORDINALITY。

代码语言:javascript复制
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

表函数

将表与表函数的结果连接起来。 左(外)表的每一行都与表函数的相应调用产生的所有行相连接。 用户定义的表函数必须在使用前注册。

INNER JOIN

如果其表函数调用返回空结果,则删除左(外)表的行。

代码语言:javascript复制
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

如果表函数调用返回空结果,则保留相应的外部行,并用空值填充结果。 目前,针对横向表的左外连接需要 ON 子句中的 TRUE 文字。

代码语言:javascript复制
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  ON TRUE

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/1946397

0 人点赞