使用布隆过滤器加速Upsert操作是我根据实际场景需求添加的功能。目前官方版本应该不支持(0.4.0)。
Delta Lake 现阶段判定一条数据是不是已经存在是比较暴力的,直接做一次全表join(如果有分区会好点)。这样可以得到待更新记录所在的文件路径。在特定场景,这个开销会非常大。上次和一位朋友聊天,他对这个点也"耿耿于怀"。 尤其是做MySQL表同步的时候,通常是没有分区的,这就意味着每次都有一次全表扫描。
一个直观的解决方案是,能不能给每个文件加个布隆过滤器。布隆过滤器的缺点是当他判定包含时,不一定是真的包含,有误判率。所以bitmap之类的,也会是好选择,可以精准的判定一条记录是不是在一个文件里。不过,因为Delta Lake判定记录所在的文件路径后,还会将新来的数据和这些过滤出来的文件的数据做一次anti join,从而将包含了待更新记录的文件里其他非待更新记录过滤出来,重新写成新文件,避免和新进来的数据产生重复。所以,布隆过滤器 “如果判定为没有,那么一定没有。如果判定为有,则可能有”的特点非常适合我们这个场景,因为我们的目标仅仅是为了缩小候选文件集,即使误判了,只要不漏,我们在做anti join的时候,也无非就是多处理几个文件。
确定了使用布隆过滤器之后,那么接下来的问题就是,布隆过滤器的数据该存在哪? Hudi的做法是存在Parquet文件自身的footer里。显然,这并不适合Delta,因为Delta并不会直接操作Parquet文件,而是通过Spark Dataframe来完成的。所以,我们只能单独存储在一个子目录。理论上,布隆过滤器属于索引,那么应该和_delta_log 目录平级,所以这个时候可以添加一个_bf_index目录比较合适,考虑到未来还要添加其他索引类型,包括对读的优化,所以可以弄一个二层目录类似_index/_bf_index 可能会更好。
现在存储位置确认了,接着我们该思考应该以什么形式保存索引,为了保持简单性,我们继续以parquet为索引的存储格式。恩,parquet真的是个万能的存储格式。接着,我们马上又棉铃一个选择,是给每一个parquet数据文件生成一个parquet索引文件,还是为所有的parquet数据文件生成一个parquet索引文件? parquet索引文件的字段可以为如下:
- fileName (被索引的文件路径)
- bf_index (布隆过滤器的二进制或者字符串表示形式)
为了更好的性能,考虑到单表parquet data文件数量不会非常多,所以我们采用使用一个parquet索引文件策略。
虽然是采用一个parquet索引文件,但实际上会是两个。因为会涉及到索引里数据的删除。如果不想做删除(或者定期删除),那么就需要每次都过滤索引里还有效的文件,从而得到其不容过滤器。
现在,我们只要在新增、删除文件的时候嵌入生成索引的代码即可,然后在查找待更新记录所在文件的时候使用索引即可。但这里有个问题,Delta Lake是支持版本的和事务的,尤其是其采用乐观锁,意味着能不能成功取决于最后的commit操作,如果操作失败,我们就需要清理掉新增的索引数据,如果操作成功,我们也要清理掉老版本的索引数据。本来,如果能将索引数据(其实和实际数据没有什么区别),都能纳入到AddFile/Remove File的抽象体系里,并且能够放到_delta_log里统一管理,那就非常完美了,因为可以享受一大波红利,比如清理的工作Delta原生的机制就可以做的很好,而且还能使得索引数据也能享受版本的红利。但是当前,Delta并没有提供一个不修改其源码就能让我们嵌入一个新的非数据类文件的文件到DeltaLog里,如果我们希望有这些功能,一个直观的做法是我们针对索引文件单独开发一个_delta_index_log,但是如何保证事务性又会是一个难题,毕竟你维护了两套元数据体系。
所以,在不修改Delta 源码的基础上,我们只能保留有限个版本的索引信息,因为需要在commit之后再进行rename操作,这个时候两个动作作为一个整体不是原子性的,如果在进行commit之后,在rename之前发生了系统crash,那么再启动后,数据就不是完整的了,丢失了索引数据,而且我们需要有个机制去保证我们索引数据清理和业务数据的清理是一致的,而目前来看是比较困难的。对于事务,我们也只能退而求其次,如果事务失败,我们会清理掉当前新生成的索引数据。如果事务成功,我们会通过Rename的方式将其转化为正式的索引目录。为了避免事务冲突,我们需要将索引目录名称带上版本号。幸好,对于版本而言,我们更可能是会在读的时候回溯版本。而在写的的时候,我们都会基于最新的版本去写入Delta Lake,这就意味着我们只要找到当前版本的索引即可。
当然,我们也可以保留N个版本的索引数据,缺点是,因为我们没有办法嵌入清理的代码到Delta里,从而难以让索引数据和业务数据的清理周期(保留的版本数)保持一致。
使用布隆过滤器加速的场景主要有:
- 每个批次进来的数据不太大
- 表数据比较大,文件数量较多
这样可以避免一次全表join。但是如果数据更新的非常随机,导致每次基本会touch到所有的文件,那么做anti-join的时候成本就会比较高。但聊胜于无,毕竟可以避免一次全表Join,对于表数据比较大的情况,还是非常划算的。