实战 | 将Kafka流式数据摄取至Hudi

2021-04-13 11:47:47 浏览数 (1)

1. 引入

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能
  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启--enable-hive-sync 即可同步数据到hive表。

2. 步骤

2.1 DeltaStreamer启动命令

代码语言:javascript复制
spark-submit --master yarn   
  --driver-memory 1G 
  --num-executors 2 
  --executor-memory 1G 
  --executor-cores 4 
  --deploy-mode cluster 
  --conf spark.yarn.executor.memoryOverhead=512 
  --conf spark.yarn.driver.memoryOverhead=512 
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` 
  --props hdfs://../kafka.properties 
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider 
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource 
  --target-base-path hdfs://../business 
  --op UPSERT 
  --target-table business      '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
  --enable-hive-sync           '开启同步至hive'
  --table-type MERGE_ON_READ 
  --source-ordering-field create_time 
  --source-limit 5000000

2.2 kafka.properties配置实例

代码语言:javascript复制
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

3. 不同模式

3.1 MOR模式

如果使用MOR模式写入数据会在Hive的dwd库下面生成两张表。分别是testro 和 testrt testrt表支持:快照视图和增量视图查询 testro表支持:读优化视图查询

3.1.1 使用Spark查询
代码语言:javascript复制
spark-shell --master yarn 
--driver-memory 1G 
--num-executors 1 
--executor-memory 1G 
--executor-cores 1 
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar 
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

#快照视图
spark.sql("select count(*) from dwd.test_rt").show() 
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show() 
#增量视图
saprk sql不支持
3.1.2 使用Hive查询
代码语言:javascript复制
beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx  -p t3cx  
  --hiveconf hive.stats.autogather=false 

 #读优化查询
 select * from dwd.test_ro;
 #快照查询
 select * from dwd.test_rt;
 #增量查询
 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 set hoodie.test.consume.mode=INCREMENTAL;
 set hoodie.test.consume.max.commits=3;
 set hoodie.test.consume.start.timestamp=20200427114546;
 select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';

 #注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

3.2 COW模式

如果使用COW模式写入数据,会在Hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

3.2.1 使用Spark查询
代码语言:javascript复制
spark-shell --master yarn 
--driver-memory 1G 
--num-executors 1 
--executor-memory 1G 
--executor-cores 1 
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar 
--conf spark.sql.hive.convertMetastoreParquet=false

#快照视图
spark.sql("select count(*) from dwd.test").show()
代码语言:javascript复制
//增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")
spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()
3.2.2 使用Hive查询
代码语言:javascript复制
beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx  -p t3cx  
  --hiveconf hive.stats.autogather=false 

  #快照查询
  select count(*) from dwd.test;
  #增量查询
  set hoodie.test.consume.mode=INCREMENTAL;
  set hoodie.test.consume.max.commits=3;
  set hoodie.test.consume.start.timestamp=20200427114546;
  select count(*) from  dwd.test where `_hoodie_commit_time` > '20200427114546';

4. 总结

DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。

0 人点赞