flink中如何自定义Source和Sink?

2021-02-19 10:47:23 浏览数 (1)

译自:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html

动态表(Dynamic tables)是Flink的Table&SQL API的核心概念,用于以统一方式处理有界和无界数据。

因为动态表只是一个逻辑概念,所以Flink并不拥有数据本身。相反,动态表的内容存储在外部系统(例如数据库,键值存储,消息队列)或文件中。

动态源(dynamic sources)和动态接收器(dynamic sinks)可用于从外部系统读取和写入数据。在文档中,source和sink通常在术语“connector(连接器)”下进行概述。

Flink为Kafka,Hive和其他文件系统提供了预定义的连接器。有关内置table sources和table sinks的信息,请参见连接器部分[1]。

该页面重点介绍如何开发自定义的,用户定义的连接器。

注意在Flink 1.11中,作为FLIP-95的[2]一部分引入了新的 table source和table sink接口。工厂类接口也已重新设计。如有必要,请查看旧的table sources和table sinks页面[3]。仍支持这些接口以实现向后兼容。

•总览•Metadata[4]•Planning[5]•Runtime[6]•延伸点•动态Table Factories[7]•动态Table Source[8]•动态Table Sink[9]•Encoding/Decoding Formats[10]•全栈示例•Factories[11]•Table Source和Decoding Format[12]•Runtime[13]

总览

在许多情况下,实现者不需要从头开始创建新的连接器,而是想要略微修改现有的连接器或在现有的堆栈中添加钩子。在其他情况下,实现者想创建专门的连接器。

本节对两种使用场景都提供帮助。它说明了表连接器(Table connectors)的一般体系结构,从API中的纯声明到在集群上执行的运行时代码。

实心箭头表示在转化过程中如何将对象从一个阶段转换到另一阶段。

Metadata

表API和SQL都是声明性API。这包括表的声明。因此,执行CREATE TABLE语句会导致目标catalog中的元数据更新。

对于大多数catalog实现,此类操作不会修改外部系统中的物理数据。特定于连接器的依赖关系不必在类路径中存在。WITH子句中声明的选项既未经验证也未经其他解释。

动态表(通过DDL创建或由catalog提供)的元数据会实例化为CatalogTable对象。表名称将在必要时解析为CatalogTable内部名称。

Planning

在计划和优化表程序时,CatalogTable需要将需求解析为DynamicTableSource(用于读取SELECT查询)和DynamicTableSink(用于编写INSERT INTO语句)。

DynamicTableSourceFactoryDynamicTableSinkFactory提供特定于连接器的逻辑,用于将元数据CatalogTable转换为DynamicTableSourceDynamicTableSink的实例。在大多数情况下,工厂的目的是验证选项(例如在示例中'port' = '5022'),配置编码/解码格式(如果需要),以及创建表连接器(Table connectors)的参数化实例。

默认情况下,DynamicTableSourceFactoryDynamicTableSinkFactory的实例是使用Java的服务提供商接口(SPI)发现的[14]。该 connector选项(例如在示例中'connector' = 'custom')必须对应于有效的工厂标识符。

虽然它可能在类命名上不是很明显,DynamicTableSourceDynamicTableSink 也可以被看作是有状态的工厂,它最终产生读取/写入的实际数据的具体运行时实现。

planner使用Source实例和Sink实例执行特定于连接器的双向通信,直到找到最佳逻辑计划为止。根据可选地声明的功能接口(例如 SupportsProjectionPushDownSupportsOverwrite),planner可以将更改应用于实例,从而使生成的运行时实现发生变化。

Runtime

逻辑计划完成后,planner将从表连接器(Table connectors)获取运行时实现。运行时逻辑在Flink的核心连接器的接口如InputFormat或者SourceFunction中实现。

这些接口作为ScanRuntimeProviderLookupRuntimeProviderSinkRuntimeProvider的子类被划归为另一个级别的抽象。

例如,OutputFormatProvider(提供org.apache.flink.api.common.io.OutputFormat)和SinkFunctionProvider(提供org.apache.flink.streaming.api.functions.sink.SinkFunction)都是planner能处理的SinkRuntimeProvider的具体实现。

延伸点

本节说明用于扩展Flink的Table connectors(表连接器)的可用接口。

动态表工厂(Dynamic Table Factories)

动态表工厂用于根据catalog和session(会话)信息为外部存储系统配置动态表连接器(Table connectors)。

可以通过实现org.apache.flink.table.factories.DynamicTableSourceFactory来构造一个DynamicTableSource

