Oceanus 开发自定义SQL Connector指南

2021-08-20 14:48:57 浏览数 (2)

Oceanus兼容原生的Flink 框架,基于Flink开发的Connector能够实现100%兼容。

原文:https://blog.csdn.net/leiline/article/details/106925864

Flink 提供了丰富的connector组件帮助用户连接外部系统。但是很多时候原生的connector并不能够完全满足用户的需求,因此需要自定义开发connector组件。本文介绍如何进行Flink1.10 SQL CONNECTOR的开发工作。

通过SPI去加载不同的factory,实现了Connector的统一。

SPI机制

代码语言:txt复制
SPI,全称为Service Provider Interface,是Java提供的一套用于第三方实现或拓展的API。

基于工厂模式的任务提交

代码语言:txt复制
public interface PipelineExecutor{
		/**
		 * 执行任务
		 */
		CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration)throws Exception;
}

PipelineExecutor 提供多种实现方式:

代码语言:txt复制
RemoteExecutor(standalone)

LocalExecutor (local)

YanrJobClusterExecutor (per-job)

YarnSessionClusterExecutor (yarn-session)

支持用户在多种场景下提交flink任务。

Flink 提供PipelineExecutorServiceLoader接口,其中实现类DefaultExecutorServiceLoader支持通过名称去加载类。它的原理就是通过SPI机制去查找flink所提供的所有的工厂类,找到合适的类,进行加载。

基于工厂模式的SQL CONNECTOR设计

注:Flink 1.10提供SPI方式支持与connector进行交互,Flink会去扫描包中resources/META-INF/services目录下的org.apache.flink.table.factories.TableFactory,获取所有Factory类,根据sql中with传进来的参数(k=v)进行匹配,找到匹配到的那个Factory类,如果没有找到的话,则会报错。自定义connector没有此文件的话需创建,并将自定connector类路径写入此文件。

代码语言:txt复制
public class SqlHdfsSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_DB_NAME);
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_TABLE_NAME);
        properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG);

        // schema
        properties.add(SCHEMA   ".#."   SCHEMA_DATA_TYPE);
        properties.add(SCHEMA   ".#."   SCHEMA_NAME);

        return properties;
    }

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONNECTOR_TYPE, SqlConstants.CONNECTOR_TYPE_VALUE_HIVE); // hive
        context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_PACKAGE_VERSION, hiveVersion()); // version
        context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility

        return context;
    }

    @Override
    public TableSink createTableSink(Map properties) {
        return new SqlHdfsSink(properties
                .get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());
    }

    @Override
    public StreamTableSink createStreamTableSink(Map properties) {

        return new SqlHdfsSink(properties
                .get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());

    }

    private String hiveVersion() {
        return SqlConstants.CONNECTOR_HIVE_VERSION_VALUE_211;
    }
}

其中对几个方法进行了重写:

supportedProperties() 记录sink参数对应的k-v值,这里参数值是固定的

requiredContext() 记录sink参数对应的k-v值,这里参数值是支持动态的

createTableSink() 创建sink对象的入口, 其中properties参数表示sql with语句中的k-v。

在createTableSink()方法中,实例化了SqlHdfsSink类,这个类实现了AppendStreamTableSink接口,是真正将数据写入到HDFS中的核心类。

public class SqlHdfsSink implements AppendStreamTableSink<Row> { }

编译打包

代码语言:txt复制
mvn clean package -DskipTests -Dtest.skip=true -Drat.skip=true -Dcheckstyle.skip=true -Dskip.npm=true -Denforcer.skip=true

使用自定义的SQL Sink

Flink 1.10使用classLoader方式加载sql sink,对于用户来讲使用起来比较简单,定义好StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, settings) 即可将SQL语句传到stEnv中,自动加载sink。

代码语言:txt复制
stEnv.sqlUpdate("CREATE TABLE gis_log_sink (n"  
                "   deptSiteId STRING,n"  
                "   latitude STRING,n"  
                "   passZoneCode STRINGn"  
                "   ) n"  
                "with (n"  
                "    'version' = '2.1.1',n"  
                "    'dbName' = 'dm_drcs',n"  
                "    'connectorType' = 'sink',n"  
                "    'connectorName' = 'HIVE',n"  
                "    'columnDelimiter' = '\u0001',n"  
                "    'hadoopClusterName' = 'bdp_sit',n"  
                "    'timeoutRollInterval' = '1000',n"  
                "    'configuration' = '{"properties":{"batchSize":10485760,"columnDelimiter":"\\u0001","dateTimeBucketer":"yyyyMMdd","dbNamePrefix":"","dbName":"dm_lxb001","tableName":"17757_tjn_gis_table110","timeoutRollInterval":8,"tmpPath":"hdfs://10.202.77.200/hive/tmp/dm/dm_lxb001/tjn_gis_table111","finalPath":"hdfs://10.202.77.200/hive/warehouse/dm/dm_lxb001/tjn_gis_table110","tableColumns":[{"name":"deptSite","type":"string"},{"name":"gpsTime","type":"string"},{"name":"latitude","type":"string"}],"hdfsConfigInfo":{"namenodeInfoList":[{"nodeName":"namenode1","nodeAddr":"10.202.77.200","nodePort":"8020"},{"nodeName":"namenode2","nodeAddr":"10.202.77.201","nodePort":"8020"}],"nameservices":"test-cluster"},"inputFormat":"text","insertFieldNames":"deptsite, gpstime, latitude"}}'n"  
                ")");

总结

Flink 1.10基于SPI机制加载CONNECTOR ,统一了Flink 与外部系统的交互,也降低了用户实现自定义CONNECTOR的门槛。

开发自定义SQL Connector,本质上是在原有的Sink类前面封装一个factory类,Flink会去读取这个factory 类,并将其加载到runtime中执行。

开源Connector参考

apache-flink connectors参考列表:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/

bahir-flink connectors参考列表:

https://github.com/apache/bahir-flink

ververica/flink connectors参考列表:

https://github.com/ververica/flink-cdc-connectors

图数据库:

neo4j:

https://github.com/albertodelazzari/flink-neo4j

Nebula Graph:

https://github.com/vesoft-inc/nebula-flink-connector

0 人点赞