计算引擎之下、数据存储之上 | 数据湖Iceberg快速入门

2021-07-06 14:22:19 浏览数 (1)

目前市面上流行的三大开源数据湖方案分别为:Delta、Iceberg 和 Hudi,但是 Iceberg是一个野心勃勃的项目,因为它具有高度抽象和非常优雅的设计,为成为一个通用的数据湖方案奠定了良好基础。目前 Flink Iceberg 构建全场景实时数仓已经有了非常良好的实践,本文带大家简单了解下Iceberg。后面五分钟学大数据会有一期专门介绍基于Flink Iceberg打造T 0实时数仓,本文算是这篇文章的前置铺垫。

Apache Iceberg is an open table format for huge analytic datasets. 这是Iceberg官网上对于Iceberg的定义。从这个定义上来看,Iceberg是一个用于海量数据分析场景下的开源的表格式(其实笔者更愿意用Table Format),也就是说Iceberg本质上是一个表格式。那什么是表格式?表格式和我们熟悉的文件格式(File Format)是一回事吗?

表和表格式是两个概念。表是一个具象的概念,应用层面的概念,我们天天说的表是简单的行和列的组合。而表格式是数据库系统实现层面一个抽象的概念,它定义了一个表中包含哪些字段,表下面文件的组织形式、表索引信息、统计信息以及上层查询引擎读取、写入表中文件的接口。这个直接理解起来可能有点困难,那我们绕个弯用类比的方式先说说文件格式(File Format)是怎么一回事。

1

预备知识:File Format解读

大家熟知的HDFS上的文件格式有Text、Json、Parquet、ORC等,另外,很多数据库系统中的数据都是以特有的文件格式存储,比如HBase的文件格式是HFile。这里就用大家熟知的Parquet来做说明。如果对Parquet不甚了解,可以预先阅读文末参考资料[1],读过之后对Parquet是什么,必然有所了解,这里笔者做个简单的总结:

1.Parquet定义了存储的数据模型。Parquet不仅支持普通的数据模型,而且还支持嵌套的数据模型,对于嵌套数据模型的支持是Parquet的一大特色。参考文章中用了大量篇幅介绍了Parquet用什么算法支持嵌套的数据模型,并解决其中的相关问题。

2.Parquet定义了数据在文件中的存储方式。为了方便叙述,将下图拿出来介绍:

Parquet文件将数据按照列式存储,但并不是说在整个文件中一个列的数据都集中存储在一起,而是划分了Row Group、Column Chunk以及Page的概念。如下所述:

  • Parquet文件会划分为很多Row Group。每个Row Group会存储一个表中相连的多行数据。
  • 每个Row Group会分成多个Column Chunk。多行数据会按照列进行划分,每列的数据集中存储于一个Column Chunk中,因为每个列的数据类型不同,因此不同的Column Chunk会使用不同算法进行压缩解压缩。
  • 每个Column Chunk会分为多个Page。

3.元数据统计信息/索引信息。Parquet文件在footer部分会记录这个文件每个Page、Column Chunk以及Row Group相关的元数据,比如这个Row Group中每一列的最大值、最小值等。这里补充一下,很多文件中是有索引信息的,比如HBase的文件HFile,就是有索引信息包含在文件中的,数据写完之后除了构建元数据统计信息之外,还会构建索引信息。

4.上述1~3从理论上定义了Parquet这个文件格式是如何处理复杂数据类型,如何将数据按照一定规则写成一个文件,又是如何记录元数据信息。实际上,Parquet就是一系列jar包,这些jar包提供了相关的读取和写入API,上层计算引擎只需要调用对应的API就可以将数据写成Parquet格式的文件,这个jar包里面实现了如何将复杂类型的数据进行处理,如何按照列式存储构建一个Page,再构建一个Column Chunk,再接着构建一个Row Group,最后构建元数据统计信息后形成一个Parqeut文件。相反,调用扫描API,这个jar包实现了如果通过元数据统计信息定位扫描的起始位置,如何按照文件格式正确高效地解压数据块将数据扫描出来。

