0911-7.1.7-如何在CDP集群使用Flink SQL Client并与Hive集成

2023-12-04 10:27:55 浏览数 (2)

1 文档概述

在前面Fayson介绍了《0876-7.1.7-如何在CDP中部署Flink1.14》,同时Flink也提供了SQL Client的能力,可以通过一种简单的方式来编写、调试和提交程序到Flink集群,而无需编写一行Java或Scala代码。本篇文章主要介绍如何在CDP集群中使用Flink SQL Client与Hive集成。Flink与Hive的集成,主要有如下两个目的:

首先,可以利用Hive的Metastore作为一个持久目录和Flink的HiveCatalog来跨会话存储Flink特定的元数据。例如:用户可以使用HiveCatalog将Kafka和ElasticSearch表存储在HiveMetastore中,然后在SQL查询中重复使用。

其次,Flink可以作为读写Hive的替代引擎。

  • • 测试环境
    1. 1. CM7.4.4和CDP7.1.7
    2. 2. 操作系统Redhat7.6
    3. 3. Flink1.14.0-csa1.6.1.0
    4. 4. 集群未启用Kerberos

2 与Hive集成说明及依赖准备

1.Flink支持的Hive版本如下:

注意:Hive不同版本与Flink的集成有不同的功能差异,是Hive本身支持的问题,目前CDP中的Hive版本为3.1.3000,并不在当前的支持列表中。

  • • 1.2及更高版本支持Hive内置函数
  • • 3.1及更高版本支持列约束(即PRIMARY KEY和NOT NULL)
  • • 1.2.0及更高版本支持更改表统计信息
  • • 1.2.0及更高版本支持DATE列统计信息
  • • 2.0.x不支持写入ORC表

2.Hive与Flink的集成需要引入额外的依赖包,可用使用官方提供的可用依赖包,也可以自己通过引入独立的依赖实现

  • • 当前Flink官网提供的可用的依赖包如下

注意:当前官方提供的Hive3的依赖版本与CDP7.1.7中Hive版本不一致,并且经过测试也是不可用的。

  • • 引入独立的依赖包(如下列表简单列了几个版本,具体参考官网)

3.官方提供可以执行的依赖包并不能很好的适配CDP,只能通过第二种方式下载独立的依赖实现与Hive的集成

  • • 从Cloudera官方的Maven库下载flink-connector-hive依赖包
代码语言:javascript复制
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.12/1.14.0-csa1.6.0.0

Maven依赖引入方式:

代码语言:javascript复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.14.0-csa1.6.0.0</version>
    <scope>provided</scope>
</dependency>

4.将下载的依赖包上传至CDP集群有Flink Gateway角色的/opt/cloudera/iceberg目录下

代码语言:javascript复制
mkdir -p /opt/cloudera/iceberg

5.上述还提到了hive-exec及其他依赖包均在集群中获取,具体路径如下:

代码语言:javascript复制
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar
/opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar
/opt/cloudera/parcels/CDH/jars/antlr-runtime-3.5.2.jar

Flink与Hive的集成,找到了依赖的Jar包后,可以将上述依赖的jar包拷贝至Flink的安装目录/opt/cloudera/parcels/FLINK/lib/flink/lib/(需要拷贝至集群所有节点),可以在客户端命令行启动时通过-j的方式引入。

代码语言:javascript复制
flink-sql-client embedded 
  -j /opt/cloudera/iceberg/iceberg-flink-runtime-1.14-0.13.1.jar 
  -j /opt/cloudera/iceberg/flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar 
  -j /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar 
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar 
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar 
  shell

3 Flink与Hive集成验证

1.在命令行执行执行如下脚本启动Flink SQL Client

代码语言:javascript复制
export HADOOP_USER_NAME=hive
flink-sql-client embedded 
  -j /opt/cloudera/iceberg/flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar 
  -j /opt/cloudera/parcels/CDH/lib/hadoop/client/hadoop-mapreduce-client-core.jar 
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar 
  -j /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar 
  shell

2.在命令行执行如下命令设置结果显示方式及执行模式

代码语言:javascript复制
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';

此处为了便于显示,采用批量的方式执行以及表格的方式显示。

3.执行如下命令,创建一个Hive的Catalog

代码语言:javascript复制
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf'
);

查看已经创建的Catalog

代码语言:javascript复制
show CATALOGS;

4.进入到创建的myhive的Catalog并查看表

