HashJoin性能优化: RuntimeFilter

2022-10-27 10:57:30 浏览数 (2)

1.什么是RuntimeFilter

HashJoin是关联查询中最重要的算子,对于计算密集型应用,关联查询的性能瓶颈主要在于HashJoin算子Probe阶段的Hash查找和Data Shuffle上。RuntimeFilter是用于运行时优化HashJoin性能的一种常见方法,RuntimeFilter对于INNER JOIN, Right Join, Semi Join等都有显著的性能提升效果。目前RuntimeFilter技术已经在很多数据库中得以应用,比如SnowFlake(BloomJoins), Impala,EMR Spark,Apache doris,Starrocks,PolarDB-X等。

分析型数据库中星型模型是常见的建模方法。比较有代表性的测试集就是SSB(Star Schema Benchmark)。星型模型主要分为事实表和维度表,事实表一般是大表,比如SSB测试集中的lineorder表,维度表一般为小表,比如SSB测试集中的customer,date等。这里的大表小表准确来说是指表的Distinct记录数。RuntimeFilter对于这类星型的数据模型下的复杂查询有非常大的提升作用。

HashJoin实现上是通过内表(一般为较小的表)构建Hash Table,然后遍历外表(一般为大表)数据查找Hash Table,根据不同的Join类型输出匹配的结果。HashJoin中Hash table probe算子一般是最为耗时的过程,另外一个耗时的过程就是数据的Shuffle过程,一般它们也就是性能的瓶颈点。那么对HashJoin性能优化最朴素的思想是减少probe遍历的数据量或者减少数据移动的大小从而提升性能。RuntimeFilter的原理正是将probe操作push down到外表的Scan算子,使用更快的非精确查找算法(MINMAX,BloomFilter)或者更快的精确查找算法(HashSet)来提前过滤数据,从而提升整个查询的性能。RuntimeFilter与SemiJoin相比,不同在于RuntimeFilter需要下推到Scan,实现更少的数据移动。类似的优化思路比如mysql中的pickup join,通过内表过滤后的结果集,通过索引计算左表的值,如此便不需要计算hash,这对于内表非常小的场景效果最佳。

2. 如何生成RuntimeFilter

RuntimeFilter是在优化器的CBO阶段之后插入物理计划中的。具体来说,首先需要从根节点遍历整个查询计划树,找到HashJoinNode节点,然后找到该HashJoinNode的等值表达式,将Join右孩子节点的条件下推到左孩子节点是Scan的节点上。举例来说对于下面的Join查询,首先生成如下的物理计划,遍历第一个Join节点,将 t3.a加入到RuntimeFilters中,编号为RuntimeFilterId=0,然后继续遍历左子树,遇到Join节点,将t2.a加入到RuntimeFilters中,编号为RuntimeFilterId=1。然后遇到ScanNode节点时,将RuntimeFilters上的所有RuntimeFilter下推到Scan t1节点上。当然,在论文https://dl.acm.org/doi/pdf/10.1145/3318464.3389769也提到将RuntimeFilter纳入到CBO的代价估算中可能获得更优的执行计划。

图1:生成带有RuntimeFilter的物理计划图1:生成带有RuntimeFilter的物理计划

2.1 RuntimeFilter不能下推的情况

并非所有的RuntimeFilter都可以下推,比如对于下面的查询

select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk where coalesce(t2.s_store_sk 100, 100) in (select ifnull(100, s_store_sk) from store);

按照上面的思路生成的查询计划是这样的

代码语言:javascript复制
Query plan:
    |   4:HASH JOIN
    |   |  join op: LEFT SEMI JOIN (BROADCAST)
    |   |  equal join conjunct: coalesce(`t2`.`s_store_sk`   100, 100) = ifnull(100, `s_store_sk`)
    |   |  runtime filters: RF000[in] <- ifnull(100, `s_store_sk`)
    |   |----7:EXCHANGE
    |   3:HASH JOIN
    |   |  join op: LEFT OUTER JOIN
    |   |  equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk`
    |   |----1:OlapScanNode
    |   |       TABLE: store
    |   |       runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk`   100, 100)
    |   0:OlapScanNode
    |      TABLE: store
  1. 假设store中有N条记录,并且为非零。
  2. 生成RF000由于ifnull函数的特点,所有的RF000中是一个100
  3. 将RF000下推到coalesce中,只有当s_store_sk为null时,coalesce的结果才是100,所以plannode1的结果是null,但由于LEFT OUTER JOIN的特点,如果内表不存在则需要补NULL,所以PlanNode3的结果为N个NULL
  4. 在LEFT SEMI JOIN中由于coalesce计算结果都是NULL,所以函数结果都是100,而ifnull的结果也是100,所以最后count(*)的结果是N。