所以,一个Parquet文件格式实际上包含了数据schema定义(是否支持复杂数据类型),数据在文件中的组织形式,文件统计信息、索引以及读写的API实现。

2

Iceberg Table Format解读

相对应的,一个表格式实际上也对应的包含表schema定义(是否支持复杂数据类型),表中文件的组织形式(Partition模式,是Range Partition还是Hash Partition),表相关统计信息、表索引信息以及表的读写API实现。它在整个数据库系统中的位置如下图左侧所示:

上图右侧是Iceberg在数据仓库生态中的位置,和它差不多相当的一个组件是Metastore。不过Metastore是一个服务,而Iceberg就是一系列jar包。既然Metastore和Iceberg我们认为都是表格式,那可以将两者在schema、partition、metadata/index以及读写api这几个方面做个对比:

1.schema基本相同。

两者底层都依赖于Parquet/ ORC等文件格式,这些文件格式都支持复杂数据类型,因此上层只需要做一些适配工作就可以支持复杂数据类型。

2.partition实现完全不同。两者在partition上有很大的不同:

Metastore中partition字段不能是表字段,因为partition字段本质上是一个目录结构,不是用户表中的一列数据。如下图所示是一个二级分区目录,其中一级分区是天级别时间分区,二级分区是小时级别时间分区:

代码语言:javascript复制
date=20200616/
 |- hour=18/
 | |- ...
 |- hour=19/
 | |- ...
 |- hour=20/
 | |- ...
 |- ...

基于Metastore,用户想定位到一个partition下的所有数据,首先需要在Metastore中定位出该partition对应的所在目录位置信息,然后再到HDFS上执行list命令获取到这个分区下的所有文件,对这些文件进行扫描得到这个partition下的所有数据。

Iceberg中partition字段就是表中的一个字段。Iceberg中每一张表都有一个对应的文件元数据表,如下所示:

代码语言:javascript复制
 ----------------------------------------------------------------------------------------------------------------- --------- ----------- --------------- --------------
|file_path                                                                                                                  |file_format|    partition  |       ***     
 --------------------------------------------------------------------------------------------------------------------------- ----------- --------------- --------------
|***/action_logs/data/event_time_hour=2020-06-04-19/action=view/00007-39-4e7af786-9668-4e3d-b8aa-07b7b30fa60a-00000.parquet |PARQUET    |[442027, view] |
|***/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet|PARQUET    |[442027, click]|
|***/action_logs/data/event_time_hour=2020-06-04-20/action=click/00031-63-a04ce10d-ae98-4004-bda8-2f18d842b66b-00000.parquet|PARQUET    |[442028, click]|
 -------------------------------------------------------------------------------------------------------------------------------------------------------- --------------

文件元数据表中每条记录表示一个文件的相关信息,这些信息中有一个字段是partition字段,表示这个文件所在的partition。上表中action_logs表的partition字段(event_time_hour,action),第一个文件的对应partition是[442027, view],即[event_time_hour=442027, action="view"],其他文件对应的partition以此类推。因此基于Iceberg,用户想定位到一个partition下的所有数据,只需要在这个表的文件元数据表中找到该partition的所有文件,然后扫描对应文件即可。

很明显,Iceberg表根据partition定位文件相比metastore少了一个步骤,就是根据目录信息去HDFS上执行list命令获取分区下的文件。试想,对于一个二级分区的大表来说,一级分区是小时时间分区,二级分区是一个枚举字段分区,假如每个一级分区下有30个二级分区,那么这个表每天就会有24 * 30 = 720个分区。基于Metastore的partition方案,如果一个SQL想基于这个表扫描昨天一天的数据的话,就需要向NameNode下发720次list请求,如果扫描一周数据或者一个月数据,请求数就更是相当夸张。这样,一方面会导致NameNode压力很大,一方面也会导致SQL请求响应延迟很大。而基于Iceberg的partition方案,就完全没有这个问题。

3.表统计信息实现粒度不同。

