HiveCatalog 介绍与使用

2022-07-20 20:40:22 浏览数 (1)

作者:苏文鹏,腾讯 CSIG 工程师

一、背景

Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和 ETL 场景的 SQL 引擎,同样它也是一个数据管理平台,可用于发现、定义和演化数据。Flink 与 Hive 的集成包含两个层面:

  • 一是利用了 Hive 的 Metastore 作为持久化的 Catalog,用户可通过 HiveCatalog 将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,用户可以使用 HiveCatalog 将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。
  • 二是利用 Flink 来读写 Hive 的表。

HiveCatalog 的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

二、前置准备

创建私有网络 VPC

私有网络(VPC)是一块在腾讯云上自定义的逻辑隔离网络空间,在构建 Oceanus 集群、Hive 组件等服务时选择的网络建议选择同一个 VPC,网络才能互通。否则需要使用对等连接、NAT 网关、VPN 等方式打通网络。私有网络创建步骤请参考 帮助文档 [1]。

创建流计算 Oceanus 集群

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC 及子网使用刚刚创建好的网络。创建完后 Oceanus 的集群如下:

创建 EMR 集群

EMR 是云端托管的弹性开源泛 Hadoop 服务,支持 Hive、Kudu、HDFS、Presto、Flink、Druid 等大数据框架,本次示例主要需要使用 Hive、Zookeeper、HDFS、Yarn、Knox 组件。

进入 EMR 控制台 [2],单击左上角【创建集群】进行集群的创建,创建过程中注意选择【产品版本】,不同的版本包含的组件不同,笔者这里选择EMR-V2.2.0版本,另外【集群网络】需选择之前创建好的 VPC 及对应的子网。具体过程可参考 创建 EMR 集群 [3]。

创建 Oceanus SQL 作业

上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传 Hive 有关的配置文件。

代码语言:javascript复制
hdfs-site.xmlhive-site.xmlhivemetastore-site.xmlhiveserver2-site.xml

创建 SQL 作业

流计算 Oceanus 控制台作业管理 > 新建作业 中新建 SQL 作业,选择在新建的集群中新建作业。

创建 HiveCatalog

三、Hive Metastore 的用途

1. 利用 Hive Metastore 作为持久化的 Catalog

创建 Source
代码语言:javascript复制
CREATE TABLE datagen_source_table (  id   INT,    name STRING ,    dt   STRING,  hr   STRING) WITH (   'connector' = 'datagen',   'rows-per-second'='1'  -- 每秒产生的数据条数);

创建 Sink
代码语言:javascript复制
CREATE TABLE IF NOT EXISTS `_hive235`.`tinatest1`.`jdbc_upsert_sink_table` (  id   INT,    name STRING ,    dt   STRING,  hr   STRING) WITH (   -- 指定数据库连接参数   'connector' = 'jdbc',   'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/tina?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数   'table-name' = 'testhive', -- 需要写入的数据表   'username' = 'root',             -- 数据库访问的用户名(需要提供 INSERT 权限)   'password' = 'xxxxxxxxx',        -- 数据库访问的密码   'sink.buffer-flush.max-rows' = '70000',  -- 批量输出的条数   'sink.buffer-flush.interval' = '1s'    -- 批量输出的间隔);

算子操作
代码语言:javascript复制
insert into `_hive235`.`tinatest1`.`jdbc_upsert_sink_table` select * from datagen_source_table;

结果验证

2. 读写 Hive 表的数据

创建 Hive 实体表
代码语言:javascript复制
CREATE TABLE `record`( `id`   int, `name` string)PARTITIONED BY ( `dt` string, `hr` string)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',')STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION 'cosn://tinametatest-xxxxxxxx/xxxxx/xxxxx'TBLPROPERTIES ( 'hive-version'='2.3.5', 'streaming-source.consume-order'='create-time', 'streaming-source.enable'='true', 'streaming-source.monitor-interval'='10s', 'streaming-source.partition.include'='all',);
CREATE TABLE `record_target`( `id`   int, `name` string, `dt`   string, `hr`   string)ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'WITH SERDEPROPERTIES ( 'field.delim'=',', 'serialization.format'=',')STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'TBLPROPERTIES ( 'hive-version'='2.3.5');

算子操作

代码语言:javascript复制
insert into `_hive235`.`tinatest1`.`record_target` select * from `_hive235`.`tinatest1`.`record`;

结果验证

3. Hive 用做维表

