数据湖(Data lake)是一种将数据以原始格式存储在同一个系统或存储库的设计思想。它可以实现在一份数据之上进行多种数据计算,以避免为了多种计算场景而导致数据冗余存储和搬迁成本。以数据湖架构建立数据分析平台能让企业以较低的成本实现原始数据的集中式管理,提供统一口径和灵活的分析能力。当前,比较主流的开源数据湖格式有Iceberg,Hudi和DeltaLake。
不管是数据存储还是计算引擎,都是为了用户有更好的使用体验。在大数据分析领域,交互式查询是一个重要的方向。单次查询TB甚至PB级别的数据已经非常常见。如何为用户提供秒级、压秒级的交互式查询一直是大数据分析领域的挑战。在实际生产中,需要扫描全部数据的情况是不多见的。大部分数据分析一般都是带有过滤条件。在提升查询性能的诸多手段中,如何尽可能地降低数据扫描量一直是行之有效的方法,屡试不爽。例如我们熟知的分区裁剪就是减少数据扫描的关键技术。
为了实现更少的数据扫描,需要计算引擎和存储引擎的共同协作。计算引擎需要实现支持谓词下推,而存储引擎需要能够根据下推的过滤条件尽可能的跳过无关数据或文件。上面我们提到了开源数据湖格式Iceberg,Hudi和DeltaLake等都提供了文件级别的统计信息,例如Min/Max/BloomFilter,支持快速过滤无关数据和文件。这个技术叫做DataSkipping。但是单单只有Dataskipping技术,往往在实际生产中不能产生多少积极作用。本文将介绍腾讯如何在Apache Iceberg上通过数据组织优化来加速大规模数据分析。
本文将分成以下四个章节内容:
- 查询分析中的IO效率
- 数据组织优化技术
- Iceberg上的技术实现剖析
- 性能评测
1. 查询分析中的IO效率
Iceberg自上而下提供了三层数据过滤策略,分别是:
- 分区裁剪
- 文件过滤
- RowGroup过滤
分区剪裁:对于分区表来说,优化器可以自动从where条件中根据分区键直接提取出需要访问的分区,从而避免扫描所有的分区,降低了IO请求。分区剪裁可以细分为静态分区剪裁和动态分区剪裁,其中静态分区剪裁发生在SQL语句编译阶段,而动态分区剪裁则发生在SQL语句执行阶段。例如Spark 3.0就提供了动态动态分区技术DPP,想要了解的可以查看Spark官方文档。Iceberg支持分区表和隐式分区技术,所以很自然地支持分区裁剪优化。
文件过滤:上文我们提到Iceberg提供了文件级别的统计信息,例如Min/Max等。我们可以用where语句中的过滤条件去判断目标数据是否存在于文件中。例如上面的查询SQL的过滤条件有first_name和last_name两个字段,通过判断文件中字段first_name和last_name的upper_bounds和lower_bounds,判断文件是否包含符合Tho和Frank前缀的字符串。如果不在upper_bounds和lower_bounds区间内,则跳过这个文件,否则读取这个文件。
RowGroup过滤:对于Parquet这类列式存储文件格式,它也会有文件级别的统计信息,例如Min/Max/BloomFiter等等,利用这些信息可以快速跳过无关的RowGroup,减少文件内的数据扫描。关于Parquet的RowGroup过滤在此不多做介绍,有兴趣的可以查看相关资料。
至此经过这三层数据过滤,最终可能需要扫描的数据会大大降低。不过,这种过滤效果也不是每次都生效,因为他会数据的整体分布有很大关系。当数据均匀分布在所有文件中时,你会发现每个文件的upper_bounds和lower_bounds的range会很大,甚至可能是column的range。这时,再用where中的column过滤条件来判断文件会失效,最终的结果是我们还是需要扫描所有文件或者分区的所有文件。为了提升文件dataskipping效果,我们常常会对列进行排序,这样对这个列来说,在整个文件中是单调的,文件级别的upper_bounds和lower_bounds的range重合度会降低,这样dataskipping的效果会非常好。但这种排序方法也只能对一个列的效果是好的,如果参与排序的列很多则会大大降低效果。所以我们需要找到一种方法来解决多列数据的组织优化,来提升dataskipping效果。下面我们介绍一种称为Z-Order的空间曲线填充算法以及它的应用场景。
2. 数据组织优化技术
空间填充曲线除了数学重要性之外,有个非常重要的应用特性,就是降维。它可以将多维空间问题降维到低维或者一维空间问题。常见的有: Z阶曲线(Z-order curve)、皮亚诺曲线(Peano curve)、希尔伯特曲线(Hilbert curve)等。Z-Oder算法其实已经在各种数据分析产品中都有应用,例如MySQL,Amazon Aurora等。
Z-Order曲线的一个典型应用就是Geohash地理位置编码,它是一种分级的数据结构,把空间划分成网格。二维空间搜索范围通过Z-Order算法转换之后,可以变换为一维空间的搜索问题。他有一个重要的特性:一个点附近的hash字符串总有公共前缀,并且公共前缀越长,两个点的距离越近。这给了我们启发,能否使用Z-Order算法来解决上面发现的数据分布问题?事实上是可以的,并且工业界也是这么做的。我们直接对多维列进行排序没法有效进行dataskipping,但是我们可以将多维列值通过Z-Order算法转换为一维值(Z-Index或者Z-Id),这样我们再利用Z-Index进行有效的数据排序或者数据聚合。这样处理后,多维列根据Z-Order值相近的数据会分布到同一个文件中,从各个维度的值分布来说,从数据整体来看也会呈现近似单调的分布。这时,文件的upper_bounds和lower_bounds的重合度会有效降低,dataskipping技术又可以重新生效。
腾讯Iceberg实现了基于Z-Order算法的数据组织优化,并提供了原生的SQL支持,方便用户进行表级别的数据OPTIMIZE。
下面我们来看看腾讯Iceberg是如何实现了基于Z-Order的OPTIMIZE功能。
3. Iceberg上的技术实现剖析
腾讯Iceberg支持两种策略的OPTIMIZE,即全量(all)和增量(incremental)。通常,首次需要对表数据进行全量OPTIMIZE,随着新数据的写入,我们可以定期的进行增量OPTIMIZE。经过OPTIMIZE之后,大量的小文件会合并成大文件,默认1GB,并且数据分布会根据Z-Order算法进行了优化。
第一步:我们需要筛选出待优化的文件。这里有两个原则:OPTIMIZE语句中的where条件和OPTIMIZE策略。OPTIMIZE语句的where条件只支持使用分区列,也就是支持对表的某些分区进行OPTIMIZE。OPTIMIZE策略支持全量和增量两种。全量策略是对表或者分区的所有数据进行优化,增量策略是在全量优化的基础上对新写入数据进行优化。
第二步:根据多维列值计算出Z地址。图中示例SQL,我们要根据first_name和last_name的数据来进行数据组织优化。首先,我们需要将每行中的first_name和last_name两列的值进行数字化。我们取用每个cel值在整个column值的range id作为cel值的数字化。然后再将这个数字使用若干字节bits表示,最后将多个字节bits进行交错位,最终得到转换后的Z地址。整个过程和Geohash很相似。
第三步:根据每一行计算得到的Z地址进行Range重分区,数据会shuffle到多个partition中。这一步等价于Dataset.repartitionByRange(ZOrderAddress)
最后,将重分区的partition使用Copy on Write写回到存储系统中。至此,我们就完成了表数据的OPTIMIZE。
下面我们来看看表数据的组织优化究竟能带来多大的查询性能提升。
4. 性能评测
本次主要进行两方面的评测:
- 关键参数评测:考察影响性能的几个核心参数和配置
- 聚合列:选择不同的列会影响到OPTIMIZE开销和最终的查询效果。
- 输出文件大小:配置写出文件的大小。
- CUBE大小:实际参与进行多维数据聚合的最小数据单元。
- SSB基准测试
4.1 关键参数评测
测试配置:
- 集群配置:10台 16核64GB、500GB 云SSD
- 文件数:100万
- 数据条数:100亿
- 查询语句:select count(*) from employee where first_name like ' Tho%' and last_name like ' Frank %';
4.2 SSB基准测试
测试配置:
- 集群配置:10台 8核32GB、500GB 云SSD
- Scale:100
一个改动:将Q3.1,Q3.2,Q3.3和Q3.4中的公共部分打成宽表,再基于这张宽表进行查询性能测试。
从上面的两个测评可以看出,经过Z-Order OPTIMIZE之后,查询性能都有了明显的提升,最高有几十倍的提升。当然,这一些也是有成本的,即OPTIMIZE过程本身需要消耗一定的计算资源。建议每天在业务低峰期进行OPTIMIZE计算。
5. 小结
本文介绍了腾讯Iceberg基于Z-Order算法实现了数据组织优化,并从多角度的性能测评中可以看出,Iceberg表经过OPTIMIZE之后可以极大地提升查询性能。