数据湖(十八):Flink与Iceberg整合SQL API操作

2022-07-22 06:38:50 浏览数 (1)

​Flink与Iceberg整合SQL API操作

Flink SQL 在操作Iceberg时,对应的版本为Flink 1.11.x 与Iceberg0.11.1版本,目前,Flink1.14.2版本与Iceberg0.12.1版本对于SQL API 来说兼容有问题,所以这里使用Flink1.11.6版本与Iceberg0.11.1版本来演示Flink SQL API 操作Iceberg。

一、​​​​​​​SQL API 创建Iceberg表并写入数据

1、创建新项目,导入如下maven依赖包

代码语言:javascript复制
 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <!-- flink 1.11.x 与Iceberg 0.11.1 合适-->
  <flink.version>1.11.6</flink.version>
  <hadoop.version>3.2.2</hadoop.version>
</properties>

<dependencies>
  <!-- Flink 操作Iceberg 需要的Iceberg依赖 -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime</artifactId>
    <version>0.11.1</version>
  </dependency>

  <!-- java 开发Flink 所需依赖 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Flink Kafka连接器的依赖 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- 读取hdfs文件需要jar包-->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>

  <!-- Flink SQL & Table-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>


  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.5</version>
  </dependency>
</dependencies>

2、编写Flink SQL 创建Iceberg表并写入数据

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000);

        //1.创建Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH ("  
                "'type'='iceberg',"  
                "'catalog-type'='hadoop',"  
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2.使用当前Catalog
        tblEnv.useCatalog("hadoop_iceberg");

        //3.创建数据库
        tblEnv.executeSql("create database iceberg_db");

        //4.使用数据库
        tblEnv.useDatabase("iceberg_db");

        //5.创建iceberg表 flink_iceberg_tbl
        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

        //6.写入数据到表 flink_iceberg_tbl
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

3、在Hive中映射Iceberg表并查询

在Hive中执行如下命令创建对应的Iceberg表:

代码语言:javascript复制
#在Hive中创建Iceberg表
CREATE TABLE flink_iceberg_tbl2  (
  id int, 
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

代码语言:javascript复制
#在Hive中查询Iceberg表中的数据
hive> select * from flink_iceberg_tbl2;
OK
3	ww	20	guangzhou
1	zs	18	beijing
2	ls	19	shanghai

二、​​​​​​​​​​​​​​SQL API 批量查询Iceberg表数据

Flink SQL API 批量查询Iceberg表数据,直接查询显示即可。代码如下:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH ("  
    "'type'='iceberg',"  
    "'catalog-type'='hadoop',"  
    "'warehouse'='hdfs://mycluster/flink_iceberg')");
//2.批量读取表数据
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

tableResult.print();

结果如下:

三、​​​​​​​​​​​​​​SQL API 实时查询Iceberg表数据

Flink SQL API 实时查询Iceberg表数据时需要设置参数“table.dynamic-table-options.enabled”为true,以支持SQL语法中的“OPTIONS”选项,代码如下:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH ("  
        "'type'='iceberg',"  
        "'catalog-type'='hadoop',"  
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2.从Iceberg表当前快照读取所有数据,并继续增量读取数据
// streaming指定为true支持实时读取数据,monitor_interval 监控数据的间隔,默认1s
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*  OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

启动以上代码后,可以看到会将目前存在于Iceberg表中的数据读取出来,向Hive中对应的Iceberg表中插入数据,可以看到控制台实时获取数据。

代码语言:javascript复制
#在向Hive的Iceberg表中插入数据之前需要加入以下两个包:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

#向Hive 中Iceberg 表插入两条数据
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

在控制台可以看到实时新增数据

四、​​​​​​​​​​​​​​SQL API指定基于快照实时增量读取数据

Flink SQL API 还支持基于某个snapshot-id来继续实时获取数据,代码如下:

代码语言:javascript复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// 支持SQL语法中的 OPTIONS 选项
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1.创建Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH ("  
      "'type'='iceberg',"  
      "'catalog-type'='hadoop',"  
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2.从Iceberg 指定的快照继续实时读取数据,快照ID从对应的元数据中获取
//start-snapshot-id :快照ID
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*  OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();

0 人点赞