大数据开发之Flink连接Hive

2022-11-22 16:26:35 浏览数 (1)

前言

本文使用环境版本

  • Hive:2.3.9
  • Flink:flink-1.12.7-bin-scala_2.12

使用代码连接到 Hive

Hive 需要开启元数据服务

代码语言:javascript复制
nohup hive --service metastore >/dev/null 2>&1 &

需要将配置了hive.metastore.uris的配置文件复制到项目resources路径下

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive_remote/warehouse</value>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
  </property>
</configuration>

依赖

代码语言:javascript复制
<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <flink.version>1.12.7</flink.version>
  <scala.version>2.12.15</scala.version>
  <hadoop.version>2.7.7</hadoop.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <!-- Flink Dependency -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Hive Dependency -->
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
  </dependency>
</dependencies>

调用代码

代码语言:javascript复制
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment, TableResult}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIterator

object HiveTest {
  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build()
    val tableEnv: TableEnvironment = TableEnvironment.create(settings)

    val name:String = "myhive"
    val defaultDataBase:String = "default"
    val hiveConfDir :String = "/data/tools/bigdata/apache-hive-2.3.9-bin/conf"

    val hive = new HiveCatalog(name, defaultDataBase, hiveConfDir )
    tableEnv.registerCatalog("myhive", hive) // 注册Catalog
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useCatalog("myhive") // 使用注册的Catalog ,不使用的话查不到数据
    tableEnv.useDatabase("mydb") // 设置要查询的数据库
    tableEnv.executeSql("show tables").print()
    
  }
}

获取数据

代码语言:javascript复制
TableResult result;
String SelectTables_sql ="select * from test.testdata";
result = tableEnv.executeSql(SelectTables_sql);
result.print();

Flink SQL Cli集成Hive

环境变量

Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

添加HADOOP_CLASSPATH

代码语言:javascript复制
vi /etc/profile.d/hadoop.sh

内容

代码语言:javascript复制
#HADOOP_HOME
export HADOOP_HOME=/data/tools/bigdata/hadoop-2.7.7
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

配置生效

代码语言:javascript复制
source /etc/profile

测试

代码语言:javascript复制
echo $HADOOP_CLASSPATH

添加Jar

Flink1.12集成Hive只需要在Flink的lib中添加如下三个jar包

以Hive2.3.9为例,分别为:

flink-sql-connector-hive-2.3.6_2.12-1.14.6.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.6/

flink-connector-hive_2.12-1.12.7.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/

hive-exec-2.3.9.jar

Hive安装路径下的lib文件夹

Flink 支持以下 Hive 版本。

1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)

2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)

3.1.x(3.1.0、3.1.1、3.1.2)

配置

Hive下配置

hive-site.xml

代码语言:javascript复制
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://192.168.7.101:9083</value>
</property>

Flink下配置

sql-client-defaults.yaml

该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

代码语言:javascript复制
catalogs: [] # empty list
# A typical catalog definition looks like:
#  - name: myhive
#    type: hive
#    hive-conf-dir: /opt/hive_conf/
#    default-database: ...

修改为

代码语言:javascript复制
catalogs: 
  - name: myhive
    type: hive
    hive-conf-dir: /data/tools/bigdata/apache-hive-2.3.9-bin/conf
    default-database: default

开启元数据服务元数据服务

Hive 需要开启元数据服务

代码语言:javascript复制
nohup hive --service metastore >/dev/null 2>&1 &

Hive中创建表

代码语言:javascript复制
create table t_user(id int,name string,password string);
INSERT INTO t_user VALUES (1,'Zhang San', '123456');
select * from t_user;

退出

代码语言:javascript复制
exit;

Flink中操作Hive中的表

首先启动FlinkSQL Cli,命令如下:

代码语言:javascript复制
$FLINK_HOME/bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

代码语言:javascript复制
show catalogs;

结果

default_catalog myhive

使用注册的myhive catalog

代码语言:javascript复制
use catalog myhive;

FlinkSQL操作Hive中的表,比如查询,写入数据。

代码语言:javascript复制
show tables;
select * from t_user;

退出

代码语言:javascript复制
exit;

运行报错

java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore

修改yarn-site.xml配置文件,原因是可能内存超过虚拟内存的限制,所以需要对yarn进行虚拟内存限制修正,将如下两个配置改为false

代码语言:javascript复制
<property>
  <!--pmem指的是默认检查物理内存,容器使用的物理内存不能超过我们限定的内存大小,因为我们上面设置了所有容器能够使用的最大内存数量,超出这个内存限制,任务就会被kill掉-->
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <!--vmem指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小-->
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

Flink SQL

Show

代码语言:javascript复制
-- 列出catalog
SHOW CATALOGS;
-- 列出数据库
SHOW DATABASES;
--列出表
SHOW TABLES;
--列出函数
SHOW FUNCTIONS;
-- 列出所有激活的 module
SHOW MODULES;

create

CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。

代码语言:javascript复制
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]


-- 例如
CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

drop

DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。

代码语言:javascript复制
--删除表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
--删除数据库
DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
--删除视图
DROP [TEMPORARY] VIEW  [IF EXISTS] [catalog_name.][db_name.]view_name
--删除函数
DROP [TEMPORARY|TEMPORARY SYSTEM] FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name;

alter

ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。

代码语言:javascript复制
--修改表名
ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name
--设置或修改表属性
ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)
--修改视图名
ALTER VIEW [catalog_name.][db_name.]view_name RENAME TO new_view_name
--在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。
ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

insert

INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)

代码语言:javascript复制
-- 1. 插入别的表的数据
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

-- 2. 将值插入表中 
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES [values_row , values_row ...]



-- 追加行到该静态分区中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆盖行到静态分区 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆盖行到分区 (date, country) 中,其中 date 是静态分区 '2019-8-30';country 是动态分区,其值由每一行动态决定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

0 人点赞