基于 processing time join 最新 Hive 分区中的数据
创建 Hive 实体表
代码语言:javascript复制
CREATE TABLE `record2`( `id`   int , `name` string)PARTITIONED BY ( `dt` string, `hr` string)LOCATION 'cosn://tinametatest-xxxxxxxxx/xxxxx/xxxxx'TBLPROPERTIES ( 'hive-version'='2.3.5', 'streaming-source.consume-order'='partition-name', 'streaming-source.enable'='true', 'streaming-source.monitor-interval'='10s', 'streaming-source.partition.include'='latest');

创建 Source
代码语言:javascript复制
CREATE TABLE `mysql_cdc_source_table` ( `id`   INT, `name` STRING,proctime as proctime(), PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ( 'connector' = 'mysql-cdc',      -- 固定值 'mysql-cdc' 'hostname' = 'xx.xx.xx.xx',     -- 数据库的 IP 'port' = '3306',                -- 数据库的访问端口 'username' = 'root',            -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限) 'password' = 'xxxxxxxx',        -- 数据库访问的密码 'database-name' = 'tina',       -- 需要同步的数据库 'table-name' = 'my_table' ,     -- 需要同步的数据表名 'server-id'='7400-7412');

创建 Sink
代码语言:javascript复制
CREATE TABLE `jdbc_sink_table` (    `id`   INT PRIMARY key,    `name` STRING) WITH (    -- 指定数据库连接参数    'connector' = 'jdbc',    'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/tina?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',    -- 请替换为您的实际 MySQL 连接参数    'table-name' = 'hivecdc',         -- 需要写入的数据表    'username' = 'root',              -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)    'password' = 'xxxxxxxx',               -- 数据库访问的密码    'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数    'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔);

算子操作
代码语言:javascript复制
INSERT INTO jdbc_sink_table SELECT o.id,o.name FROM mysql_cdc_source_table AS o JOIN `_hive235`.`tinatest1`.`record2` FOR SYSTEM_TIME AS OF o.proctime AS dimON o.id = dim.id;

基于 processing time join 最新 Hive 表中的数据
创建 Hive 实体表
代码语言:javascript复制
CREATE TABLE `record_batch`(  `id`   int ,  `name` string)PARTITIONED BY (  `dt` string,  `hr` string)LOCATION  'cosn://tinametatest-xxxxxxxx/xxxxx/xxxxx'TBLPROPERTIES (  'hive-version'='2.3.5',  'lookup.join.cache.ttl'='10s');

创建 Source
代码语言:javascript复制
CREATE TABLE `mysql_cdc_source_table` (  `id`    INT,  `name`  STRING,  proctime as proctime(),  PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH (  'connector' = 'mysql-cdc',    -- 固定值 'mysql-cdc'  'hostname' = 'xx.xx.xx.xx',   -- 数据库的 IP  'port' = '3306',              -- 数据库的访问端口  'username' = 'root',          -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)  'password' = 'xxxxxxxx',      -- 数据库访问的密码  'database-name' = 'tina',     -- 需要同步的数据库  'table-name' = 'my_table' ,   -- 需要同步的数据表名  'server-id'='7400-7412');

创建 Sink
代码语言:javascript复制
CREATE TABLE `jdbc_sink_table` (    `id`   INT PRIMARY key,    `name` STRING) WITH (    -- 指定数据库连接参数    'connector' = 'jdbc',    'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/tina?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数    'table-name' = 'hivecdc',      -- 需要写入的数据表    'username' = 'root',           -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)    'password' = 'xxxxxxxx',       -- 数据库访问的密码    'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数    'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔);

算子操作
代码语言:javascript复制
insert into jdbc_sink_table SELECT o.id,o.name FROM mysql_cdc_source_table AS o JOIN `_hive235`.`tinatest1`.`record_batch` FOR SYSTEM_TIME AS OF o.proctime AS dimON o.id = dim.id;

四、注意事项

  • 配置文件 hive-site.xml 文件中需要配置 Metastore 的路径;
  • 同一个 SQL 作业中只能使用一个 HiveCatalog;
  • 读取 Hive 数仓中的表时需要在配置表的 Properties 属性;

五、参考链接

[1] VPC 帮助文档:https://cloud.tencent.com/document/product/215/36515

[2] EMR 控制台:https://console.cloud.tencent.com/emr/

[3] 创建 EMR 集群:https://cloud.tencent.com/document/product/589/10981

[4] Flink 官网:https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/overview/

[5] 使用 HiveCatalog:https://cloud.tencent.com/document/product/849/71854

扫码加入 流计算 Oceanus 产品交流群

0 人点赞