Delta 实现Partial Merge

2022-07-21 13:44:59 浏览数 (1)

Partial Merge是我根据实际场景发明的一个词汇。目前官方版本应该不支持。

Partial Merge 定义

所谓Partial Merge 是指,假设我们有a,b,c,d 四个字段,现在来了一千条记录,每条记录只会包含这四个字段的一个或者多个,并且,每条记录包含的字段都不一样。

应用场景描述

通常,因为准备写入到表数据都需要统一的schema,如果我们使用原有的表的Schema,那么每条记录不包含的字段会被设置为null,但是这样,我们就不知道,究竟该字段是没有还是该字段最新值为Null,所以展开成完整的schema不是一个好的选择。因此,我们通常使用Json来解决这个问题,Json里只包含实际变更的字段。

假设我们原表的数据长这样:

代码语言:javascript复制
{"id":9,"content":"Spark好的语言1","label":0.0}
{"id":10,"content":"MLSQL是一个好的语言7","label":0.0}
{"id":12,"content":"MLSQL是一个好的语言7","label":0.0}

包含id, content, label三个字段。新写入的数据可以设计成这个样子:

代码语言:javascript复制
{"id":9,"value":"{"label":100.0}"}
{"id":10,"value":"{"content":"天才"}"}

id是主键,也可以是多个字段组成联合组件。 value是json字符串,里面包含了变更的字段的值。所以逻辑上是,找到id为9的记录,将label更新为100,找到id为10的记录,将content字段更新为”天才“。如果表里面没有记录10,则默认所有字段为null,然后再将content字段转修改为"天才"。

需要这个场景的,比如典型的用户画像,我们可能会有无数用户画像的字段更新程序将数据写入到Kafka,然后消费Kafka得到就是类似上面的id,value格式的数据,我们需要能够实时更新进delta表,最后算法的同学可以利用这些最新的数据计算新的属性。

如何基于Delta实现

将delta表和新进来的数据做full outer join,这样就能将两边数据衔接起来,衔接的结果可以组成类似这样的记录:

代码语言:javascript复制
case class FullOuterJoinRow(
left: Row, 
right: Row, 
leftPresent: Boolean, 
rightPresent: Boolean)

full outer join的特性是left join 和right join的结合体,这样可以区分出新来的数据是直接新增的还是说原来的表里面已经有的。可能的情况如下:

代码语言:javascript复制
row match {
  case FullOuterJoinRow(left, right, true, true) =>
    // upsert
    merge(left, right)

  case FullOuterJoinRow(left, right, true, false) =>
    // no change
    left
  case FullOuterJoinRow(left, right, false, true) =>
    // append
    merge(left, right)
}

如果left,right都存在,则表示需要right需要merge进left,也就是做update, 如果left存在,right不存在,则说明数据没有变化,如果left不存在,right存在,则说明数据是新增的。

当然了,我们一般不需要对全表做这种join,因为这样数据集会很大,我们只要找到那些包含了已经有新进来数老版本的的文件,然后把这些文件和进来的数据做full outer join即可。如何快速的过滤出哪些文件包含了新进来的数据(根据联合主键来判定),可参考我前一篇文章

祝威廉:Delta Lake如何自己实现更新操作加速

最后,这一篇文章和前一篇文章的实现都放在了

Upsert支持

欢迎来看。。。。。(然后代码现在还没优化,比较难看。。。大家勿喷)

0 人点赞