Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。
数据处理中最关键的一个方面是管理元数据。它可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs。或者是永久性的元数据,比如Hive元存储中的元数据。Catalog提供了一个统一的API来管理元数据,并使其可以从表API和SQL查询中访问。
Catalog使用户能够引l用他们数据系统中的现有元数据,并自动将它们映射到Flink的相应元数据。
例如,Flink可以将JDBC表自动映射到Flink表,用户不必在Flink中手动重写DDL。Catalog大大简化了用户现有系统开始使用Flink所需的步骤,并大大增强了用户体验。
此外,还可以自己开发自定义的catalog。
目前常用的catalog有
GenericInMemoryCatalog # 内存模式,重启丢失
JdbcCatalog # 目前支持pg和mysql这2种类型数据库
HiveCatalog # 作为纯 Flink 元数据的持久存储,以及作为读取和写入现有 Hive 元数据的接口
此外,用户还可以自行开发自定义的catalog
创建hive类型的catalog的SQL写法:
代码语言:txt复制 // the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
JDBC catalogs示例
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog
代码语言:txt复制 CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = 'flink_catalog',
'username' = 'dts',
'password' = 'Abcd@1234',
'base-url' = 'jdbc:mysql://127.0.0.1:3306'
);
说明
1、catalog必须要的参数:
代码语言:txt复制 name: required, name of the catalog.
default-database: required, default database to connect to.
username: required, username of Postgres/MySQL account.
password: required, password of the account.
base-url: required (should not contain the database name)
for Postgres Catalog this should be "jdbc:postgresql://<ip>:<port>"
for MySQL Catalog this should be "jdbc:mysql://<ip>:<port>"
2、pg类型的catalog,稍微有些不一样,因为pg里面有schema的概念,具体可以参考
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql
3、mysql类型的catalog,具体可以参考
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog-for-mysql
代码语言:txt复制
列出当前的catalogs:
Flink SQL> show catalogs;
-----------------
| catalog name |
-----------------
| default_catalog |
-----------------
1 row in set
列出当前可用的数据库:
Flink SQL> show catalogs;
-----------------
| catalog name |
-----------------
| default_catalog |
| my_catalog |
-----------------
2 rows in set
列出当前可用的表:
Flink SQL> show tables;
------------
| table name |
------------
| t1 |
| t2 |
| t_total |
------------
3 rows in set
切换catalog
Flink SQL> USE CATALOG my_catalog;
查询该catalog下面的表:
Flink SQL> show tables;
------------
| table name |
------------
| t1 |
------------
1 row in set
Flink SQL> select * from t1;
catatalog的优势
例如我们在远程的mysql的flink_catalog库里里面已经创建好了3张表: t1 t2 t_total ,需要用flink进行洗数据操作。
原先的方法是:进到flink sql client中,先create table定义这3张表,然后执行insert select操作。
而用了catalog后,我们步骤可以简化为如下:
代码语言:txt复制
0、进入flink sql client 命令行
1、创建包含待处理的表的catalog
CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = 'flink_catalog',
'username' = 'dts',
'password' = 'Abcd@1234',
'base-url' = 'jdbc:mysql://127.0.0.1:3306'
);
2、切到新建的catalog中
USE CATALOG my_catalog;
3、执行insert select命令
SET 'pipeline.name' = 'mysql-test';
INSERT INTO t_total select t1.id,t2.product from t1 inner join t2 on t1.id=t2.id;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 09747e9627193060dce79a69aba816e3
4、在 flink web ui 上,也可以看到相关的job执行情况
官方文档:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/