hudi支持HMS catalog啦!
功能亮点:当flink和spark同时接入hive metastore时,用hive metastore对hudi的元数据进行管理,无论是使用flink还是spark引擎建表,另外一种引擎或者hive都可以直接查询。
由于该功能为社区同学开发,还没合并到master分支,具体可以参考下述PR:
https://github.com/apache/hudi/pull/6013
本文以HDP集群为例,其他版本分别为:
flink 1.13.6
spark 3.2.1
在HDP集群中,hive的配置文件路径为/etc/hive/conf,所以在flink sql client中使用hive的配置文件来创建hudi-hive catalog从而将hudi元数据存储于hive metastore中。
在flink中写入数据
在flink sql client中进行如下操作:
代码语言:javascript复制create catalog hudi with(
'type' = 'hudi-hive',
'hive-conf-dir'='/etc/hive/conf'
);
--- 创建数据库供hudi使用
create database hudi.hudidb;
Flink sql client中建表
代码语言:javascript复制--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
uuid INT,
ts INT,
num INT,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
--- product表
CREATE TABLE hudi.hudidb.product_hudi(
uuid INT,
ts INT,
name STRING,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
--- 宽表
CREATE TABLE hudi.hudidb.orders_product_hudi(
uuid INT,
ts INT,
name STRING,
num INT,
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);
使用Flink SQL进行数据写入:
代码语言:javascript复制insert into hudi.hudidb.orders_hudi values
(1, 1, 2),
(2, 2, 3),
(3, 3, 4);
insert into hudi.hudidb.product_hudi values
(1, 1, 'tony'),
(2, 2, 'mike'),
(3, 3, 'funny');
insert into hudi.hudidb.orders_product_hudi
select
hudi.hudidb.orders_hudi.uuid as uuid,
hudi.hudidb.orders_hudi.ts as ts,
hudi.hudidb.product_hudi.name as name,
hudi.hudidb.orders_hudi.num as num
from hudi.hudidb.orders_hudi
inner join hudi.hudidb.product_hudi on hudi.hudidb.orders_hudi.uuid = hudi.hudidb.product_hudi.uuid;
在Flink SQL中查看数据
代码语言:javascript复制select * from hudi.hudidb.orders_hudi;
得到:
代码语言:javascript复制select * from hudi.hudidb.product_hudi;
得到:
代码语言:javascript复制select * from hudi.hudidb.orders_product_hudi;
得到:
在Spark中查看数据
hive为了连接集群hive metastore,只需要将hive的配置文件hive-site.xml放置到spark的配置文件目录即可。
通过beeline连接spark thriftserver,查看数据库:
代码语言:javascript复制show databases;
得到:
可以看到刚刚在flink中创建的hudidb数据库。现在查看里面的表:
代码语言:javascript复制use hudidb;
show tables;
由于在将数据写入hudi时,默认会新增_hoodie_commit_time、 _hoodie_record_key等字段用于内部使用,如果使用select * 表进行查询时会查出上述字段。
以查看orders_hudi表数据为例:
代码语言:javascript复制select * from orders_hudi;
得到
所以为了正确查询,需要指定字段:
代码语言:javascript复制select uuid, ts, num from orders_hudi;
得到
其他表同理。
在hive中查看数据
为了在hive引擎中查看,对于MERGE_ON_READ表,至少需要执行过一次压缩,也就是把avro文件压缩为parquet文件,才能够正常查看数据。由于上述操作为批量操作,默认是不会触发压缩操作的,所以需要手动触发压缩。
所以对product_hudi表进行手动压缩
代码语言:javascript复制./bin/flink run -c
org.apache.hudi.sink.compact.HoodieFlinkCompactor
lib/hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar
--path hdfs://host146:8020/warehouse/tablespace/managed/hive/hudidb.db/product_hudi
--schedule
注意:如果是使用flink将数据实时流式写入hudi的话,默认在写入五次时会自动触发压缩,不需要手动执行。
为了在hive中查看hudi所有数据,需要设置如下参数:
代码语言:javascript复制set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
如果设置该参数出现如下报错,
代码语言:javascript复制Error: Error while processing statement: Cannot modify hive.input.format at runtime. It is not in list of params that are allowed to be modified at runtime (state=42000,code=1)
那么通过如下方式修改hive-site.xml配置,新增如下配置,然后重启hive即可。
代码语言:javascript复制hive.security.authorization.sqlstd.confwhitelist.append=hive.input.format
进入hive客户端
设置变量:
代码语言:javascript复制set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;
查看数据库
代码语言:javascript复制show databases;
得到:
查看hudidb库中product_hudi表数据
代码语言:javascript复制select * from hudidb.product_hudi;
得到