但实际上正确的结果应该是0.所以在以上这种情况下,RuntimeFilter就不能下推。

除此之外,RuntimeFilter对于Left Outer Join,Anti Join,Full Outer Join等也不适用,这是因为外连接没有找到对应的数据时需要补NULL,而不能直接过滤掉。目前关于RuntimeFilter的限制可以参考https://doris.apache.org/docs/advanced/join-optimization/runtime-filter/

2.2 BloomFilterBits

BloomFilter 中衡量 BloomFilter 的误判率称为 false positives。 false positives 可以通过以下公式计算得到 epsilon = (1-(1-1/m)^{kn})^{k} approx(1-e^{-kn/m})^{k}

所以对于给定的误判率,我们可以估算 BloomFilter 位数组的大小,m 即 BloomFilter 的位数组大小。 m=frac {-kn}{ln(1-epsilon^{1/k})}

k表示BloomFilter中hash函数的个数。这里非常重要的一点就是n,需要通过估算 number distinct value 值得到。

所以精确的统计信息有助于更高效的实现BloomFilter。

2.3 Filter顺序对性能的影响

多个Filter在实现上有两种方式。

一种是短路计算,short-circuit 的优势在于逐步减少向下传递的数据量,所以过滤效果越好的过滤条件需要提前。所以依赖于优化器利用CBO提前准备好过滤顺序。这种方法在过滤效果较好时性能更好。

另一种是避免利用Filter的提前过滤,将多个Filter的计算移动到最后的AND或OR中,充分利用位运算加速最后的与或运算。这种方式稳定性较好,不受过滤效果的影响。

动态调整RuntimeFilter的顺序对提高查询性能会有一定的帮助。

3.Runtime是怎样执行的

3.1 RuntimeFilter的构建

RuntimeFilter一般是在HashJoin构建HashTable的时候构建出来的,主流的RuntimeFilterType有三种,In,BloomFilter,MinMax。在我们的实现中,可以根据HashTable读取的数据多少来决定使用哪种Type。一般来说In更适合数据量较少的场景。BloomFilter适合数据量较多的场景,MinMax通常可以与BloomFilter一起使用,在实现上统一抽象为一个RuntimeFilter。

分布式数据库中,HashJoin最常使用的有两种分布方式,BoardCastJoin和RedistirbuteJoin。如下图3中的Join(BC)代表的是BoardCastJoin,表示将小表广播到所有的节点。Join(Re)代表的是RedistributeJoin,表示根据JoinKey对大小表数据进行重分布。从实现上来看,BoardCastJoin更适合小表比较少的场景下,在每个Backend上,BoardCastJoin得到的RuntimeFilter都是完整的,可以直接下推到外表。而RedistributeJoin得到的RuntimeFilter需要先Merge后再Shuffle后,才能下推到外表。RedistributeJoin由于右表构建的RuntimeFilter是不完整的,如果将不完整的RuntimeFilter下推到Scan层,则有可能会漏掉部分数据,所以需要将所有RuntimeFilter合并后才可以下推到外表。多个RuntimeFilter可以通过And的逻辑运算同时过滤数据。

3.2 RuntimeFilter的关键数据结构

RuntimeFilterBuilder: 用于构建RuntimeFilter并更新RuntimeFilterMgr中对应的RuntimeFilterMerger中的RuntimeFilter。

RuntimeFilterMgr:Backend上全局唯一的数据结构,维护所有queryid到RuntimeFilterMerger的映射关系

RuntimeFilterMerger:维护当前执行query的所有RuntimeFilter结构,每个FilterId对应唯一的RuntimeFilterMergerEntity,而RuntimeFilterKey唯一对应FilterId, 多个RuntimeFilterKey可以对应同一个FilterId。FilterId是在优化器生成计划时生成的。

假设有两张表,每张表有两个col,t1(col1, col2), t2(col1, col2)。以查询select count(*) from t1 inner join t2 on t1.col1 = t2.col1 and t1.col1 = t2.col2为例,此时有两个RuntimeFilterKey , FilterId, RuntimeFilter的对应关系则如下所示。

图2:RuntimeFilter关键数据结构图2:RuntimeFilter关键数据结构

