Environment: local test setup with JDK 1.8, Flink 1.11.2, Hadoop 3.0.0, Hive 2.1.1
I. Overview
This post documents using an HDFS path as the warehouse of an Iceberg result table: Flink consumes data from Kafka in real time and writes it into the Iceberg table, while Hive acts as a client that reads the table in near real time.
Thanks to Iceberg's read/write isolation, newly committed data can be read almost in real time.
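The snippets in the steps below all run against a Flink StreamTableEnvironment called tenv, which is not shown being created in the original notes. As a minimal sketch (assuming a streaming job with the Blink planner; the checkpoint interval is an arbitrary example value), it could be set up as follows. Checkpointing matters here because the Flink Iceberg sink only commits data files when a checkpoint completes:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// streaming environment; Iceberg only publishes new snapshots on checkpoints,
// so checkpointing must be enabled for the written data to become visible
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);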
II. Steps
1. Create an Iceberg table in a Hadoop Catalog
The code is as follows (example):
// create a Hadoop catalog backed by an HDFS warehouse path
tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n"
        + "  'type'='iceberg',\n"
        + "  'catalog-type'='hadoop',\n"
        + "  'warehouse'='hdfs://nameservice1/tmp',\n"
        + "  'property-version'='1'\n"
        + ")");
// switch to the new catalog
tenv.useCatalog("hadoop_catalog");
tenv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg_hadoop_db");
tenv.useDatabase("iceberg_hadoop_db");
// create the Iceberg result table
tenv.executeSql("DROP TABLE IF EXISTS hadoop_catalog.iceberg_hadoop_db.iceberg_002");
tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n"
        + "  user_id STRING COMMENT 'user_id',\n"
        + "  order_amount DOUBLE COMMENT 'order_amount',\n"
        + "  log_ts STRING\n"
        + ")");
2. Create a Kafka stream table in a Hive Catalog
The code is as follows (example):
String HIVE_CATALOG = "myhive";
String DEFAULT_DATABASE = "tmp";
String HIVE_CONF_DIR = "/xx/resources";
Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
tenv.registerCatalog(HIVE_CATALOG, catalog);
tenv.useCatalog("myhive");
// create kafka stream table
tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
tenv.executeSql(
        "CREATE TABLE ods_k_2_iceberg (\n"
        + "  user_id STRING,\n"
        + "  order_amount DOUBLE,\n"
        + "  log_ts TIMESTAMP(3),\n"
        + "  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"
        + ") WITH (\n"
        + "  'connector'='kafka',\n"
        + "  'topic'='t_kafka_03',\n"
        + "  'scan.startup.mode'='latest-offset',\n"
        + "  'properties.bootstrap.servers'='xx:9092',\n"
        + "  'properties.group.id' = 'testGroup_01',\n"
        + "  'format'='json'\n"
        + ")");
3. Connect the Kafka stream table to the Iceberg target table with SQL
The code is as follows (example):
System.out.println("---> 3. insert into iceberg table from kafka stream table .... ");
tenv.executeSql(
        "INSERT INTO hadoop_catalog.iceberg_hadoop_db.iceberg_002 "
        + "SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");
4. Data verification
Produce a few test records with the Kafka console producer:
bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
On the Hive side, register the Iceberg Hive runtime jar and create an external table over the Iceberg table's location, then query it:
hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002 limit 5;
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 13.0 2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)
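Besides Hive, the result can also be sanity-checked from Flink itself by reading the Iceberg table back through SQL. A rough sketch, assuming the tenv and catalog from the steps above (depending on the Flink/Iceberg versions, this one-off scan may be more convenient to run in batch mode):
// hypothetical read-back check: scan the Iceberg table and print the rows to stdout
tenv.executeSql("SELECT * FROM hadoop_catalog.iceberg_hadoop_db.iceberg_002").print();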
Summary
This post only gave a brief walkthrough of using the Flink Table API to consume Kafka and write the data in real time into an Iceberg result table backed by an HDFS-based Hadoop Catalog, as an initial verification that the approach is feasible.