2022 年 6 月,Cloudera宣布在 Cloudera 数据平台 (CDP) 中全面推出 Apache Iceberg。Iceberg 是一种 100% 开放表格式,由Apache Software Foundation开发,可帮助用户避免供应商锁定并实现开放式 Lakehouse。
普遍可用性涵盖了在 CDP 中的一些关键数据服务中运行的 Iceberg,包括 Cloudera 数据仓库 ( CDW )、Cloudera 数据工程 ( CDE ) 和 Cloudera 机器学习 ( CML )。这些连接使分析师和数据科学家能够通过他们选择的工具和引擎轻松地就相同的数据进行协作。不再有锁定、不必要的数据转换或跨工具和云的数据移动,只是为了从数据中提取洞察力。
使用 CDP 中的 Iceberg,您可以从以下主要功能中受益:
- CDE 和 CDW 支持 Apache Iceberg:分别按照 Spark ETL 和 Impala 商业智能模式在 CDE 和 CDW 中运行查询。
- 探索性数据科学和可视化: 通过 CML 项目中自动发现的 CDW 连接访问 Iceberg 表。
- 丰富的 SQL(查询、DDL、DML)命令集:使用为 CDW 和 CDE 开发的 SQL 命令创建或操作数据库对象、运行查询、加载和修改数据、执行时间旅行操作以及将 Hive 外部表转换为 Iceberg 表。
- 时间旅行:重现给定时间或快照ID的查询,例如可用于历史审计和错误操作的回滚。
- 就地表(架构、分区)演进:演进 Iceberg 表架构和分区布局,而不会造成代价高昂的干扰,例如重写表数据或迁移到新表。
- SDX 集成 (Ranger):通过 Apache Ranger 管理对 Iceberg 表的访问。
在这篇由两部分组成的博客文章中,我们将向您展示如何在 CDP 中使用 Iceberg 来构建一个开放的湖仓,并利用从数据工程到数据仓库再到机器学习的 CDP 计算服务。
在第一部分中,我们将重点介绍如何在 CDP 中使用 Apache Iceberg 构建开放式湖屋;使用 CDE 摄取和转换数据;并利用时间旅行、分区演变和对 Cloudera 数据仓库上的 SQL 和 BI 工作负载的访问控制。
解决方案概述
先决条件:
应提供以下 CDP 公共云 (AWS) 数据服务:
- Cloudera 数据仓库 Impala 虚拟仓库
- 启用 Airflow 的 Cloudera 数据工程 (Spark 3)
- Cloudera 机器学习
使用 CDE 将数据加载到 Iceberg 表中
我们首先在 CDE 中创建 Spark 3虚拟集群(VC)。为了控制成本,我们可以调整虚拟集群的配额并使用 Spot 实例。此外,选择启用 Iceberg 分析表的选项可确保 VC 具有与 Iceberg 表交互所需的库。
几分钟后,VC 将启动并运行,准备好部署新的 Spark 作业。
由于我们将使用 Spark 执行一系列表操作,因此我们将使用 Airflow 来编排这些操作的管道。
第一步是加载我们的 Iceberg 表。除了直接使用新数据创建和加载 Iceberg 表之外,CDP 还提供了一些其他选项。您可以导入或迁移现有的外部 Hive 表。
- 导入使源和目标保持完整和独立。
- 迁移会将表转换为 Iceberg 表。
在这里,我们只是将现有的航班表导入到我们航空公司的 Iceberg 数据库表中。
代码语言:javascript复制from pyspark.sql import SparkSession
import sys
spark = SparkSession
.builder
.appName("Iceberg prepare tables")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hive")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
spark.sql("""CALL spark_catalog.system.snapshot('airlines_csv.flights_external',
'airlines_iceberg.flights_v3')""")
我们导入的航班表现在包含与现有外部 hive 表相同的数据,我们可以快速检查按年份的行数以确认:
代码语言:javascript复制year _c1
1 2008 7009728
2 2007 7453215
3 2006 7141922
4 2005 7140596
5 2004 7129270
6 2003 6488540
7 2002 5271359
8 2001 5967780
9 2000 5683047
…
…
就地分区演变
接下来,最常见的数据管理任务之一是修改表的模式。如果它是非分区列,通常这很容易执行。但是如果分区方案需要更改,您通常必须从头开始重新创建表。在 Iceberg 中,这些表管理操作可以以最少的返工来应用,从而减轻数据从业人员在改进表以更好地满足业务需求时的负担。
在管道的第二阶段,我们使用一行代码更改分区方案以包含年份列!
代码语言:javascript复制print(f"Alter partition scheme using year n")
spark.sql("""ALTER TABLE airlines_iceberg.flights_v3
ADD PARTITION FIELD year""")
When describing the table we can see “year” is now a partition column:
…
# Partition Transform Information
# col_name transform_type
year IDENTITY
…
在 ETL 管道的最后阶段,我们将新数据加载到此分区中。让我们看一下如何使用 Impala 来利用这个 Iceberg 表来运行交互式 BI 查询。
将 CDW 与 Iceberg 一起使用
时间旅行
现在我们已经将数据加载到 Iceberg 表中,让我们使用 Impala 来查询表。首先,我们将在 CDW 中打开 Hue 并访问我们刚刚在 CDE 中使用 Spark 创建的表。转到 CDW 并在 Impala 虚拟仓库中打开 Hue。
首先我们检查表的历史并查看:
代码语言:javascript复制DESCRIBE HISTORY flight_v3 ;
示例结果:
creation_time | snapshot_id | parent_id | is_current_ancestor |
---|---|---|---|
2022-07-20 09:38:27.421000000 | 7445571238522489274 | NULL | TRUE |
2022-07-20 09:41:24.610000000 | 1177059607967180436 | 7445571238522489274 | TRUE |
2022-07-20 09:50:16.592000000 | 2140091152014174701 | 1177059607967180436 | TRUE |
现在我们可以使用时间戳和snapshot_id在不同的时间点查询表来查看结果,如下所示。
代码语言:javascript复制select year, count(*) from flights_v3
FOR SYSTEM_VERSION AS OF 7445571238522489274
group by year
order by year desc;
年 | 数数(*) |
---|---|
2005年 | 7140596 |
2004年 | 7129270 |
2003年 | 6488540 |
2002年 | 5271359 |
2001年 | 5967780 |
2000 | 5683047 |
1999 | 5527884 |
1998 | 5384721 |
1997 | 5411843 |
1996 | 5351983 |
1995 | 5327435 |
我们看到,截至第一个快照 ( 7445571238522489274),我们在表中拥有 1995 年至 2005 年的数据。让我们看看第二个快照的数据:
代码语言:javascript复制select year, count(*) from flights_v3
FOR SYSTEM_VERSION AS OF 1177059607967180436
group by year
order by year desc;
年 | 数数(*) |
---|---|
2006年 | 7141922 |
2005年 | 7140596 |
2004年 | 7129270 |
2003年 | 6488540 |
2002年 | 5271359 |
2001年 | 5967780 |
2000 | 5683047 |
1999 | 5527884 |
1998 | 5384721 |
1997 | 5411843 |
1996 | 5351983 |
1995 | 5327435 |
现在我们在表中也有截至 2006 年的数据。使用“FOR SYSTEM_VERSION AS OF <snapshot id>”,您可以查询旧数据。您还可以使用“FOR SYSTEM_TIME AS OF <timestamp>”来使用时间戳。
就地分区演变
除了 CDE (Spark) 的就地分区演化功能外,您还可以使用 CDW (Impala) 执行就地分区演化。首先,我们将使用show create table命令检查表的当前分区,如下所示:
代码语言:javascript复制SHOW CREATE TABLE flights_v3;
我们看到该表是按年份列分区的。我们可以将表的分区方案从按年分区更改为按年和月列分区。将新数据加载到表中后,所有后续查询都将受益于月列和年列的分区修剪。
代码语言:javascript复制ALTER TABLE flights_v3 SET PARTITION spec (year, month);
SHOW CREATE TABLE flights_v3;
CREATE EXTERNAL TABLE flights_v3 ( month INT NULL, dayofmonth INT NULL, dayofweek INT NULL, deptime INT NULL, crsdeptime INT NULL, arrtime INT NULL, crsarrtime INT NULL, uniquecarrier STRING NULL, flightnum INT NULL, tailnum STRING NULL, actualelapsedtime INT NULL, crselapsedtime INT NULL, airtime INT NULL, arrdelay INT NULL, depdelay INT NULL, origin STRING NULL, dest STRING NULL, distance INT NULL, taxiin INT NULL, taxiout INT NULL, cancelled INT NULL, cancellationcode STRING NULL, diverted STRING NULL, carrierdelay INT NULL, weatherdelay INT NULL, nasdelay INT NULL, securitydelay INT NULL, lateaircraftdelay INT NULL, year INT NULL ) PARTITIONED BY SPEC ( year, month ) STORED AS ICEBERG LOCATION 's3a://xxxxxx/warehouse/tablespace/external/hive/airlines.db/flights_v3' TBLPROPERTIES ('OBJCAPABILITIES'='EXTREAD,EXTWRITE', 'engine.hive.enabled'='true', 'external.table.purge'='TRUE', 'iceberg.catalog'='hadoop.tables', 'numFiles'='2', 'numFilesErasureCoded'='0', 'totalSize'='6958', 'write.format.default'='parquet')
通过 SDX 集成进行细粒度访问控制 (Ranger)
为了保护 Iceberg 表,我们支持基于 Ranger 的行和列安全规则,如下所示。
taxout列的列掩码:
早于 2000 年的行掩码:
代码语言:javascript复制SELECT taxiout FROM flights_v3 limit 10;
SELECT distinct (year) FROM flights_v3;
BI 查询
查询查找所有国际航班,定义为目的地机场国家与始发机场国家不同的航班:
代码语言:javascript复制SELECT DISTINCT
flightnum, uniquecarrier, origin, dest, month, dayofmonth, `dayofweek`
FROM flights_v3, airports_iceberg oa, airports_iceberg da
WHERE
f.origin = oa.iata and f.dest = da.iata and oa.country <> da.country
ORDER BY month ASC, dayofmonth ASC
LIMIT 4 ;
flightnum | uniquecarrier | origin | dest | month | dayofmonth | dayofweek |
---|---|---|---|---|---|---|
2280 | XE | BTR | IAH | 1 | 1 | 4 |
1673 | DL | ATL | BTR | 1 | 1 | 7 |
916 | DL | BTR | ATL | 1 | 1 | 2 |
3470 | MQ | BTR | DFW | 1 | 1 | 1 |
查询以探索乘客清单数据。例如,我们有国际转机航班吗?
代码语言:javascript复制SELECT * FROM unique_tickets a, flights_v3 o, flights_v3 d, airports oa, airports da WHERE a.leg1flightnum = o.flightnum AND a.leg1uniquecarrier = o.uniquecarrier AND a.leg1origin = o.origin AND a.leg1dest = o.dest AND a.leg1month = o.month AND a.leg1dayofmonth = o.dayofmonth AND a.leg1dayofweek = o.`dayofweek` AND a.leg2flightnum = d.flightnum AND a.leg2uniquecarrier = d.uniquecarrier AND a.leg2origin = d.origin AND a.leg2dest = d.dest AND a.leg2month = d.month AND a.leg2dayofmonth = d.dayofmonth AND a.leg2dayofweek = d.`dayofweek` AND d.origin = oa.iata AND d.dest = da.iata AND oa.country <> da.country ;
总结
在第一篇博客中,我们与您分享了如何使用 Cloudera 数据平台中的 Apache Iceberg 来构建一个开放的 Lakehouse。在示例工作流中,我们向您展示了如何使用 Cloudera 数据工程 (CDE) 将数据集摄取到Iceberg表中,执行时间旅行和就地分区演化,以及使用 Cloudera 数据仓库应用细粒度访问控制 (FGAC) ( CDW)。请继续关注第二部分!
要自行构建开放式 Lakehouse,请注册 60 天试用版或试驾 CDP,尝试 Cloudera Data Warehouse (CDW)、Cloudera Data Engineering (CDE) 和 Cloudera Machine Learning ( CML )。如果您有兴趣在 CDP 中讨论 Apache Iceberg,请让您的客户团队知道。
原文作者:Bill Zhang、Peter Ableda、Shaun Ahmadian和Manish Maheshwari
原文链接:https://blog.cloudera.com/how-to-use-apache-iceberg-in-cdps-open-lakehouse/