1. 环境准备
•Flink 1.12.2_2.11•Hudi 0.9.0-SNAPSHOT(master分支)•Spark 2.4.5、Hadoop 3.1.3、Hive 3.1.2
2. Flink CDC写入Hudi
MySQL建表语句如下
代码语言:javascript复制create table users( id bigint auto_increment primary key, name varchar(20) null, birthday timestamp default CURRENT_TIMESTAMP not null, ts timestamp default CURRENT_TIMESTAMP not null); // 随意插入几条数据insert into users (name) values ('hello');insert into users (name) values ('world');insert into users (name) values ('iceberg');insert into users (id,name) values (4,'spark');insert into users (name) values ('hudi'); select * from users;update users set name = 'hello spark' where id = 5;delete from users where id = 5;
启动sql-client
代码语言:javascript复制$FLINK_HOME/bin/sql-client.sh embedded //1.创建 mysql-cdcCREATE TABLE mysql_users ( id BIGINT PRIMARY KEY NOT ENFORCED , name STRING, birthday TIMESTAMP(3), ts TIMESTAMP(3)) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'server-time-zone' = 'Asia/Shanghai', 'database-name' = 'mydb', 'table-name' = 'users' ); // 2.创建hudi表CREATE TABLE hudi_users2( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, birthday TIMESTAMP(3), ts TIMESTAMP(3), `partition` VARCHAR(20)) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'path' = 'hdfs://localhost:9000/hudi/hudi_users2', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '1' ); //3.mysql-cdc 写入hudi ,会提交有一个flink任务INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;
Flink任务提交成功后可以查看任务界面
同时可以查看HDFS里的Hudi数据路径,这里需要等Flink 5次checkpoint(默认配置可修改)之后才能查看到这些目录,一开始只有.hoodie
一个文件夹
在MySQL执行insert、update、delete
等操作,当进行compaction生成parquet文件后就可以用hive/spark-sql/presto(本文只做了hive和spark-sql的测试)进行查询,这里需要注意下:如果没有生成parquet文件,我们建的parquet表是查询不出数据的。
3. Hive查询Hudi表
代码语言:javascript复制cd $HIVE_HOMEmkdir auxlib
然后将hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar
拷贝过来
使用beeline登录hive
代码语言:javascript复制beeline -u jdbc:hive2://localhost:10000 -n hadoop hadoop
创建外部表关联Hudi路径,有两种建表方式
代码语言:javascript复制方式一:INPUTFORMAT是org.apache.hudi.hadoop.HoodieParquetInputFormat这种方式只会查询出来parquet数据文件中的内容,但是刚刚更新或者删除的数据不能查出来// 创建外部表CREATE EXTERNAL TABLE `hudi_users_2`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `id` bigint, `name` string, `birthday` bigint, `ts` bigint) PARTITIONED BY ( `partition` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://localhost:9000/hudi/hudi_users2'; 方式二:INPUTFORMAT是org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat // 这种方式是能够实时读出来写入的数据,也就是Merge On Write,会将基于Parquet的基础列式文件、和基于行的Avro日志文件合并在一起呈现给用户。 CREATE EXTERNAL TABLE `hudi_users_2_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `id` bigint, `name` string, `birthday` bigint, `ts` bigint) PARTITIONED BY ( `partition` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://localhost:9000/hudi/hudi_users2'; // 添加分区alter table hudi_users_2 add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414'; alter table hudi_users_2_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414'; // 查询分区的数据select * from hudi_users_2 where `partition`=20210414;select * from hudi_users_2_mor where `partition`=20210414;
INPUTFORMAT是org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat格式的表在hive3.1.2里面是不能够执行统计操作的
执行select count(1) from hudi_users3_mor where
partition='20210414'
;
查看hive日志 tail -fn 100 hiveserver2.log
需要进行如下设置:set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat ;
具体原因参照这个issue:https://github.com/apache/hudi/issues/2813,或者阿里云技术文档:https://help.aliyun.com/document_detail/193310.html?utm_content=g_1000230851&spm=5176.20966629.toubu.3.f2991ddcpxxvD1#title-ves-82n-odd
再执行一遍依旧报错
但是在本地用hive-2.3.8
执行成功了,社群里面的同学测试1.1版本的也报同样的错误,目前猜测是hive版本兼容性有关
4. Spark-SQL查询Hudi表
将hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar
拷贝到$SPAKR_HOME/jars
,每个节点都拷贝一份
将hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar
拷贝到$HADOOP_HOME/share/hadoop/hdfs
下,每个节点都拷贝一份,然后重启hadoop
创建表,同样有两种方式
代码语言:javascript复制CREATE EXTERNAL TABLE `hudi_users3_spark`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `id` bigint, `name` string, `birthday` bigint, `ts` bigint) PARTITIONED BY ( `partition` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://localhost:9000/hudi/hudi_users3'; alter table hudi_users3_spark add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414'; select * from hudi_users3_spark where `partition`='20210414'; // 创建可以实时读表数据的格式CREATE EXTERNAL TABLE `hudi_users3_spark_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `id` bigint, `name` string, `birthday` bigint, `ts` bigint) PARTITIONED BY ( `partition` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' LOCATION 'hdfs://localhost:9000/hudi/hudi_users3'; alter table hudi_users3_spark_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414'; select * from hudi_users3_spark_mor where `partition`='20210414';
如果Spark-SQL读取实时Hudi数据,必须进行如下设置set spark.sql.hive.convertMetastoreParquet=false;
这里需要注意如果创建表的时候字段类型不对会报错,比如
代码语言:javascript复制CREATE EXTERNAL TABLE `hudi_users3_spark_mor`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `id` string, `name` string, `birthday` string, `ts` string) PARTITIONED BY ( `partition` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' LOCATION 'hdfs://localhost:9000/hudi/hudi_users3';
id 、ts、birthday都设置为String,会报下面错误。Spark-SQL想读取Hudi数据,字段类型需要严格匹配
5. 后续
目前使用小规模数据测试Flink CDC写入Hudi,后面我们准备用生产数据来走一波,看看Flink-CDC写入Hudi的性能和稳定性。