hudi HMS Catalog尝鲜指南

2022-09-02 17:46:06 浏览数 (1)

hudi支持HMS catalog啦!

功能亮点:当flink和spark同时接入hive metastore时,用hive metastore对hudi的元数据进行管理,无论是使用flink还是spark引擎建表,另外一种引擎或者hive都可以直接查询。

由于该功能为社区同学开发,还没合并到master分支,具体可以参考下述PR:

https://github.com/apache/hudi/pull/6013

本文以HDP集群为例,其他版本分别为:

flink 1.13.6

spark 3.2.1

在HDP集群中,hive的配置文件路径为/etc/hive/conf,所以在flink sql client中使用hive的配置文件来创建hudi-hive catalog从而将hudi元数据存储于hive metastore中。

在flink中写入数据

在flink sql client中进行如下操作:

代码语言:javascript复制
create catalog hudi with(
 'type' = 'hudi-hive',
  'hive-conf-dir'='/etc/hive/conf'
);

--- 创建数据库供hudi使用
create database hudi.hudidb;

Flink sql client中建表

代码语言:javascript复制
--- order表
CREATE TABLE hudi.hudidb.orders_hudi(
  uuid INT,
  ts INT,
  num INT,
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

--- product表
CREATE TABLE hudi.hudidb.product_hudi(
  uuid INT,
  ts INT,
  name STRING,
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

--- 宽表
CREATE TABLE hudi.hudidb.orders_product_hudi(
  uuid INT,
  ts INT,
  name STRING,
  num INT,
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ'
);

使用Flink SQL进行数据写入:

代码语言:javascript复制
insert into hudi.hudidb.orders_hudi values
  (1, 1, 2),
  (2, 2, 3),
  (3, 3, 4);

insert into hudi.hudidb.product_hudi values
  (1, 1, 'tony'),
  (2, 2, 'mike'),
  (3, 3, 'funny');

insert into hudi.hudidb.orders_product_hudi
select
  hudi.hudidb.orders_hudi.uuid as uuid,
  hudi.hudidb.orders_hudi.ts as ts,
  hudi.hudidb.product_hudi.name as name,
  hudi.hudidb.orders_hudi.num as num
from hudi.hudidb.orders_hudi
inner join hudi.hudidb.product_hudi on hudi.hudidb.orders_hudi.uuid = hudi.hudidb.product_hudi.uuid;

在Flink SQL中查看数据

代码语言:javascript复制
select * from hudi.hudidb.orders_hudi;

得到:

代码语言:javascript复制
select * from hudi.hudidb.product_hudi;

得到:

代码语言:javascript复制
select * from hudi.hudidb.orders_product_hudi;

得到:

在Spark中查看数据

hive为了连接集群hive metastore,只需要将hive的配置文件hive-site.xml放置到spark的配置文件目录即可。

通过beeline连接spark thriftserver,查看数据库:

代码语言:javascript复制
show databases;

得到:

可以看到刚刚在flink中创建的hudidb数据库。现在查看里面的表:

代码语言:javascript复制
use hudidb;
show tables;

由于在将数据写入hudi时,默认会新增_hoodie_commit_time、 _hoodie_record_key等字段用于内部使用,如果使用select * 表进行查询时会查出上述字段。

以查看orders_hudi表数据为例:

代码语言:javascript复制
select * from orders_hudi;

得到

所以为了正确查询,需要指定字段:

代码语言:javascript复制
select uuid, ts, num from orders_hudi;

得到

其他表同理。

在hive中查看数据

为了在hive引擎中查看,对于MERGE_ON_READ表,至少需要执行过一次压缩,也就是把avro文件压缩为parquet文件,才能够正常查看数据。由于上述操作为批量操作,默认是不会触发压缩操作的,所以需要手动触发压缩。

所以对product_hudi表进行手动压缩

代码语言:javascript复制
./bin/flink run -c 
org.apache.hudi.sink.compact.HoodieFlinkCompactor 
lib/hudi-flink1.13-bundle_2.12-0.12.0-SNAPSHOT.jar 
--path hdfs://host146:8020/warehouse/tablespace/managed/hive/hudidb.db/product_hudi 
--schedule

注意:如果是使用flink将数据实时流式写入hudi的话,默认在写入五次时会自动触发压缩,不需要手动执行。

为了在hive中查看hudi所有数据,需要设置如下参数:

代码语言:javascript复制
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

如果设置该参数出现如下报错,

代码语言:javascript复制
Error: Error while processing statement: Cannot modify hive.input.format at runtime. It is not in list of params that are allowed to be modified at runtime (state=42000,code=1)

那么通过如下方式修改hive-site.xml配置,新增如下配置,然后重启hive即可。

代码语言:javascript复制
hive.security.authorization.sqlstd.confwhitelist.append=hive.input.format

进入hive客户端

设置变量:

代码语言:javascript复制
set hive.input.format= org.apache.hadoop.hive.ql.io.HiveInputFormat;

查看数据库

代码语言:javascript复制
show databases;

得到:

查看hudidb库中product_hudi表数据

代码语言:javascript复制
select * from hudidb.product_hudi;

得到

0 人点赞