(1)Metastore中一张表的统计信息是表/分区级别粒度的统计信息,比如记录一张表中某一列的记录数量、平均长度、为null的记录数量、最大值最小值等。感兴趣的话可以参考Metastore元数据表TAB_COL_STATS,该表用来表示数据表的列统计信息。

(2)Iceberg中统计信息精确到文件粒度,即每个数据文件都会记录所有列的记录数量、平均长度、最大值最小值等。如下所示为数据库icebergdb下action_logs表的所有文件的相关统计信息:

代码语言:javascript复制
scala> spark.read.format("iceberg").load("icebergdb.action_logs.files").show(false)
 --------------------------------------------------------------------------------------------------------------------------------------------------------- ----------- --------------- ------------ ------------------ ------------------- --------------------------------------------- ---------------------------------------- ---------------------------------------- -------------------------------------------------------------------- -------------------------------------------------------------------- ------------ ------------- 
|file_path                                                                                                                                                |file_format|partition      |record_count|file_size_in_bytes|block_size_in_bytes|column_sizes                                 |value_counts                            |null_value_counts                       |lower_bounds                                                        |upper_bounds                                                        |key_metadata|split_offsets|
 --------------------------------------------------------------------------------------------------------------------------------------------------------- ----------- --------------- ------------ ------------------ ------------------- --------------------------------------------- ---------------------------------------- ---------------------------------------- -------------------------------------------------------------------- -------------------------------------------------------------------- ------------ ------------- 
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=view/00007-39-4e7af786-9668-4e3d-b8aa-07b7b30fa60a-00000.parquet |PARQUET    |[442027, view] |1           |1418              |67108864           |[1 -> 51, 2 -> 50, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !�F�]      |[1 -> , 2 -> lly, 3 -> view, 4 -> K5, 5 -> !�F�]      |null        |[4]          |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-19/action=click/00015-47-a9f5ce8f-ee6f-4748-9f49-0f94761859bc-00000.parquet|PARQUET    |[442027, click]|1           |1425              |67108864           |[1 -> 51, 2 -> 50, 3 -> 52, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ���F�]     |[1 -> , 2 -> lly, 3 -> click, 4 -> K5, 5 -> ���F�]     |null        |[4]          |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=view/00023-55-f0494272-6166-4386-88c7-059e3081aa11-00000.parquet |PARQUET    |[442028, view] |1           |1460              |67108864           |[1 -> 51, 2 -> 56, 3 -> 51, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '��G�] |[1 -> , 2 -> mint_1989, 3 -> view, 4 -> ч, 5 -> '��G�] |null        |[4]          |
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/data/event_time_hour=2020-06-04-20/action=click/00031-63-a04ce10d-ae98-4004-bda8-2f18d842b66b-00000.parquet|PARQUET    |[442028, click]|1           |1467              |67108864           |[1 -> 51, 2 -> 56, 3 -> 52, 4 -> 47, 5 -> 51]|[1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1]|[1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r�G�]|[1 -> , 2 -> mint_1989, 3 -> click, 4 -> ч, 5 -> @r�G�]|null        |[4]          |
 --------------------------------------------------------------------------------------------------------------------------------------------------------- ----------- --------------- ------------ ------------------ ------------------- --------------------------------------------- ---------------------------------------- ---------------------------------------- -------------------------------------------------------------------- -------------------------------------------------------------------- ------------ ------------- 

很明显,文件粒度的统计信息对于查询中谓词(即where条件)的过滤会更有效果。基于Metastore,查询谓词只能基于分区进行过滤,选中的分区需要解压甚至扫描其下的所有文件。而基于Iceberg,查询谓词不仅可以过滤到分区级别,也可以基于文件级别的统计信息(每一列的最大值最小值)对这个分区下的文件进行过滤,对于不满足条件的文件可以不用解压扫描。

4.读写API实现不同。

(1)Metastore表格式:上层引擎写好一批文件,调用Metastore的add partition接口将这些文件添加到某个分区下。