3.3 RuntimeFilter的执行流程

  1. 在Pipeline执行框架中分为prepare和execute两个阶段,Prepare阶段会进行整个pipeline的构建,在Prepare阶段根据已经生成的执行计划构建RuntimeKey,FilterId,RuntimeFilterMerger所有的关键数据结构。
  2. 在HashJoin build内表时通过RuntimeFilterBuilder构建RuntimeFilter,在Build结束时将RuntimeFilter加入对应的RuntimeFilterMergerEntity中。如果RuntimeFilter需要Merge,则每个Backend将当前构建的RuntimeFilter发送到Merge节点,Merge节点接收到所有的RuntimeFilter合并后shuffle到指定的Backend节点.Backend节点接收到最终的RuntimeFilter后将自身标记为Ready。RuntimeFilter的merge以及dispatch过程都是异步执行的,不会阻塞整个Pipeline的执行。
  3. 在Pipeline执行时,Filter operator中的checkRuntimeFilter函数会反复检测是否当前key对应的RuntimeFilter是Ready的,如果是Ready的则获取后通过RuntimeFilter对每一个需要Filter的Column执行过滤操作,如果没有Ready则不过滤数据。
  4. Probe结束后将所有的RuntimeFilter回收资源,清理RuntimeFilterMgr中当前queryId对应的内存资源。

图3:RuntimeFilter执行流程图图3:RuntimeFilter执行流程图

4. RuntimeFilter有哪些实现方式

在分布式数据库的并行执行框架中有两种方式,一种是算子间并行,一种是算子内并行。Pipeline是实现算子间并行的最好的方式,Pipeline在很多领域都有提及,比如CPU的pipeline流水线。Pipeline的核心在于调度,每个算子只做一件事,但不同算子可以并行执行,在一个Pipeline内部不必等待上一个算子完全执行完毕。算子内并行是将算子逻辑拆分为多个子算子,子算子执行同样的工作但处理不同的数据。在MPP执行引擎的并行执行框架中两者是同时存在的,从而实现最大的并行度,获得最大的性能收益。

在ClickHouse的Pipeline的实现中,Scan算子要处理的数据块Granule是在Pipelien生成阶段确定的。而RuntimeFilter是在PipeLine执行时才可以确定,所以在Pipeline上实现RuntimeFilter的简单的 方式是通过插入Filter operator(Function)算子的方式,具体实现的是通过Internal Function函数实现的。Internal Function在执行时会通过RuntimeFilterMgr获取对应的RuntimeFilter,如果可以拿到则使用RuntimeFilter来过滤数据,如果没有则直接返回当前数据,由于runtimefilter一般执行足够快,一般在10ms-100ms内,所以这里不会成为瓶颈。

实际上ClickHouse中可以通过主键过滤和PreWhere来提前过滤数据,从而减少IO,这就需要等待RuntimeFilter生成。当RuntimeFilter对应的key是主键索引或者二级索引时,等待RuntimeFilter可以获得更优的性能。

Pipeline中实现的RuntimeFilter具有以下优势:

  1. RuntimeFilter for pipeline 有更少的RPC,一方面这是因为Pipeline中内表只有一个Instance执行,与算子内并行方案相比不需要向每个Fragment Instance发送RuntimeFilter,对于一个RuntimeFilter只需要一次(Backend number -1)次RPC,而像其它大多数方案需要(Backend_number - 1)* Parallel_degree 次RPC。
  2. 不同的RuntimeFilter之间完全独立,通过FilterId 隔离,更少的加锁阻塞,多个RuntimeFilter Consumer可以消费同一个Runtime Filter Producer。
  3. 在算子内并行时RuntimeFilter在Scan时一般需要等待RuntimeFilter生成,在一些并发较大的场景下有可能阻塞查询执行,Pipeline中RuntimeFilter是不会阻塞整个Pipeline执行的,一旦Ready,则立刻生效。在具有主键索引或者二级索引的情况下,也需要等待RuntimeFilter的生成,以获得更好的性能。
图4: 算子内并行与算子间并行图4: 算子内并行与算子间并行

5. RuntimeFilter为什么能提升性能

  1. RuntimeFilter将Join Porbe phase时的计算下推到Scan层,一方面更快的模糊过滤方法使得Probe处理的数据量显著减少,另一方面对于外表需要Shuffle时减少了执行的network overhead。
  2. RuntimeFilter下推的Key如果是主键时,在Scan时可以进一步裁减数据,减少IO

但是RuntimeFilter也是双刃剑,

  1. RuntimeFilter如果没有过滤效果或者过滤效果非常差时,显然对于整个计划来说是一种冗余,此时RuntimeFilter对性能来说并不会有提升的效果。

     目前在Pipeline上实现的效果如下,下图是 RuntimeFilter开启前后SSB Join 100GB测试集的性能对比结果。

图4: RuntimeFilter优化前后性能对比图4: RuntimeFilter优化前后性能对比

6. RuntimeFilter的进一步优化

  1. ClickHouse的Prewhere优化和主键索引可以减少扫描的数据量从而提升scan性能,如何更好的利用prewhere的优化,可以进一步IO性能从而优化系统性能
  2. 更精确的统计数据帮助生成精确的RuntimeFilter信息,防止RuntimeFilter对性能的反向优化。

0 人点赞