Building a GBase8s Flink Connector, Step by Step


Introduction

This article first explains what a Flink connector is and what CDC is, then walks step by step through building a simple Flink connector for GBase8s, and finally uses it in a hands-on project: syncing data from MySQL into GBase8s in real time via MySQL CDC and the new connector.

What is a Flink connector

Flink ships with a number of sources and sinks that are ready to use out of the box. The predefined sources include files, MySQL, RabbitMQ, Kafka, Elasticsearch and so on, and data can likewise be written out to files, MySQL, RabbitMQ, Kafka, Elasticsearch and more.

Put simply, a Flink connector packages the loading of a data source and the writing of output behind a uniform interface: once we add the corresponding connector dependency, we can read from and write to that system with very little code.
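For instance, with the built-in datagen and print connectors (which need no extra dependency), declaring a source and a sink is just two DDL statements. The class below is a minimal sketch for illustration only; it assumes a Flink 1.12/1.13-era Table API, and the class and table names are made up:

package wang.datahub.demo; // hypothetical package, for illustration only

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectorQuickStart {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // the 'connector' option decides which connector implementation backs the table
        tEnv.executeSql("CREATE TABLE demo_source (id INT, name STRING) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");
        tEnv.executeSql("CREATE TABLE demo_sink (id INT, name STRING) " +
                "WITH ('connector' = 'print')");

        // for external systems such as Kafka or JDBC, the matching flink-connector-* jar
        // must be on the classpath so the corresponding factory can be discovered
        tEnv.executeSql("INSERT INTO demo_sink SELECT id, name FROM demo_source");
    }
}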

What is CDC (Change Data Capture)

So what is CDC? It is short for Change Data Capture. With CDC we can obtain the committed changes from a database and deliver them downstream for other systems to consume. These changes include INSERT, DELETE and UPDATE operations.

Its main application scenarios:

  • Data synchronization or backup between heterogeneous databases / building data analytics platforms
  • Sharing data state between microservices
  • Updating caches / updating the query views in CQRS

CDC is a fairly broad concept: any mechanism that can capture changed data can be called CDC. In practice the industry distinguishes query-based CDC from log-based CDC; the table below compares their capabilities and differences.

| | Query-based CDC | Log-based CDC |
| --- | --- | --- |
| Concept | Each capture runs a SELECT over the whole table and filters out the rows that changed since the previous query | Continuously reads the storage system's log, e.g. the binlog in MySQL |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every change to the data | ✗ | ✓ |
| Low latency, no extra load on the database | ✗ | ✓ |
| Non-intrusive to the business (no LastUpdated column required) | ✗ | ✓ |
| Captures delete events and the old state of records | ✗ | ✓ |
| Captures the old state of records | ✗ | ✓ |
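To make the query-based approach concrete, here is a rough sketch of polling by a LastUpdated-style column with plain JDBC. It is not from the original article; the table, the last_updated column and the connection details are hypothetical:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class QueryBasedCdcPoller {
    public static void main(String[] args) throws Exception {
        Timestamp lastSeen = new Timestamp(0); // high-water mark kept by the poller
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "flinkcdc", "123456")) {
            while (true) {
                // every poll scans for rows whose last_updated is newer than the mark;
                // deletes and intermediate states between two polls are invisible
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, description, last_updated FROM test_cdc WHERE last_updated > ?")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastSeen = rs.getTimestamp("last_updated");
                            System.out.println(rs.getInt("id") + " -> " + rs.getString("name"));
                        }
                    }
                }
                Thread.sleep(5_000);
            }
        }
    }
}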

flink-connector-gbasedbt

We could, of course, hand-write a Sink that pushes the CDC data directly into the target database, but that is not very elegant. Can we instead write the data into GBase8s through Flink SQL? We can, and to do so we will now implement a simple GBase8s Flink connector in three steps:

  1. Build the row converter (RowConverter)
  2. Build the dialect (Dialect)
  3. Register the dynamic table factory (DynamicTableFactory) together with the related sink code

With these three steps in place we have a simple connector. Let's look at how to implement each of them.

Build the row converter (RowConverter)

package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author lijiaqi
 */
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

    public GBasedbtRowConverter(RowType rowType) {
        super(rowType);
    }

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() {
        return "gbasedbt";
    }

}

Build the dialect (Dialect)

package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
 *
 * @author lijiaqi
 */
public class GBasedbtDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbasedbt";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:gbasedbt-sqli:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBasedbtRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.gbasedbt.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "'"   identifier   "'";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }

}
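A quick way to sanity-check the dialect in isolation (a small sketch, not part of the original project):

GBasedbtDialect dialect = new GBasedbtDialect();
// the dialect only claims JDBC URLs of the gbasedbt-sqli form
System.out.println(dialect.canHandle("jdbc:gbasedbt-sqli://172.31.95.133:9088/mydb")); // true
System.out.println(dialect.canHandle("jdbc:mysql://localhost:3306/test"));             // false
System.out.println(dialect.defaultDriverName().orElse("none"));                        // com.gbasedbt.jdbc.Driver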

Register the dynamic table factory (DynamicTableFactory) and the related sink code

First, create GBasedbtSinkFunction, which accepts RowData records and sinks them into the configured database.

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author lijiaqi
 */
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    private final JdbcOptions jdbcOptions;
    private final SerializationSchema<RowData> serializationSchema = null;
    private DataType dateType;

    private Connection conn;
    private Statement stmt;

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dateType = dataType;
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open connection !!!!!");
        try {
            if (null == conn) {
                Class.forName(jdbcOptions.getDriverName());
                conn = DriverManager.getConnection(jdbcOptions.getDbURL(),jdbcOptions.getUsername().orElse(null),jdbcOptions.getPassword().orElse(null));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {

        try {
            stmt = conn.createStatement();
            String sql = "insert into "   this.jdbcOptions.getTableName()   " values ( ";
            for (int i = 0; i < value.getArity(); i  ) {
                //这里需要根据事情类型进行匹配
                if(dateType.getChildren().get(i).getConversionClass().equals(Integer.class)){
                    sql  =  value.getInt(i)  " ,";
                }else {
                    sql  = "'" value.getString(i)   "' ,";
                }
            }
            sql = sql.substring(0, sql.length() - 1);
            sql  = " ); ";

            System.out.println("sql ==>"   sql);

            stmt.execute(sql);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

}
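The string-concatenated INSERT in invoke() is good enough for a demo, but quoting string values by hand is fragile. As a hedged alternative (a sketch under the same two-type assumption, not the article's implementation), invoke() could be rewritten around a PreparedStatement:

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        // one "?" placeholder per column of the row
        String placeholders = String.join(",", java.util.Collections.nCopies(value.getArity(), "?"));
        String sql = "insert into " + jdbcOptions.getTableName() + " values (" + placeholders + ")";
        try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < value.getArity(); i++) {
                if (dateType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    ps.setInt(i + 1, value.getInt(i));
                } else {
                    ps.setString(i + 1, value.getString(i).toString());
                }
            }
            ps.executeUpdate();
        }
    }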

Build GBasedbtDynamicTableSink

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);
        GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions,dataType);
        return SinkFunctionProvider.of(gbasedbtSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "gbasedbt Table Sink";
    }

}
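A design note on getChangelogMode: the sink simply echoes back whatever changelog mode the planner requests, which keeps the MySQL CDC demo below working because update and delete rows are allowed through (and are then written as plain inserts by the SinkFunction). If you instead wanted the planner to reject updating streams, a hedged variant would be:

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // only plain INSERT rows are supported by the simple SinkFunction above
        return ChangelogMode.insertOnly();
    }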

Build GBasedbtDynamicTableFactory

package wang.datahub.table;


import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbasedbt";

    private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

//    public static final ConfigOption<String> FORMAT = ConfigOptions
//            .key("format")
//            .stringType()
//            .noDefaultValue()
//            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
//        requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig config = helper.getOptions();
        
        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);
        
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

        return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);

    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

//        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//                SerializationFormatFactory.class,
//                FactoryUtil.FORMAT);

        final ReadableConfig config = helper.getOptions();

        helper.validate();

        JdbcOptions jdbcOptions = getJdbcOptions(config);

        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

        return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBasedbtDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }

}

Next, register the dynamic table factory via SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory whose content is the single line wang.datahub.table.GBasedbtDynamicTableFactory.
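In a standard Maven layout (the src/main prefix is an assumption about the project structure) the registration file looks like this:

File: src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

    wang.datahub.table.GBasedbtDynamicTableFactory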

With that, our Flink connector is complete. Next, let's put it to work in a real project.

Hands-on project

The overall architecture of the project is shown below: we capture change data from MySQL with Flink CDC and then sink it into GBase8s through Flink SQL.

Next, let's see how to implement the CDC pipeline with Flink SQL; three SQL statements are all it takes.

Create the source table

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";

Create the sink table that writes to GBase8s; here the connector is set to gbasedbt.

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";
        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbasedbt',\n" +
//                        " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
                        ")";

Finally, we pipe the data from the source table straight into the sink:

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

Complete reference code

package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);



        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);


        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = 'localhost',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'flinkcdc',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'test',\n" +
                        " 'table-name' = 'test_cdc'\n" +
                        ")";


        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";
        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " id INT NOT NULL,\n" +
                        " name STRING,\n" +
                        " description STRING,\n" +
                        " PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'gbasedbt',\n" +
//                        " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                        " 'url' = '" + url + "',\n" +
                        " 'username' = '" + userName + "',\n" +
                        " 'password' = '" + password + "',\n" +
                        " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
                        ")";

        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        result.print();
        env.execute("sync-flink-cdc");
    }

}

Run result

Checking the target table shows that the data has indeed been written into the database.

References:

https://blog.csdn.net/zhangjun5965/article/details/107605396

https://cloud.tencent.com/developer/article/1745233?from=article.detail.1747773

https://segmentfault.com/a/1190000039662261

https://www.cnblogs.com/weijiqian/p/13994870.html

https://blog.csdn.net/dafei1288/article/details/118192917
