1.文档编写目的
Iceberg是一种开放的数据湖表格式,您可以借助Iceberg快速地在HDFS上构建自己的数据湖存储服务,并借助开源大数据生态的Spark、Flink、Hive和Presto等计算引擎来实现数据湖的分析。本篇文章主要介绍如何在Apache Spark3环境下集成Iceberg并使用,Iceberg使用Apache Spark的DataSourceV2 API来实现Data Source和Catalog。Spark DSv2是一个不断更新迭代的API,在不同的Spark版本中支持的程度也不一样,目前Spark2.4版本是不支持SQL DDL操作。关于CDP中安装Spark3可以参考Fayson前面的文章《7.1.7-如何在CDP集群中安装Spark3》。
- 测试环境
1.CM7.4.4和CDP7.1.7
2.操作系统Redhat7.6
3.Spark版本为3.2
4.集群未启用Kerberos
2.Iceberg包下载并集成
1.在Spark3环境中使用Iceberg前需要先从官网下载Iceberg的依赖包,通过如下地址下载最新版本的包:
https://iceberg.apache.org/releases/
3.将下载的包上传至CDP集群所有节点的/opt/cloudera/iceberg目录下
代码语言:javascript复制mkdir -p /opt/cloudera/iceberg
cp /root/iceberg-spark-runtime-3.2_2.12-0.13.1.jar /opt/cloudera/iceberg/
chmod 644 /opt/cloudera/iceberg/iceberg-spark-runtime-3.2_2.12-0.13.1.jar
ll /opt/cloudera/iceberg/
完成Iceberg的部署后,登录到CM的控制台页面进入Spark3服务的配置页面
在spark-env.sh配置中增加如下配置
代码语言:javascript复制export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/iceberg/iceberg-spark-runtime-3.2_2.12-0.13.1.jar
在spark-default.conf配置中增加如下配置
代码语言:javascript复制spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
修改配置后保存,根据CM提示重启Spark相应服务及部署客户配置到Gateway节点。
3.Spark3中使用Iceberg
本章节主要通过spark3-shell的方式来测试及验证Iceberg的使用,具体操作如下:
1.在命令行执行如下命令,进入spark shell命令
代码语言:javascript复制spark3-shell
--conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.hive_prod.type=hive
注意:通过在命令行中添加spark.sql.catalog.(catalog-name)参数及其实现类,来创建和命名目录,实现类主要有如下两种:
org.apache.iceberg.spark.SparkCatalog:支持HiveMetastore或Hadoop作为Iceberg的Catalog。
org.apache.iceberg.spark.SparkSessionCatalog:支持Spark的内置Catalog作为Iceberg的Catalog。
上述命令行中hive_prod为自定义名称,名称的定义没有限制。基于spark.sql.catalog.
hive_prod的属性定义,常见的HADOOP和Hive的属性有如下:
属性 | 参数值 | 说明 |
---|---|---|
spark.sql.catalog.catalog-name.type | hive or hadoop | 底层Iceberg的目录实现基于HiveCatalog, HadoopCatalog或者自定义 |
spark.sql.catalog.catalog-name.catalog-impl | Iceberg目录的底层实现,这里也可以自己编写实现类 | |
spark.sql.catalog.catalog-name.default-namespace | default | 当前目录的默认命名空间 |
spark.sql.catalog.catalog-name.uri | thrift://host:port | HiveMetastore的访问地址,默认可不配置,从hive-site.xml文件中读取 |
spark.sql.catalog.catalog-name.warehouse | hdfs://nn:8020/warehouse/path | 仓库目录 |
spark.sql.catalog.catalog-name.cache-enabled | true or false | 是否开启catalog缓存,默认为true |
spark.sql.catalog.catalog-name.cache.expiration-interval-ms | 30000 (30 seconds) | 设置为-1禁用缓存过期,0完全禁用缓存 |
2. 在spark3-shell中执行如下代码,创建表并插入数据、修改数据以及删除操作
代码语言:javascript复制sql("create database iceberg")
sql("show tables from iceberg").show()
代码语言:javascript复制sql("CREATE TABLE hive_prod.iceberg.test_iceberg (id bigint, data string) USING iceberg")
sql("insert into hive_prod.iceberg.test_iceberg values(1,'fayson')")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制sql("insert into hive_prod.iceberg.test_iceberg values(2,'fayson')")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制sql("update hive_prod.iceberg.test_iceberg set data='fayson2' where id=2")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制sql("delete from hive_prod.iceberg.test_iceberg where id=2")
sql("select * from hive_prod.iceberg.test_iceberg").show()
3.通过Hive查看创建的Iceberg表
代码语言:javascript复制show databases;
代码语言:javascript复制show create table test_iceberg;
4.查看创建的Iceberg表在HDFS路径上存储格式
代码语言:javascript复制hadoop fs -lsr /warehouse/tablespace/external/hive/iceberg.db/test_iceberg
4.总结
1.Catalog Name 可以任意指定,但不能不指定,否则无法通过指定的Catalog Name查找到Iceberg相关的表。
2.在CDP集群的Spark3默认与Hive集成,因此在指定catalog类型为Hive时则不需要额外的配置HiveMetaStore的URI信息
3.使用HiveMetastore作为Catalog时,创建的Iceberg表会将元数据信息记录到Hive的元数据,在不指定Catalog的存储目录时,默认使用Hive的仓库目录路径。
4.如果不添加spark.sql.extensions=
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions时,在执行Update操作时会报”UPDATE TABLE is not supported temporarily.”