前言
本文使用环境版本
- Hive:2.3.9
- Flink:flink-1.12.7-bin-scala_2.12
使用代码连接到 Hive
Hive 需要开启元数据服务
代码语言:javascript复制nohup hive --service metastore >/dev/null 2>&1 &
需要将配置了hive.metastore.uris的配置文件复制到项目resources路径下
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_remote/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.7.101:9083</value>
</property>
</configuration>
依赖
代码语言:javascript复制<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.12.7</flink.version>
<scala.version>2.12.15</scala.version>
<hadoop.version>2.7.7</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
调用代码
代码语言:javascript复制import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIterator
object HiveTest {
def main(args: Array[String]): Unit = {
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
val tableEnv: TableEnvironment = TableEnvironment.create(settings)
val name:String = "myhive"
val defaultDataBase:String = "default"
val hiveConfDir :String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"
val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir )
tableEnv.registerCatalog("myhive", hive) // 注册Catalog
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useCatalog("myhive") // 使用注册的Catalog ,不使用的话查不到数据
tableEnv.useDatabase("mydb") // 设置要查询的数据库
tableEnv.executeSql("show tables").print()
}
}
获取数据
代码语言:javascript复制TableResult result;
String SelectTables_sql ="select * from test.testdata";
result = tableEnv.executeSql(SelectTables_sql);
result.print();
Flink SQL Cli集成Hive
环境变量
Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
添加HADOOP_CLASSPATH
代码语言:javascript复制vi /etc/profile.d/hadoop.sh
内容
代码语言:javascript复制#HADOOP_HOME
export HADOOP_HOME=/data/tools/bigdata/hadoop-2.7.7
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
配置生效
代码语言:javascript复制source /etc/profile
测试
代码语言:javascript复制echo $HADOOP_CLASSPATH
添加Jar
Flink1.12集成Hive只需要在Flink的lib中添加如下三个jar包
以Hive2.3.9为例,分别为:
flink-sql-connector-hive-2.3.6_2.12-1.14.6.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.6/
flink-connector-hive_2.12-1.12.7.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/
hive-exec-2.3.9.jar
Hive安装路径下的lib文件夹
Flink 支持以下 Hive 版本。
1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)
2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)
3.1.x(3.1.0、3.1.1、3.1.2)
配置
Hive下配置
hive-site.xml
代码语言:javascript复制<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.7.101:9083</value>
</property>
Flink下配置
sql-client-defaults.yaml
该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:
代码语言:javascript复制catalogs: [] # empty list
# A typical catalog definition looks like:
# - name: myhive
# type: hive
# hive-conf-dir: /opt/hive_conf/
# default-database: ...
修改为
代码语言:javascript复制catalogs:
- name: myhive
type: hive
hive-conf-dir: /data/tools/bigdata/apache-hive-2.3.9-bin/conf
default-database: default
开启元数据服务元数据服务
Hive 需要开启元数据服务
代码语言:javascript复制nohup hive --service metastore >/dev/null 2>&1 &
Hive中创建表
代码语言:javascript复制create table t_user(id int,name string,password string);
INSERT INTO t_user VALUES (1,'Zhang San', '123456');
select * from t_user;
退出
代码语言:javascript复制exit;
Flink中操作Hive中的表
首先启动FlinkSQL Cli,命令如下:
代码语言:javascript复制$FLINK_HOME/bin/sql-client.sh embedded
接下来,我们可以查看注册的catalog
代码语言:javascript复制show catalogs;
结果
default_catalog myhive
使用注册的myhive catalog
代码语言:javascript复制use catalog myhive;
FlinkSQL操作Hive中的表,比如查询,写入数据。
代码语言:javascript复制show tables;
select * from t_user;
退出
代码语言:javascript复制exit;
运行报错
java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore
修改yarn-site.xml配置文件,原因是可能内存超过虚拟内存的限制,所以需要对yarn进行虚拟内存限制修正,将如下两个配置改为false
代码语言:javascript复制<property>
<!--pmem指的是默认检查物理内存,容器使用的物理内存不能超过我们限定的内存大小,因为我们上面设置了所有容器能够使用的最大内存数量,超出这个内存限制,任务就会被kill掉-->
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<!--vmem指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小-->
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
Flink SQL
Show
代码语言:javascript复制-- 列出catalog
SHOW CATALOGS;
-- 列出数据库
SHOW DATABASES;
--列出表
SHOW TABLES;
--列出函数
SHOW FUNCTIONS;
-- 列出所有激活的 module
SHOW MODULES;
create
CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。
代码语言:javascript复制CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
-- 例如
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
drop
DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。
代码语言:javascript复制--删除表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
--删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
--删除视图
DROP [TEMPORARY] VIEW [IF EXISTS] [catalog_name.][db_name.]view_name
--删除函数
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;
alter
ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。
代码语言:javascript复制--修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
--设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
--修改视图名
ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name
--在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)
insert
INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)
代码语言:javascript复制-- 1. 插入别的表的数据
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
-- 2. 将值插入表中
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES [values_row , values_row ...]
-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
SELECT user, cnt FROM page_view_source;
-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
SELECT user, cnt, country FROM page_view_source;
-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
SELECT user, cnt FROM page_view_source;
-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
SELECT user, cnt, country FROM page_view_source;