(2)Iceberg表格式:上层业务写好一批文件,调用Iceberg的commit接口提交本次写入形成一个新的snapshot快照。整个过程可以用下图表示:

写入引擎调用Iceberg的commit接口,Iceberg主要会做如下几个事情:

  • 会根据提交的文件解析出对应的文件元数据生成一个manifest文件,manifest文件中包含所有提交的数据文件的统计信息,每个数据文件在manifest文件中就是一条记录。
  • manifest文件生成之后,会紧接着生成一个manifests文件。manifests文件中每条记录是这个表当前所有manifest文件统计信息集合。每个manifest文件在manifests文件中就是一条记录。记录内容如下:
代码语言:javascript复制
scala> spark.read.format("iceberg").load("hive_iceberg.action_logs.manifests").show(false)
 --------------------------------------------------------------------------------------------------- ------ ----------------- ------------------- ---------------------- ------------------------- ------------------------ ------------------------------------------------------------- 
|path                                                                                               |length|partition_spec_id|added_snapshot_id  |added_data_files_count|existing_data_files_count|deleted_data_files_count|partition_summaries                                          |
 --------------------------------------------------------------------------------------------------- ------ ----------------- ------------------- ---------------------- ------------------------- ------------------------ ------------------------------------------------------------- 
|/libis/hive-2.3.6/hive_iceberg.db/action_logs/metadata/bb641961-162a-49a8-b567-885430d4e799-m0.avro|5040  |0                |6771375506965563160|4                     |0                        |0                       |[[false, 2020-06-04-19, 2020-06-04-20], [false, click, view]]|
 --------------------------------------------------------------------------------------------------- ------ ----------------- ------------------- ---------------------- ------------------------- ------------------------ ------------------------------------------------------------- 
  • manifests文件生成之后,再紧接着生成一个snapshot文件(文件名为:v2-metadata.json,其中v2是当前snapshot的版本号)。snapshot文件记录这个snapshot对应的表schema信息、partition spec信息以及manifests文件的路径等。

需要说明的是,整个commit过程是一个事务执行,即实现了ACID保证。

  • 原子性:整个提交要么成功,要么失败。不会存在中间过程。
  • 一致性:事务提交成功之后表的snapshot会从一个版本变更为另一个版本。
  • 隔离性:一旦提交成功之后其他查询服务才可以查询到数据,否则查询不到。
  • 持久性:事务提交之后,数据会被永久性地持久化到存储系统。

至于如何实现多线程并发场景下的ACID:

  • 每个iceberg表都有一个HDFS文件记录这个表的当前snapshot版本,文件称为version-hint.text。见下:
代码语言:javascript复制
hadoop@ntsdb2:~$ hdfs dfs -ls /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata
***
-rw-r--r--  1 hadoop supergroup  1 2020-06-08 17:24 /libis/hive-2.3.6/hadoop_iceberg/action_logs/metadata/version-hint.text
  • commit开始之后读取version-hint.text文件中记录的当前snapshot版本,称为base-version。
  • 基于当前base-version加1生成new-version,在tmp目录下生成一个新的snapshot文件,命名为{new-version}-metadata.json。
  • 将这个tmp目录下的snapshot文件rename到表的metadata目录下。

因此整个commit过程利用了乐观锁以及HDFS rename操作的原子性保证ACID事务性。很明显,Iceberg的数据文件写入过程相比Metastore复杂了很多。