可以通过实现org.apache.flink.table.factories.DynamicTableSinkFactory来构造一个DynamicTableSink

默认情况下,使用作为connector选项值的工厂标识符和Java SPI机制来发现工厂。

在JAR文件中,可以将新实现的引用添加到服务文件中:

代码语言:javascript复制
META-INF/services/org.apache.flink.table.factories.Factory

框架将检查这个唯一匹配的工厂是否通过唯一的工厂标识符标识并且要求它们来自符合要求的基类(例如DynamicTableSourceFactory)。

如有必要,catalog的实现可以绕过工厂发现过程。为此,catalog需要返回一个实现了框架要求的org.apache.flink.table.catalog.Catalog#getFactory中基类的实例。

动态表数据源(Dynamic Table Source)

根据定义,动态表可以随时间变化。

读取动态表时,其内容可以视为:

•一个变更日志(有限或无限),所有变更都被连续消耗,直到耗尽变更日志为止。这由ScanTableSource接口表示。•一个不断变化或非常大的外部表,其内容通常从不完全读取,但在必要时会查询各个值。这由LookupTableSource 接口表示。

一个类可以同时实现这两个接口。planner根据指定的查询来决定其使用。

Scan Table Source

一个ScanTableSource在运行时扫描来自外部存储系统的所有行。

扫描的行不仅可以包含插入,还可以包含更新和删除。因此,table source可用于读取(有限或无限)变更日志。返回的变更日志模式标识着planner在运行时可以预期的变更集。

对于常规的批处理方案,source可以发出仅插入行的有限流。

对于常规流方案,source可以发出仅插入行的无限流。

对于更改数据捕获(CDC)方案,source可以发出带有插入,更新和删除行的有界或无界流。

Table source可以实现其他功能接口如SupportsProjectionPushDown,这可能会在执行planning过程中改变实例。所有功能都可以在org.apache.flink.table.connector.source.abilities 包中找到,并在源功能表中列出[15]。

运行时ScanTableSource的实现必须产生内部数据结构。因此,记录必须org.apache.flink.table.data.RowData格式发出。该框架提供了运行时转换器,这样Source仍然可以在公共数据结构上工作,并在最后执行转换。

Lookup Table Source

一个LookupTableSource在运行时通过一个或多个键查找外部存储系统的行。

ScanTableSource相比,该Source不必读取整个表,并且可以在需要时从(可能不断变化的)外部表中延迟获取各个值。

与相比ScanTableSource,一个LookupTableSource目前仅支持发出插入操作所产生的变化。

其他功能目前并不支持。请参阅org.apache.flink.table.connector.source.LookupTableSource 的文档以获取更多信息。

一个LookupTableSource的运行时实现是TableFunctionor AsyncTableFunction类型的。在运行期间,将使用给定查找键的值调用该函数。

Source功能
代码语言:javascript复制
描述

注意上面的接口当前仅适用于 ScanTableSource,不适用于LookupTableSource

Dynamic Table Sink

根据定义,动态表可以随时间变化。

编写动态表时,内容始终可以被视为变更日志(有限或无限),所有变更都将连续写出,直到耗尽变更日志为止。返回 的变更日志模式指示Sink(接收器)在运行时接受的变更集。

对于常规的批处理方案,接收器只能接受仅插入的行并写出有界流。

对于常规流方案,接收器只能接受仅插入的行,并且可以写出无限制的流。

对于更改数据捕获(CDC)方案,接收器可以写出具有插入,更新和删除行的有界或无界流。

表接收器(Table Sink) 可以实现其他功能接口如SupportsOverwrite,这可能会在执行planning过程中改变实例。所有功能都可以在org.apache.flink.table.connector.sink.abilities 包中找到,并在接收器功能表中列出[22]。

运行时实现DynamicTableSink时必须使用内部数据结构。因此,记录必须以org.apache.flink.table.data.RowData格式进行接收。框架提供了运行时转换器,因此接收器(Sink)仍可以在通用数据结构上工作并在开始时执行转换。

Sink 功能
代码语言:javascript复制
描述

Encoding / Decoding Formats

某些表连接器(Table connectors)接受编码和解码键和/或值的不同格式。

Formats的工作方式与DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider工作模式类似,其中工厂负责转换选项,source负责创建运行时逻辑。

由于Formats可能位于不同的模块中,因此可以使用类似于表工厂的[26]Java SPI机制来发现Formats。为了发现format工厂,动态表工厂搜索与工厂标识符和特定于连接器的基类相对应的工厂。

