0874-7.1.7-如何在CDP集群为Spark3集成Iceberg

2022-04-08 12:45:31 浏览数 (1)

1.文档编写目的

Iceberg是一种开放的数据湖表格式,您可以借助Iceberg快速地在HDFS上构建自己的数据湖存储服务,并借助开源大数据生态的Spark、Flink、Hive和Presto等计算引擎来实现数据湖的分析。本篇文章主要介绍如何在Apache Spark3环境下集成Iceberg并使用,Iceberg使用Apache Spark的DataSourceV2 API来实现Data Source和Catalog。Spark DSv2是一个不断更新迭代的API,在不同的Spark版本中支持的程度也不一样,目前Spark2.4版本是不支持SQL DDL操作。关于CDP中安装Spark3可以参考Fayson前面的文章《7.1.7-如何在CDP集群中安装Spark3》。

  • 测试环境

1.CM7.4.4和CDP7.1.7

2.操作系统Redhat7.6

3.Spark版本为3.2

4.集群未启用Kerberos

2.Iceberg包下载并集成

1.在Spark3环境中使用Iceberg前需要先从官网下载Iceberg的依赖包,通过如下地址下载最新版本的包:

代码语言:javascript复制
https://iceberg.apache.org/releases/

3.将下载的包上传至CDP集群所有节点的/opt/cloudera/iceberg目录下

代码语言:javascript复制
mkdir -p /opt/cloudera/iceberg
cp /root/iceberg-spark-runtime-3.2_2.12-0.13.1.jar /opt/cloudera/iceberg/
chmod 644 /opt/cloudera/iceberg/iceberg-spark-runtime-3.2_2.12-0.13.1.jar     
ll /opt/cloudera/iceberg/

完成Iceberg的部署后,登录到CM的控制台页面进入Spark3服务的配置页面

在spark-env.sh配置中增加如下配置

代码语言:javascript复制
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/iceberg/iceberg-spark-runtime-3.2_2.12-0.13.1.jar

在spark-default.conf配置中增加如下配置

代码语言:javascript复制
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

修改配置后保存,根据CM提示重启Spark相应服务及部署客户配置到Gateway节点。

3.Spark3中使用Iceberg

本章节主要通过spark3-shell的方式来测试及验证Iceberg的使用,具体操作如下:

1.在命令行执行如下命令,进入spark shell命令

代码语言:javascript复制
spark3-shell 
  --conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog 
  --conf spark.sql.catalog.hive_prod.type=hive

注意:通过在命令行中添加spark.sql.catalog.(catalog-name)参数及其实现类,来创建和命名目录,实现类主要有如下两种:

org.apache.iceberg.spark.SparkCatalog:支持HiveMetastore或Hadoop作为Iceberg的Catalog。

org.apache.iceberg.spark.SparkSessionCatalog:支持Spark的内置Catalog作为Iceberg的Catalog。

上述命令行中hive_prod为自定义名称,名称的定义没有限制。基于spark.sql.catalog.

hive_prod的属性定义,常见的HADOOP和Hive的属性有如下:

属性

参数值

说明

spark.sql.catalog.catalog-name.type

hive   or hadoop

底层Iceberg的目录实现基于HiveCatalog, HadoopCatalog或者自定义

spark.sql.catalog.catalog-name.catalog-impl

Iceberg目录的底层实现,这里也可以自己编写实现类

spark.sql.catalog.catalog-name.default-namespace

default

当前目录的默认命名空间

spark.sql.catalog.catalog-name.uri

thrift://host:port

HiveMetastore的访问地址,默认可不配置,从hive-site.xml文件中读取

spark.sql.catalog.catalog-name.warehouse

hdfs://nn:8020/warehouse/path

仓库目录

spark.sql.catalog.catalog-name.cache-enabled

true or false

是否开启catalog缓存,默认为true

spark.sql.catalog.catalog-name.cache.expiration-interval-ms

30000 (30 seconds)

设置为-1禁用缓存过期,0完全禁用缓存

2. 在spark3-shell中执行如下代码,创建表并插入数据、修改数据以及删除操作

代码语言:javascript复制
sql("create database iceberg")
sql("show tables from iceberg").show()
代码语言:javascript复制
sql("CREATE TABLE hive_prod.iceberg.test_iceberg (id bigint, data string) USING iceberg")
sql("insert into hive_prod.iceberg.test_iceberg values(1,'fayson')")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制
sql("insert into hive_prod.iceberg.test_iceberg values(2,'fayson')")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制
sql("update hive_prod.iceberg.test_iceberg set data='fayson2' where id=2")
sql("select * from hive_prod.iceberg.test_iceberg").show()
代码语言:javascript复制
sql("delete from hive_prod.iceberg.test_iceberg where id=2")
sql("select * from hive_prod.iceberg.test_iceberg").show()

3.通过Hive查看创建的Iceberg表

代码语言:javascript复制
show databases;
代码语言:javascript复制
show create table test_iceberg;

4.查看创建的Iceberg表在HDFS路径上存储格式

代码语言:javascript复制
hadoop fs -lsr /warehouse/tablespace/external/hive/iceberg.db/test_iceberg

4.总结

1.Catalog Name 可以任意指定,但不能不指定,否则无法通过指定的Catalog Name查找到Iceberg相关的表。

2.在CDP集群的Spark3默认与Hive集成,因此在指定catalog类型为Hive时则不需要额外的配置HiveMetaStore的URI信息

3.使用HiveMetastore作为Catalog时,创建的Iceberg表会将元数据信息记录到Hive的元数据,在不指定Catalog的存储目录时,默认使用Hive的仓库目录路径。

4.如果不添加spark.sql.extensions=

org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions时,在执行Update操作时会报”UPDATE TABLE is not supported temporarily.”

0 人点赞