代码语言:javascript复制
use catalog myhive;
show tables;

此处看到的表与Hive中的表一致,也是相应的hive表。

5.在命令行执行SQL语句查询表数据

代码语言:javascript复制
select * from test;

与Hive中查询的数据一致

6.执行一个SQL Count的操作

代码语言:javascript复制
select count(*) from test;

4 异常处理

1.在命令行运行Flink的wordcount示例时,当作业运行结束后有如下异常日志输出

代码语言:javascript复制
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at 

解决方法:

在Cloudera Manager中为Flink Gateway角色的flink-conf.yaml中增加如下配置:

代码语言:javascript复制
classloader.check-leaked-classloader: false

2.在命令行Flink的WordCount示例时会有如下大量异常日志输出

代码语言:javascript复制
2022-04-13 08:37:50,368 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2022-04-13 08:37:50,399 ERROR org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker [] - Invalid config event received: {server.1=cdp03.fayson.com:3181:4181:participant, version=0, server.3=cdp02.fayson.com:3181:4181:participant, server.2=cdp01.fayson.com:3181:4181:participant}

解决方案:

在Cloudera Manager中为Flink Gateway角色的log4j.properties和log4j-cli.properties中增加如下配置:

代码语言:javascript复制
logger.curator.name = org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker
logger.curator.level = OFF

问题分析:

上述出现的异常日志,主要是因为Flink中引入了Curator依赖包,该依赖包在处理Zookeeper的消息时,收到的信息中携带了”{}”,导致数据解析出现异常,目前该异常并不影响服务的使用(https://issues.apache.org/jira/browse/CURATOR-526),在Curator5.2之后版本修复,在修复的代码中可以看到只是将日志的级别从log.error调整为log.debug,参考https://github.com/apache/curator/pull/382

尝试将5.2版本修复后的类,打包到flink-shaded-zookeeper-3.5.5.7.1.7.0-551.jar包的org/apache/flink/shaded/curator4/org/apache/curator/framework/imps目录下,但在启动跑作业时失败,提示异常日志如下:

代码语言:javascript复制
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/curator4/org/apache/curator/framework/imps/EnsembleTracker (wrong name: org/apache/curator/framework/imps/EnsembleTracker)

这里可以考虑重新编译,暂未尝试而是通过过滤该ERROR日志的方式解决。

3.在Flink与Hive集成后,运行SQL代码时报大量的异常日志

代码语言:javascript复制
2022-04-13 08:58:24,505 WARN  org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ef7f994a08f57141fafd18481d13ab85)
    at 

问题分析: 通过分析Flink作业的jobmanager的日志可以看到

通过日志可以看到报错的jobid(ef7f994a08f57141fafd18481d13ab85)实际上是在对应的JobMaster 停止以后收到的请求,因此才会出现该错误。通过报错的日志段,查找对应的源码

可以看到获取结果这块是一个while的循环,在不停的从jobmaster获取结果,这里少了对JobMaster关闭状态的判断,或者少了sleep等待,while的循环导致jobmaster还未完全结束,又来了一次新的请求导致。

解决方案: 在CM的FLink服务中将log的日志级别调整为ERROR,具体配置如下:

代码语言:javascript复制
logger.flink-collect.name = org.apache.flink.streaming.api.operators.collect.CollectResultFetcher
logger.flink-collect.level = ERROR

5 总结

1.官方提供的flink-connector-hive依赖包并不能与CDP的Hive集成,需要使用Cloudera提供的flink-connector-hive_2.12-1.14.0-csa1.6.0.0.jar集成。

2.在Flink SQL Client中创建的Hive Catalog在当前会话有效,在会话重新启动后则需要再次创建。

3.在FLink的Gateway节点必须部署Hive On Tez的Gateway,否则在创建Catalog时会找不到Hive Metastore相关的配置信息(如Metastore URI以及Warehouse的HDFS路径)。

4.在加入了antlr-runtime-3.5.2.jar依赖后,并不能通过设置'table.sql-dialect' = 'hive'使用Hive方言。

5.在未添加hadoop-mapreduce-client-core.jar依赖时,在SQL Client中执行SQL会卡主。

6.在SQL Client下运行Flink作业只支持Per-Job Mode不支持Session Mode模式。

7.通过Flink SQL向表中插入数据后,生成的Flink作业无法自动结束,一直处于运行状态,实际数据已写入表中。

0 人点赞