例如,Kafka 源表要求将DeserializationSchema作为解码格式的运行时接口。因此,Kafka源表工厂使用该value.format选项的值来发现DeserializationFormatFactory

当前支持以下格式工厂:

代码语言:javascript复制
org.apache.flink.table.factories.DeserializationFormatFactory
org.apache.flink.table.factories.SerializationFormatFactory

format工厂将选项转换为EncodingFormatDecodingFormat。这些接口是另一种针对给定数据类型生成专用格式运行时逻辑的工厂。

例如,对于Kafka源表工厂,DeserializationFormatFactory会返回EncodingFormat<DeserializationSchema> ,可以将其传递到Kafka源表中。

全栈示例

本节概述了如何使用支持更改日志语义的解码格式来实现扫描源表。该示例说明了所有上述组件如何一起发挥作用。它可以作为参考实现。

特别地,它展示了如何:

•创建可以解析和验证选项的工厂,•实现table connectors,•实现和发现自定义格式,•并使用提供的工具,如数据结构转换器和FactoryUtil

源表使用一个简单的单线程SourceFunction打开一个套接字,以侦听传入的字节。原始字节通过可插拔的格式解码为行。格式(format)要求将changelog标志作为第一列。

我们将使用上面提到的大多数接口来启用以下DDL:

代码语言:javascript复制
CREATE TABLE UserScores (name STRING, score INT)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
);

因为该格式支持changelog语义,所以我们能够在运行时提取更新并创建可以连续评估变化数据的更新视图:

代码语言:javascript复制
SELECT name, SUM(score) FROM UserScores GROUP BY name;

使用以下命令在终端中提取数据:

代码语言:javascript复制
> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18

Factories

本节说明如何将来自catalog的元数据转换为具体的connector(连接器)实例。

两个工厂都已添加到META-INF/services目录中。

SocketDynamicTableFactory

SocketDynamicTableFactorycatalog表转换为源表。由于源表需要解码格式,为了方便起见,我们使用提供的FactoryUtil来发现格式。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

public class SocketDynamicTableFactory implements DynamicTableSourceFactory {

  // define all options statically
  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
    .stringType()
    .noDefaultValue();

  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
    .intType()
    .noDefaultValue();

  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // corresponds to 'n'

  @Override
  public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    return options;
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
      DeserializationFormatFactory.class,
      FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
}

ChangelogCsvFormatFactory

ChangelogCsvFormatFactory将特定格式的选项转换为一种格式。SocketDynamicTableFactory中的FactoryUtil负责相应地调整选项键,并处理像changlog -csv.column-delimiter这样的前缀

由于此工厂实现DeserializationFormatFactory,因此它也可以用于支持反序列化格式的其他连接器,例如Kafka连接器。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {

  // define all options statically
  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
    .stringType()
    .defaultValue("|");

  @Override
  public String factoryIdentifier() {
    return "changelog-csv";
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(COLUMN_DELIMITER);
    return options;
  }

  @Override
  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
      DynamicTableFactory.Context context,
      ReadableConfig formatOptions) {
    // either implement your custom validation logic here ...
    // or use the provided helper method
    FactoryUtil.validateFactoryOptions(this, formatOptions);

    // get the validated options
    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);

    // create and return the format
    return new ChangelogCsvFormat(columnDelimiter);
  }
}

Table Source and Decoding Format

本部分说明了如何从计划层(planning layer)实例转换为运行至集群的运行时实例。

SocketDynamicTableSource

SocketDynamicTableSource在planning过程中使用。在我们的示例中,我们没有实现任何可用的功能接口。因此,可以在getScanRuntimeProvider(…)中找到主逻辑,我们在其中为运行时实例化所需的SourceFunction及其反序列化模式。两个实例都被参数化以返回内部数据结构(即RowData)。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class SocketDynamicTableSource implements ScanTableSource {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
  private final DataType producedDataType;

  public SocketDynamicTableSource(
      String hostname,
      int port,
      byte byteDelimiter,
      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
      DataType producedDataType) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.decodingFormat = decodingFormat;
    this.producedDataType = producedDataType;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // in our example the format decides about the changelog mode
    // but it could also be the source itself
    return decodingFormat.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

    // create runtime classes that are shipped to the cluster

    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
      runtimeProviderContext,
      producedDataType);

    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
      hostname,
      port,
      byteDelimiter,
      deserializer);

    return SourceFunctionProvider.of(sourceFunction, false);
  }

  @Override
  public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }

  @Override
  public String asSummaryString() {
    return "Socket Table Source";
  }
}

