本文主要是讲merge操作的四个案例。
1.数据去重
实际上,线上业务很多时候数据源在上报数据的时候,由于各种原因可能会重复上报数据,这就会导致数据重复,使用merge函数可以避免插入重复的数据。具体操作方法如下:
sql
代码语言:javascript复制MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
scala
代码语言:javascript复制deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
注意:需要写入delta lake表的dataset自身要完成去重的 操作。我们可以通过merge语义区实现新数据和delta lake表中已有的数据之间去重,但是如果新的dataset内部有重复数据,重复数据依然会被插入。因此在写入新数据之前一定要完成去重操作。
如果数据确定可能会在某些时间周期内重复,那么可以对目标表进行按照时间分区,这样就可以在merge操作的时候指定时间范围。
sql
代码语言:javascript复制MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
scala
代码语言:javascript复制deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
这种利用分区进行谓词下推,可以大幅减少数据加载的量,进而提升速度。此外,对于Structured Streaming可以使用insert-only merge操作来实现连续不断的去重操作。主要有以下场景:
a.对于一些streaming操作,可以在foreachBatch操作来实现连续不断的将数据写入delta lake表,同时具有去重的功能。
b.对于另一些流查询,你可以连续不断的从delta lake表中读取去重的数据。可以这么做的原因是insert-only merge操作仅仅会追加新的数据到delta lake表中。
2.渐变纬度数据
另一个常见的操作是SCD Type 2,它维护对维表中每个key所做的所有变更的历史记录。此类操作需要更新现有行以将key的先前值标记为旧值,并插入新行作为最新值。给定具有更新的源表和具有维度数据的目标表,可以使用merge表达SCD type 2。
维护客户地址历史记录以及每个地址的有效日期范围,是本小节常见的示例操作。当需要更新客户的地址时,必须将先前的地址标记为不是当前地址,更新其有效日期范围,然后将新地址添加为当前地址。scala的表达方法如下:
代码语言:javascript复制val customersTable: DeltaTable = ... // table with schema (customerId, address, current, effectiveDate, endDate)
val updatesDF: DataFrame = ... // DataFrame with schema (customerId, address, effectiveDate)
// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
.as("updates")
.join(customersTable.toDF.as("customers"), "customerid")
.where("customers.current = true AND updates.address <> customers.address")
// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
.selectExpr("NULL as mergeKey", "updates.*") // Rows for 1.
.union(
updatesDF.selectExpr("updates.customerId as mergeKey", "*") // Rows for 2.
)
// Apply SCD Type 2 operation using merge
customersTable
.as("customers")
.merge(
stagedUpdates.as("staged_updates"),
"customers.customerId = mergeKey")
.whenMatched("customers.current = true AND customers.address <> staged_updates.address")
.updateExpr(Map( // Set current to false and endDate to source's effective date.
"current" -> "false",
"endDate" -> "staged_updates.effectiveDate"))
.whenNotMatched()
.insertExpr(Map(
"customerid" -> "staged_updates.customerId",
"address" -> "staged_updates.address",
"current" -> "true",
"effectiveDate" -> "staged_updates.effectiveDate", // Set current to true along with the new address and its effective date.
"endDate" -> "null"))
.execute()
3.cdc操作
和scd类似,另一个常见的案例是变化数据捕获,也即是常说的CDC,简单来说就是同步外部数据库的变更数据到deta lake。换句话说,对于外部数据库的 update,delete,insert操作,要同时作用于delta 表。这种情况,也可以使用merge操作来实现。
代码语言:javascript复制val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)// DataFrame with changes having following columns// - key: key of the change// - time: time of change for ordering between changes (can replaced by other ordering id)// - newValue: updated or inserted value if key was not deleted// - deleted: true if the key was deleted, false if the key was inserted or updatedval changesDF: DataFrame = ...// Find the latest change for each key based on the timestamp// Note: For nested structs, max on struct is computed as// max on first struct field, if equal fall back to second fields, and so on.val latestChangeForEachKey = changesDF .selectExpr("key", "struct(time, newValue, deleted) as otherCols" ) .groupBy("key") .agg(max("otherCols").as("latest")) .selectExpr("key", "latest.*")deltaTable.as("t") .merge( latestChangeForEachKey.as("s"), "s.key = t.key") .whenMatched("s.deleted = true") .delete() .whenMatched() .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .whenNotMatched("s.deleted = false") .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue")) .execute()
4. 整合foreachBatch
实际上在使用delta lake的时候可以结合foreachBatch和merge,来实现复杂的流查询到delta lake表的upsert功能。总共有以下几个场景:
a.在update模式下写流聚合结果到delta lake。这种情况,实际上比Complete模式更加高效。
代码语言:javascript复制import io.delta.tables.*
val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
b.将数据库变更操作同步到delta lake。该场景就是写变化数据到delta lake,也即是本问第三小节。
c.流数据以去重的方式写入delta lake。这个就是本文第一小节。
注意:
确保foreachBatch中的merge语句是幂等的,因为重新启动流查询可以将对该操作对同一批数据重复执行。
当在foreachBatch中使用merge时,流查询的输入数据速率可能会上报为在源处生成数据的实际速率的若干倍数。这是因为merge多次读取输入数据,导致输入指标倍增。如果这是瓶颈,则可以在合并之前缓存批处理DataFrame,然后在合并之后取消缓存。