Oceanus兼容原生的Flink 框架,基于Flink开发的Connector能够实现100%兼容。
原文:https://blog.csdn.net/leiline/article/details/106925864
Flink 提供了丰富的connector组件帮助用户连接外部系统。但是很多时候原生的connector并不能够完全满足用户的需求,因此需要自定义开发connector组件。本文介绍如何进行Flink1.10 SQL CONNECTOR的开发工作。
通过SPI去加载不同的factory,实现了Connector的统一。
SPI机制
代码语言:txt复制SPI,全称为Service Provider Interface,是Java提供的一套用于第三方实现或拓展的API。
基于工厂模式的任务提交
代码语言:txt复制public interface PipelineExecutor{
/**
* 执行任务
*/
CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration)throws Exception;
}
PipelineExecutor 提供多种实现方式:
代码语言:txt复制RemoteExecutor(standalone)
LocalExecutor (local)
YanrJobClusterExecutor (per-job)
YarnSessionClusterExecutor (yarn-session)
支持用户在多种场景下提交flink任务。
Flink 提供PipelineExecutorServiceLoader接口,其中实现类DefaultExecutorServiceLoader支持通过名称去加载类。它的原理就是通过SPI机制去查找flink所提供的所有的工厂类,找到合适的类,进行加载。
基于工厂模式的SQL CONNECTOR设计
注:Flink 1.10提供SPI方式支持与connector进行交互,Flink会去扫描包中resources/META-INF/services目录下的org.apache.flink.table.factories.TableFactory
,获取所有Factory类,根据sql中with传进来的参数(k=v)进行匹配,找到匹配到的那个Factory类,如果没有找到的话,则会报错。自定义connector没有此文件的话需创建,并将自定connector类路径写入此文件。
public class SqlHdfsSinkFactory implements StreamTableSinkFactory<Row> {
@Override
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();
properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_DB_NAME);
properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_TABLE_NAME);
properties.add(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG);
// schema
properties.add(SCHEMA ".#." SCHEMA_DATA_TYPE);
properties.add(SCHEMA ".#." SCHEMA_NAME);
return properties;
}
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONNECTOR_TYPE, SqlConstants.CONNECTOR_TYPE_VALUE_HIVE); // hive
context.put(SqlConstants.HIVE_CONNECTOR_PROPERTY_PACKAGE_VERSION, hiveVersion()); // version
context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
return context;
}
@Override
public TableSink createTableSink(Map properties) {
return new SqlHdfsSink(properties
.get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());
}
@Override
public StreamTableSink createStreamTableSink(Map properties) {
return new SqlHdfsSink(properties
.get(SqlConstants.HIVE_CONNECTOR_PROPERTY_CONFIG).toString());
}
private String hiveVersion() {
return SqlConstants.CONNECTOR_HIVE_VERSION_VALUE_211;
}
}
其中对几个方法进行了重写:
supportedProperties() 记录sink参数对应的k-v值,这里参数值是固定的
requiredContext() 记录sink参数对应的k-v值,这里参数值是支持动态的
createTableSink() 创建sink对象的入口, 其中properties参数表示sql with语句中的k-v。
在createTableSink()方法中,实例化了SqlHdfsSink类,这个类实现了AppendStreamTableSink接口,是真正将数据写入到HDFS中的核心类。
public class SqlHdfsSink implements AppendStreamTableSink<Row> { }
编译打包:
代码语言:txt复制mvn clean package -DskipTests -Dtest.skip=true -Drat.skip=true -Dcheckstyle.skip=true -Dskip.npm=true -Denforcer.skip=true
使用自定义的SQL Sink
Flink 1.10使用classLoader方式加载sql sink,对于用户来讲使用起来比较简单,定义好StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, settings) 即可将SQL语句传到stEnv中,自动加载sink。
代码语言:txt复制stEnv.sqlUpdate("CREATE TABLE gis_log_sink (n"
" deptSiteId STRING,n"
" latitude STRING,n"
" passZoneCode STRINGn"
" ) n"
"with (n"
" 'version' = '2.1.1',n"
" 'dbName' = 'dm_drcs',n"
" 'connectorType' = 'sink',n"
" 'connectorName' = 'HIVE',n"
" 'columnDelimiter' = '\u0001',n"
" 'hadoopClusterName' = 'bdp_sit',n"
" 'timeoutRollInterval' = '1000',n"
" 'configuration' = '{"properties":{"batchSize":10485760,"columnDelimiter":"\\u0001","dateTimeBucketer":"yyyyMMdd","dbNamePrefix":"","dbName":"dm_lxb001","tableName":"17757_tjn_gis_table110","timeoutRollInterval":8,"tmpPath":"hdfs://10.202.77.200/hive/tmp/dm/dm_lxb001/tjn_gis_table111","finalPath":"hdfs://10.202.77.200/hive/warehouse/dm/dm_lxb001/tjn_gis_table110","tableColumns":[{"name":"deptSite","type":"string"},{"name":"gpsTime","type":"string"},{"name":"latitude","type":"string"}],"hdfsConfigInfo":{"namenodeInfoList":[{"nodeName":"namenode1","nodeAddr":"10.202.77.200","nodePort":"8020"},{"nodeName":"namenode2","nodeAddr":"10.202.77.201","nodePort":"8020"}],"nameservices":"test-cluster"},"inputFormat":"text","insertFieldNames":"deptsite, gpstime, latitude"}}'n"
")");
总结
Flink 1.10基于SPI机制加载CONNECTOR ,统一了Flink 与外部系统的交互,也降低了用户实现自定义CONNECTOR的门槛。
开发自定义SQL Connector,本质上是在原有的Sink类前面封装一个factory类,Flink会去读取这个factory 类,并将其加载到runtime中执行。
开源Connector参考
apache-flink connectors参考列表:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/
bahir-flink connectors参考列表:
https://github.com/apache/bahir-flink
ververica/flink connectors参考列表:
https://github.com/ververica/flink-cdc-connectors
图数据库:
neo4j:
https://github.com/albertodelazzari/flink-neo4j
Nebula Graph:
https://github.com/vesoft-inc/nebula-flink-connector