ChangelogCsvFormat

ChangelogCsvFormat是运行时使用DeserializationSchema的一种解码格式。它支持发射INSERTDELETE变化。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {

  private final String columnDelimiter;

  public ChangelogCsvFormat(String columnDelimiter) {
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  @SuppressWarnings("unchecked")
  public DeserializationSchema<RowData> createRuntimeDecoder(
      DynamicTableSource.Context context,
      DataType producedDataType) {
    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
      producedDataType);

    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);

    // use logical types during runtime for parsing
    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();

    // create runtime class
    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // define that this format can produce INSERT and DELETE rows
    return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.DELETE)
      .build();
  }
}

Runtime

为了完整起见,本节说明SourceFunction和DeserializationSchema的运行时逻辑。

ChangelogCsvDeserializer

ChangelogCsvDeserializer包含一个简单的解析逻辑,用于将字节转换为整数行和行类型的字符串行。最后的转换步骤将这些转换为内部数据结构。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {

  private final List<LogicalType> parsingTypes;
  private final DataStructureConverter converter;
  private final TypeInformation<RowData> producedTypeInfo;
  private final String columnDelimiter;

  public ChangelogCsvDeserializer(
      List<LogicalType> parsingTypes,
      DataStructureConverter converter,
      TypeInformation<RowData> producedTypeInfo,
      String columnDelimiter) {
    this.parsingTypes = parsingTypes;
    this.converter = converter;
    this.producedTypeInfo = producedTypeInfo;
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    // return the type information required by Flink's core interfaces
    return producedTypeInfo;
  }

  @Override
  public void open(InitializationContext context) {
    // converters must be open
    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
  }

  @Override
  public RowData deserialize(byte[] message) {
    // parse the columns including a changelog flag
    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
    final RowKind kind = RowKind.valueOf(columns[0]);
    final Row row = new Row(kind, parsingTypes.size());
    for (int i = 0; i < parsingTypes.size(); i  ) {
      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i   1]));
    }
    // convert to internal data structure
    return (RowData) converter.toInternal(row);
  }

  private static Object parse(LogicalTypeRoot root, String value) {
    switch (root) {
      case INTEGER:
        return Integer.parseInt(value);
      case VARCHAR:
        return value;
      default:
        throw new IllegalArgumentException();
    }
  }

  @Override
  public boolean isEndOfStream(RowData nextElement) {
    return false;
  }
}

SocketSourceFunction

SocketSourceFunction打开一个套接字用于消费数据包。它通过给定的字节定界符(n默认情况下)分割记录,并将解码委托给可插拔的DeserializationSchema。源函数只能在并行度为1的情况下工作。

代码语言:javascript复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DeserializationSchema<RowData> deserializer;

  private volatile boolean isRunning = true;
  private Socket currentSocket;

  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.deserializer = deserializer;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    return deserializer.getProducedType();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup());
  }

  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {
    isRunning = false;
    try {
      currentSocket.close();
    } catch (Throwable t) {
      // ignore
    }
  }
}

References

[1] 连接器部分: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/ [2] FLIP-95的: https://cwiki.apache.org/confluence/display/FLINK/FLIP-95: New TableSource and TableSink interfaces [3] 旧的table sources和table sinks页面: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/legacySourceSinks.html [4] Metadata: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#metadata [5] Planning: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#planning [6] Runtime: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#runtime [7] 动态Table Factories: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#dynamic-table-factories [8] 动态Table Source: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#dynamic-table-source [9] 动态Table Sink: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#dynamic-table-sink [10] Encoding/Decoding Formats: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#encoding--decoding-formats [11] Factories: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#factories [12] Table Source和Decoding Format: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#table-source-and-decoding-format [13] Runtime: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#runtime-1 [14] 服务提供商接口(SPI)发现的: https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html [15] 源功能表中列出: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#source-abilities [16] SupportsFilterPushDown: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java [17] SupportsLimitPushDown: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLimitPushDown.java [18] SupportsPartitionPushDown: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitionPushDown.java [19] SupportsProjectionPushDown: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java [20] SupportsReadingMetadata: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java [21] SupportsWatermarkPushDown: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java [22] 器功能表中列出: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#sink-abilities [23] SupportsOverwrite: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsOverwrite.java [24] SupportsPartitioning: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java [25] SupportsWritingMetadata: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java [26] 表工厂的: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#dynamic-table-factories

0 人点赞