前言
学习和使用Hudi近一年了,由于之前忙于工作和学习,没时间总结,现在从头开始总结一下,先从入门开始
Hudi 概念
Apache Hudi 是一个支持插入、更新、删除的增量数据湖处理框架,有两种表类型:COW和MOR,可以自动合并小文件,Hudi自己管理元数据,元数据目录为.hoodie
,
具体的概念可以查看官网https://hudi.apache.org/cn/docs/0.9.0/overview
Hudi 学习
- Hudi 官网 https://hudi.apache.org/cn/docs/0.9.0/overview/(因本人最开始学习时Hudi的版本为0.9.0版本,所以这里列的也是0.9.0的连接)
- Hudi 官方公众号号:ApacheHudi (Hudi PMC leesf 运营的),自己搜索即可,这里不贴二维码了
- Github https://github.com/leesf/hudi-resources 这个是Hudi PMC leesf整理的公众号上的文章,PC 浏览器上看比较方便
- GitHub 源码 https://github.com/apache/hudi 想要深入学习,还是得看源码并多和社区交流
Hudi 安装
只需要将Hudi的jar包放到Spark和Hive对应的路径下,再修改几个配置
Spark
Hudi支持Spark程序读写Hudi表,同时也支持Spark SQL insert/update/delete/merge等
包名:hudi-spark-bundle_2.11-0.9.0.jar 下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.9.0/hudi-spark-bundle_2.11-0.9.0.jar 包名:hudi-utilities-bundle_2.11-0.9.0.jar 下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.9.0/hudi-utilities-bundle_2.11-0.9.0.jar
将hudi-spark-bundle_2.11-0.9.0.jar 和 hudi-utilities-bundle_2.11-0.9.0.jar拷贝到
Hive
Hudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete
包名:hudi-hadoop-mr-bundle-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.9.0/hudi-hadoop-mr-bundle-0.9.0.jar
1、将hudi-hadoop-mr-bundle-0.9.0.jar 拷贝至SPARKHOME/jars,当前版本目录为/usr/hdp/3.1.0.0−78/spark2/jars/版本说明:0.9.0为hudi发行版本,2.11为HDP中Spark对应的scala版本这里提供的是Maven的下载地址,对于其他版本,Maven上可以下载到,当然也可以自己打包¨K25KHudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete包名:hudi−hadoop−mr−bundle−0.9.0.jar下载地址:[https://repo1.maven.org/maven2/org/apache/hudi/hudi−hadoop−mr−bundle/0.9.0/hudi−hadoop−mr−bundle−0.9.0.jar](https://repo1.maven.org/maven2/org/apache/hudi/hudi−utilities−bundle2.11/0.9.0/hudi−utilities−bundle2.11−0.9.0.jar)1、将hudi−hadoop−mr−bundle−0.9.0.jar拷贝至HIVE_HOME/lib,当前版本目录为:/usr/hdp/3.1.0.0-78/hive/lib/
2、修改hive配置(在hive-site.xml) hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat
hive.resultset.use.unique.column.names=false (修改这里的配置是因为如果我们用hudi-utilities-bundle中的工具类HoodieDeltaStreamer
,其中的JdbcbasedSchemaProvider
解析Hive表Schema时需要设置这个属性,否则解析异常,关于HoodieDeltaStreamer
的使用我会单独在另一篇文章中总结)
3、重启hive
Tez
1、将上述hudi-hadoop-mr-bundle-0.9.0.jar 打到/hdp/apps/${hdp.version}/tez/tez2.tar.gz中 注意:这里的路径是指HDFS路径 2、修改hive配置(在hive-site.xml) hive.tez.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat 3、重启Tez、Hive
关于第一个打包到tez2.tar.gz,我自己写了一个脚本,如下:
代码语言:javascript复制jar=$1
sudo rm -r tez_temp
mkdir tez_temp
cd tez_temp
hadoop fs -get /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
mkdir tez
tar -zxvf tez.tar.gz -C tez
mkdir gz
sudo rm -r tez/lib/hudi-hadoop-mr*
cp $jar tez/lib/
cd tez
tar -zcvf ../gz/tez.tar.gz ./*
hadoop fs -rm -r /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
hadoop fs -mv /hdp/apps/3.1.0.0-78/tez/tez.tar.gz /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
cd ../gz/
hadoop fs -put tez.tar.gz /hdp/apps/3.1.0.0-78/tez/
su - hdfs <<EOF
kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-cluster1@INDATA.COM
hadoop fs -chown hdfs:haoop /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -chmod 444 /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -ls /hdp/apps/3.1.0.0-78/tez/
exit
EOF
这个脚本在我自己的环境上是可以正常运行使用的,当然可能因本人水平有限,写的还不够好,不能适用所有环境,可以自行修改,仅做参考
Flink
Hudi也支持Flink,本人目前还不会Flink~,可以参考官网https://hudi.apache.org/cn/docs/0.9.0/flink-quick-start-guide
Hudi 写入
Hudi支持Spark、Flink、Java等多种客户端,本人常用Spark、Java客户端,这俩相比较而言,大家用Spark较多,这里就以Spark代码进行简单的示例总结
Spark 配置参数
代码语言:javascript复制import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.hudi.command.UuidKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
val spark = SparkSession.builder().
master("local[*]").
appName("SparkHudiDemo").
config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
// 扩展Spark SQL,使Spark SQL支持Hudi
config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
// 支持Hive,本地测试时,注释掉
// enableHiveSupport().
getOrCreate()
写Hudi并同步到Hive表
代码示例:
代码语言:javascript复制 val spark = SparkSession.builder().
master("local[*]").
appName("SparkHudiDemo").
config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
// 扩展Spark SQL,使Spark SQL支持Hudi
config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
// 支持Hive,本地测试时,注释掉
// enableHiveSupport().
getOrCreate()
import spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2022-05-12")).toDF("id", "name", "value", "ts", "dt")
val databaseName = "default"
val tableName1 = "test_hudi_table_1"
val primaryKey = "id"
val preCombineField = "ts"
val partitionField = "dt"
val tablePath1 = "/tmp/test_hudi_table_1"
save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
UPSERT_OPERATION_OPT_VAL, tablePath1, Overwrite)
spark.read.format("hudi").load(tablePath1).show(false)
// 删除表
save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
DELETE_OPERATION_OPT_VAL, tablePath1, Append)
spark.read.format("hudi").load(tablePath1).show(false)
/**
* 写hudi并同步到hive,有主键,分区字段dt
*
*/
def save2HudiSyncHiveWithPrimaryKey(df: DataFrame, databaseName: String, tableName: String, primaryKey: String, preCombineField: String,
partitionField: String, operation: String, tablePath: String, mode: SaveMode): Unit = {
df.
write.format("hudi").
option(RECORDKEY_FIELD.key, primaryKey). // 主键字段
option(PRECOMBINE_FIELD.key, preCombineField). // 预合并字段
option(PARTITIONPATH_FIELD.key, partitionField).
option(TBL_NAME.key, tableName).
option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName).
option(OPERATION.key(), operation).
// 下面的参数和同步hive元数据,查询hive有关
option(META_SYNC_ENABLED.key, true).
option(HIVE_USE_JDBC.key, false).
option(HIVE_DATABASE.key, databaseName).
option(HIVE_AUTO_CREATE_DATABASE.key, true).
// 内部表,这里非必须,但是在用saveAsTable时则必须,因为0.9.0有bug,默认外部表
option(HIVE_CREATE_MANAGED_TABLE.key, true).
option(HIVE_TABLE.key, tableName).
option(HIVE_CREATE_MANAGED_TABLE.key, true).
option(HIVE_STYLE_PARTITIONING.key, true).
option(HIVE_PARTITION_FIELDS.key, partitionField).
option(HIVE_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName).
// 为了SparkSQL更新用,0.9.0版本有bug,需要设置这个参数,最新版本已经修复,可以不设置这个参数
// 详情查看PR:https://github.com/apache/hudi/pull/3745
option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
mode(mode)
.save(tablePath)
}
代码说明:本地测试需要把同步Hive的代码部分注释掉,因为同步Hive需要连接Hive metaStore 服务器spark-shell里可以跑完整的代码,可以成功同步Hive,0.9.0版本同步Hive时会抛出一个关闭Hive的异常,这个可以忽略,这是该版本的一个bug,虽然有异常但是已同步成功,最新版本已经修复该bug,具体可以查看PR:https://github.com/apache/hudi/pull/3364
读Hudi
Spark 读取如上述代码示例:
代码语言:javascript复制spark.read.format("hudi").load(tablePath1).show(false)
结果:
代码语言:javascript复制 ------------------- -------------------- ------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |name|value|ts |dt |
------------------- -------------------- ------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
|20220512101542 |20220512101542_0_1 |id:1 |2022-05-12 |38c1ff87-8bc9-404c-8d2c-84f720e8133c-0_0-20-12004_20220512101542.parquet|1 |a1 |10 |1000|2022-05-12|
------------------- -------------------- ------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
可以看到多了几个Hudi元数据字段其中_hoodie_record_key
为Hudi主键,如果设置了RECORDKEY_FIELD
,比如这里的ID,那么_hoodie_record_key
是根据我们设置字段生成的,默认不是复合主键,这里代码示例改为了复合主键,具体配置为
option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getCanonicalName).
这里主要为了和SparkSQL保持一致,因为SparkSQL默认的为复合主键,如果不一致,那么upsert/delete时会有问题
默认情况RECORDKEY_FIELD
是必须设置的,RECORDKEY_FIELD
的默认值为uuid
,如果不设置,则会去找uuid,因为schema里没有uuid
,那么会报错
Hive
在服务器上运行示例代码是可以成功同步到Hive表的,我们看一下Hive表情况:
代码语言:javascript复制show create table test_hudi_table_1;
下面是Hive Hudi表的建表语句,和普通的Hive表的建表语句的区别可以自己比较,其中SERDEPROPERTIES里的内容是为了SparkSQL用的,可以看到这里包含了'primaryKey'='id',在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的'primaryKey'获取的,如果没有这个属性,那么 Spark SQL认为该表不是主键表,则不能进行update等操作
代码语言:javascript复制 ----------------------------------------------------
| createtab_stmt |
----------------------------------------------------
| CREATE TABLE `test_hudi_table_1`( |
| `_hoodie_commit_time` string, |
| `_hoodie_commit_seqno` string, |
| `_hoodie_record_key` string, |
| `_hoodie_partition_path` string, |
| `_hoodie_file_name` string, |
| `id` int, |
| `name` string, |
| `value` int, |
| `ts` int) |
| PARTITIONED BY ( |
| `dt` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' |
| WITH SERDEPROPERTIES ( |
| 'hoodie.query.as.ro.table'='false', |
| 'path'='/tmp/test_hudi_table_1', |
| 'primaryKey'='id') |
| STORED AS INPUTFORMAT |
| 'org.apache.hudi.hadoop.HoodieParquetInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION |
| 'hdfs://cluster1/tmp/test_hudi_table_1' |
| TBLPROPERTIES ( |
| 'last_commit_time_sync'='20220512101500', |
| 'spark.sql.sources.provider'='hudi', |
| 'spark.sql.sources.schema.numPartCols'='1', |
| 'spark.sql.sources.schema.numParts'='1', |
| 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}', |
| 'spark.sql.sources.schema.partCol.0'='dt', |
| 'transient_lastDdlTime'='1652320902') |
----------------------------------------------------
Hive查询Hudi表:
代码语言:javascript复制select * from test_hudi_table_1;
代码语言:javascript复制 ---------------------- ----------------------- --------------------- ------------------------- --------------------------------------------------------------------------- ----- ------- -------- ------- ------------- --
| _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | name | value | ts | dt |
---------------------- ----------------------- --------------------- ------------------------- --------------------------------------------------------------------------- ----- ------- -------- ------- ------------- --
| 20220513150854 | 20220513150854_0_1 | id:1 | dt=2022-05-12 | dd4ef080-97b6-4046-a337-abb01e26943e-0_0-21-12005_20220513150854.parquet | 1 | a1 | 10 | 1000 | 2022-05-12 |
---------------------- ----------------------- --------------------- ------------------------- --------------------------------------------------------------------------- ----- ------- -------- ------- ------------- --
Hive是可以查询Hudi表的,但是不能update/delete,要想使用update/delete等语句,只能使用Spark SQL,另外Hive可以增量查询。关于如何使用Hudi Spark SQL和Hive的增量查询,这里不展开描述,以后会单独写
配置项说明
这里只说明几个比较重要的配置,其他相关的配置可以看官网和源码
RECORDKEY_FIELD
:默认情况RECORDKEY_FIELD
是必须设置的,RECORDKEY_FIELD
的默认值为uuid
,如果不设置,则会去找uuid,因为schema里没有uuid
,那么会报错。另外Hudi0.9.0支持非主键Hudi表,只需要配置option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
即可,但是在后面的版本已经不支持了KEYGENERATOR_CLASS_NAME
:默认值为SimpleKeyGenerator
,默认不支持复合主键,默认情况下上述_hoodie_record_key
的内容为1
,而不是id:1
,而SparkSQL的默认值为SqlKeyGenerator
,该类是ComplexKeyGenerator
的子类:
class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
也就是本示例所使用的的复合主键类,当使用SimpleKeyGenerator
和ComplexKeyGenerator
同时upsert一个表时,那么会生成两条记录,因为_hoodie_record_key
的内容不一样,所以一张表的 KEYGENERATOR_CLASS_NAME
必须保证一致(父类和子类也是一致的)
PRECOMBINE_FIELD
: 预合并字段,默认值:ts,想详细了解预合并可以参考我的另外两篇博客https://dongkelun.com/2021/07/10/hudiPreCombinedField/和https://dongkelun.com/2021/11/30/hudiPreCombineField2/ upsert时,预合并是必须的,如果我们的表里没有预合并字段,或者不想使用预合并,不设置的话是会抛异常的,因为默认去找ts字段,找不到则跑异常,那么我们可以将预合并字段设置为主键字段PARTITIONPATH_FIELD
: Hudi的分区字段,默认值partitionpath
,对于没有分区的表,我们需要将该字段设置为空字符串option(PARTITIONPATH_FIELD.key, "")
,否则可能会因找不到默认值partitionpath
而抛异常。最新版本已经去掉分区字段默认值,详情可见:https://github.com/apache/hudi/pull/4195OPERATION
: Hudi的写操作类型,默认值为UPSERT_OPERATION_OPT_VAL
即upsert,Hudi支持多种操作类型 如:upsert、insert、bulk_insert、delete等,具体每个版本支持哪些操作类型,可以查看官网或源码,可以根据自己的需求选择选择操作类型。本代码展示了upsert成功后,又删除成功。
下面的参数和同步hive元数据,查询hive有关
META_SYNC_ENABLED
: 默认为false,不同步Hive,要想同步Hive可以将该值设为true,另外也可以设置HIVE_SYNC_ENABLED
进行同步Hive,作用差不多,至于区别,这里不详细解说HIVE_USE_JDBC
: 是否使用jdbc同步hive,默认为true,如果使用jdbc,那么需要设置HIVE_URL
、HIVE_USER
、HIVE_PASS
等配置,因为url和ip有关,每个环境不一样,用起来比较麻烦,所以这里不采用,另外因为实际使用是和Hive绑定的,可以直接使用HMS进行同步,使用起来比较方便,改为false后默认使用HMS同步Hive,具体逻辑可以看Hudi Hive 同步模块的源码,这里不展开HIVE_STYLE_PARTITIONING
: 是否使用Hive格式的分区路径,默认为false,如果设置为true,那么分区路径格式为 =,在这里为dt=2022-05-12,默认情况下只有即2022-05-12,因为我们常用Hive表查询Hudi所以,这里设置为trueHIVE_CREATE_MANAGED_TABLE
: 同步Hive建表时是否为内部表,默认为false,使用saveAsTable(实际调用的Hudi Spark SQL CTAS)建表时0.9.0版本有,本应该为内部表,但还是为外部表,可以通过设置这个参数修正,最新版本已修复,详情可见PR:https://github.com/apache/hudi/pull/3146HIVE_TABLE_SERDE_PROPERTIES
: 同步到Hive表SERDEPROPERTIES,为了Hudi Spark SQL 使用,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的'primaryKey'获取的,如果没有这个属性,那么Spark SQL认为该表不是主键表,则不能进行update等操作,而默认情况同步Hive时没有将主键字段同步过去,最新版本已经不需要设置该属性了。相关PR:https://github.com/apache/hudi/pull/3745 这个PR添加了支持HIVE_CREATE_MANAGED_TABLE
配置,但是CTAS依旧有bug,代码里的虽然判断表类型是否为内部表,并添加到options中,但是最后并没有将options用到最终写Hudi的参数中。另一个PR:https://github.com/apache/hudi/pull/3998 该PR的主要目的不是为了解决这个bug,但是附带解决了这个问题,因为options最终被正确传到写Hudi的参数中了
其他Hive相关的配置参数不一一解释,可自行查看官网
hoodie.properties
.hoodie
目录下有表属性文件.hoodie.properties
,内容为:
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=dt
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=2
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_hudi_table_1
新版本在该属性文件里增加了很多属性,如HIVE_STYLE_PARTITIONING
即hoodie.datasource.write.hive_style_partitioning
,增加属性便于使表的属性前后保持统一
非主键表
如上面配置项说明所示,Hudi0.9.0版本支持非主键表,对于纯insert的表有用,这里进行简单的代码示例
代码语言:javascript复制 val tableName2 = "test_hudi_table_2"
val tablePath2 = "/tmp/test_hudi_table_2"
save2HudiWithNoPrimaryKey(df, tableName2, tablePath2)
spark.read.format("hudi").load(tablePath2).show(false)
/**
* 非主键表,非分区表
*/
def save2HudiWithNoPrimaryKey(df: DataFrame, tableName: String, tablePath: String): Unit = {
df.
write.format("hudi").
option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
option(RECORDKEY_FIELD.key, "").
option(PARTITIONPATH_FIELD.key, "").
option(TBL_NAME.key, tableName).
option(OPERATION.key(), INSERT_OPERATION_OPT_VAL).
mode(Overwrite).
save(tablePath)
}
结果:
代码语言:javascript复制 ------------------- -------------------- ------------------------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key |_hoodie_partition_path|_hoodie_file_name |id |name|value|ts |dt |
------------------- -------------------- ------------------------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
|20220512145525 |20220512145525_0_1 |7263eac1-51f6-42eb-834d-bb5dfe13708e| |4fe619f1-58b1-4f58-94e6-002f9f5f5155-0_0-20-12004_20220512145525.parquet|1 |a1 |10 |1000|2022-05-12|
------------------- -------------------- ------------------------------------ ---------------------- ------------------------------------------------------------------------ --- ---- ----- ---- ----------
可以看到Hudi的主键为uuid,_hoodie_partition_path
为空,即非主键非分区表
代码语言:javascript复制备注:insert默认是会随机更新的(如果是主键表,大家可以将程序改为主键表,自行测试),随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。 要想是insert操作不更新,可以使用以下配置:
hoodie.merge.allow.duplicate.on.inserts = true
相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数
saveAsTable
利用saveAsTable写Hudi并同步Hive,实际最终调用的是Spark SQL CTAS(CreateHoodieTableAsSelectCommand)
CTAS 先用的insert into(InsertIntoHoodieTableCommand),再建表,默认insert,这里展示怎么配置参数使用bulk_insert,并且不使用预合并,这对于转化没有重复数据的历史表时很有用。
insert into SQL 默认是insert,配置一些参数就可以使用upsert/bulk_insert,具体可以看InsertIntoHoodieTableCommand
源码
val tableName3 = "test_hudi_table_3"
save2HudiWithSaveAsTable(df, databaseName, tableName3, primaryKey)
spark.table(tableName3).show()
def save2HudiWithSaveAsTable(df: DataFrame, databaseName: String, tableName: String, primaryKey: String): Unit = {
df.
write.format("hudi").
option(RECORDKEY_FIELD.key(), primaryKey).
// 不需要预合并,所以设置为primaryKey
// 当insert/bulk_insert等操作,并且关闭了相关参数,则不需要设置
// SparkSQL中如果没有显示配置预合并字段,则默认将预合并字段设置为schema的最后一个字段
// 如果为默认值的话,则可能会报null异常,所以设置为主键
// `PRECOMBINE_FIELD.key -> tableSchema.fields.last.name`
// 相关issue:https://github.com/apache/hudi/issues/4131
option(PRECOMBINE_FIELD.key(), primaryKey).
option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
option(TBL_NAME.key(), tableName).
option(HIVE_CREATE_MANAGED_TABLE.key, true).
// 关闭预合并,虽然默认值为false,但是0.9.0版本SparkSQL,当有主键时,设置为了true
option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key, false).
// 使用bulk_insert
option(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, true).
// 这里虽然为Overwrite,但是Hudi CTAS要求目录必须为空,否则会报验证错误
mode(Overwrite).
saveAsTable(s"$databaseName.$tableName")
}
这段代码本地是可以直接跑通的,结果为:
代码语言:javascript复制 ------------------- -------------------- ------------------ ---------------------- -------------------- --- ---- ----- ---- ----------
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| id|name|value| ts| dt|
------------------- -------------------- ------------------ ---------------------- -------------------- --- ---- ----- ---- ----------
| 20220512154039| 20220512154039_0_1| id:1| |de3c99a2-3858-462...| 1| a1| 10|1000|2022-05-12|
------------------- -------------------- ------------------ ---------------------- -------------------- --- ---- ----- ---- ----------
本地测试并没有同步到Hive表,因为并没有开启enableHiveSupport()
(本地验证时,注释掉这个配置),当在服务器上运行时,则可以成功同步到Hive表,可以自己试试,用saveAsTable的好处是,很多配置比如同步Hive都在Hudi Spark SQL的源码里配置了,所以配置较少。CTAS也有一些限制,比如不能覆盖写,不如save(path)
灵活
代码
完整代码地址:https://gitee.com/dongkelun/spark-hudi/blob/master/src/main/scala/com/dkl/blog/hudi/SparkHudiDemo.scala 备注:以后可能因重构地址有所变动
总结
本文对Hudi安装、读写进行了简单的总结,因为精力原因写的可能没有很全面,希望对刚入门Hudi的同学有所帮助,后面会继续总结Hudi Spark SQL 等其他方面的知识。