flink的catalog介绍

2024-07-08 11:26:28 浏览数 (1)

Catalog 提供元数据,如数据库、表、分区、视图,以及访问存储在数据库或其他外部系统中的数据所需的函数和信息。

数据处理中最关键的一个方面是管理元数据。它可能是暂时性的元数据,如临时表,或针对表环境注册的 UDFs。或者是永久性的元数据,比如Hive元存储中的元数据。Catalog提供了一个统一的API来管理元数据,并使其可以从表API和SQL查询中访问。

Catalog使用户能够引l用他们数据系统中的现有元数据,并自动将它们映射到Flink的相应元数据。

例如,Flink可以将JDBC表自动映射到Flink表,用户不必在Flink中手动重写DDL。Catalog大大简化了用户现有系统开始使用Flink所需的步骤,并大大增强了用户体验。

此外,还可以自己开发自定义的catalog。

目前常用的catalog有

GenericInMemoryCatalog # 内存模式,重启丢失

JdbcCatalog # 目前支持pg和mysql这2种类型数据库

HiveCatalog # 作为纯 Flink 元数据的持久存储,以及作为读取和写入现有 Hive 元数据的接口

此外,用户还可以自行开发自定义的catalog

创建hive类型的catalog的SQL写法:

代码语言:txt复制
    // the catalog should have been registered via yaml file
    Flink SQL> CREATE DATABASE mydb WITH (...);

    Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

    Flink SQL> SHOW TABLES;
    mytable

JDBC catalogs示例

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog

代码语言:txt复制
    CREATE CATALOG my_catalog WITH(
        'type' = 'jdbc',
        'default-database' = 'flink_catalog',
        'username' = 'dts',
        'password' = 'Abcd@1234',
        'base-url' = 'jdbc:mysql://127.0.0.1:3306'
    );

说明

1、catalog必须要的参数:

代码语言:txt复制
        name: required, name of the catalog.
        default-database: required, default database to connect to.
        username: required, username of Postgres/MySQL account.
        password: required, password of the account.
        base-url: required (should not contain the database name)
            for Postgres Catalog this should be "jdbc:postgresql://<ip>:<port>"
            for MySQL Catalog this should be "jdbc:mysql://<ip>:<port>"

2、pg类型的catalog,稍微有些不一样,因为pg里面有schema的概念,具体可以参考

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql

3、mysql类型的catalog,具体可以参考

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/jdbc/#jdbc-catalog-for-mysql

代码语言:txt复制
列出当前的catalogs:
    Flink SQL> show catalogs;
     ----------------- 
    |    catalog name |
     ----------------- 
    | default_catalog |
     ----------------- 
    1 row in set


列出当前可用的数据库:
    Flink SQL> show catalogs;
     ----------------- 
    |    catalog name |
     ----------------- 
    | default_catalog |
    |      my_catalog |
     ----------------- 
    2 rows in set


列出当前可用的表:
    Flink SQL> show tables;
     ------------ 
    | table name |
     ------------ 
    |         t1 |
    |         t2 |
    |    t_total |
     ------------ 
    3 rows in set


切换catalog
    Flink SQL> USE CATALOG my_catalog;


查询该catalog下面的表:

    Flink SQL> show tables;
     ------------ 
    | table name |
     ------------ 
    |         t1 |
     ------------ 
    1 row in set
    
    Flink SQL> select * from t1;

catatalog的优势

例如我们在远程的mysql的flink_catalog库里里面已经创建好了3张表: t1 t2 t_total ,需要用flink进行洗数据操作。

原先的方法是:进到flink sql client中,先create table定义这3张表,然后执行insert select操作。

而用了catalog后,我们步骤可以简化为如下:

代码语言:txt复制

    0、进入flink sql client 命令行

    1、创建包含待处理的表的catalog
    CREATE CATALOG my_catalog WITH(
        'type' = 'jdbc',
        'default-database' = 'flink_catalog',
        'username' = 'dts',
        'password' = 'Abcd@1234',
        'base-url' = 'jdbc:mysql://127.0.0.1:3306'
    );

    2、切到新建的catalog中
    USE CATALOG my_catalog;

    3、执行insert select命令
    SET 'pipeline.name' = 'mysql-test';
    INSERT INTO t_total select t1.id,t2.product from t1 inner join t2 on t1.id=t2.id;

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 09747e9627193060dce79a69aba816e3

    4、在 flink web ui 上,也可以看到相关的job执行情况    

官方文档:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/

0 人点赞