为什么要引入这种复杂性呢?那我们先说结论,基于事务提交的snapshot写入模式相比Metastore有两个优势:

  • 表schema和表partition spec可以低成本高效变更。回顾一下如果Hive中要想在一个表中新增一个字段或者删减一个字段的话要怎么处理?是不是要重新建一张表,然后将数据重建一遍。这个代价不可谓不高,而且很低效。同样,如果要新增一个分区字段或者删减一个分区字段,一样需要重建表。但是对于iceberg,每个snapshot文件中会记录对应的schema和partition spec,用户更新schema或者partition字段,会在新生成的snapshot中生效,历史的snapshot还用之前的schema和partition spec。因此,表schema和partition字段更新非常高效,而且低成本。
  • 可以实现增量拉取。所谓增量拉取是指可以读取指定某个时间区间的文件数据,读取的最小粒度是文件。Iceberg因为是上游写入程序一段时间会提交一次事务生成一个snapshot,假如每10分钟提交一次,那在时间点[00:00:00,00:10:00,00:20:00,00:30:00,00:40:00,00:50:00]有对应的snapshot快照[s0,s1,s2,s3,s4,s5]。下游读取程序假如分别要读取[00:05:00~00:28:00]之间和[00:28:00~00:46:00]之间的快照数据,前者对应[s1~s2]之间的文件,后者对应[s3~s4]之间的文件。通过这种方式,可以实现下游读取程序增量读取文件数据。

增量拉取文件数据可以实现上游生产程序增量写入,下游消费程序可以一致性地增量消费。这种增量写入-增量消费的处理模式可以实现准实时的上下游ETL,这为端到端的分钟级别准实时数仓建设提供了可能。相反,基于Metastore的写入模式,是无法实现增量写入-增量消费的。

上面所述的写入API,读取API最大的不同也介绍了,就是Metastore表格式不支持增量拉取,而Iceberg表格式支持增量拉取,同时Iceberg表格式支持文件级别的谓词过滤,查询性能更佳。

3

Iceberg表格式可以解决业务什么问题?

上文笔者从table format这个层面解读了Iceberg在schema、partition、表统计信息以及表的读写API等几个方面与Metastore的不同之处,相信阅读完之后就会明白Iceberg可以解决业务的几大问题:

1.降低NameNode的list请求压力。[新partition模式]

2.提高查询性能。[新partition模式&&新表统计信息]

3.T 1离线数仓进化为分钟级别的准实时数仓。[新API提供了准实时增量消费]

4.所有数据基于Parquet等通用开源文件格式,没有lambad架构,不需要额外的运维成本和机器成本。

5.高效低成本的表schema和partition字段变更。[基于snapshot的schema/partition变更]

4

Iceberg社区新功能规划

在文章最后,笔者再聊聊社区最近的一些功能上的新规划:

1.集成Spark 3.0。当前与Iceberg兼容的Spark版本是2.4.5,随着Spark社区发布最新的3.0版本,Iceberg第一时间对3.0做了支持,并且预计在马上到来的0.9.0版本进行支持。集成Spark 3.0有什么收益呢?Spark 2.4.5仅支持DataFrame方式对Iceberg表进行各种DDLDML操作,不支持SQL方式。而基于Spark 3.0的版本可以支持SQL的基本语句,同时也支持通过DataFrame进行表读写操作。除此之外,Spark 3.0的查询性能会比2.4.5提升很多,这主要得益于Spark 3.0在查询优化器上的改进。

(详见:https://github.com/apache/iceberg/milestone/8)

2.支持Hive InputFormat。当前Iceberg表仅能使用Spark和Presto进行查询,对于使用非常广泛的Hive目前还不支持。这个功能支持之后,就可以使用Hive SQL查询Iceberg表,极大地方便了很多使用Hive进行数据处理的业务。

3.支持Flink Sink/Source。这部分工作可能是很多同学比较关注的,目前整个实现方案已经完成,社区也已经将部分PR合并到了master分支,随着其他相关PR都合并到master分支之后,业务就可以使用Flink将数据写入到Iceberg表中。

(详见:https://github.com/apache/iceberg/milestone/6)

4.支持批量row-level deletes。这个功能主要用于数据合规修正处理。

(详见:https://github.com/apache/iceberg/milestone/4)

5

总结

读到这里,相信大家对Iceberg是个什么东西、它能解决业务什么问题以及社区后续的一些新功能规划都有一个初步的了解了吧。

参考资料:

[1] 深入分析 Parquet 列式存储格式: https://www.infoq.cn/article/in-depth-analysis-of-parquet-column-storage-format/

[2] Apache Iceberg官网:http://iceberg.apache.org/

--END--

0 人点赞