Understanding FlinkSQL's KafkaSource in One Article

2021-02-01 12:02:10

Background

In the earlier post 《写给大忙人看的Flink 消费 Kafka》 we looked at how Flink consumes Kafka directly. Today, let's see how the FlinkSQL Kafka connector ties back into the Flink Streaming Kafka consumer.

Main Content

Creating the Kafka source

```sql
CREATE TABLE orders
(
    status      INT,
    courier_id  BIGINT,
    id          BIGINT,
    finish_time BIGINT,
    place_time  BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'testGroup',
    'format' = 'ss-canal-json',
    'ss-canal-json.table.include' = 'orders',
    'scan.startup.mode' = 'latest-offset'
);
```
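For orientation, here is a minimal, hypothetical driver (class name is illustrative, not from the original post) showing where this DDL enters the planner:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaSourceDdlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Registering the table touches no Kafka broker yet; the connector is
        // only instantiated when a query actually reads from the table.
        tEnv.executeSql(
                "CREATE TABLE orders (status INT, courier_id BIGINT, id BIGINT,"
              + " finish_time BIGINT, place_time BIGINT, PRIMARY KEY (id) NOT ENFORCED)"
              + " WITH ('connector' = 'kafka', 'topic' = 'test',"
              + " 'properties.bootstrap.servers' = 'xxx', 'properties.group.id' = 'testGroup',"
              + " 'format' = 'ss-canal-json', 'ss-canal-json.table.include' = 'orders',"
              + " 'scan.startup.mode' = 'latest-offset')");

        // Reading from the table is what drives the Calcite conversion
        // described next (SqlToRelConverter#toRel -> CatalogSourceTable).
        tEnv.executeSql("SELECT id, status FROM orders").print();
    }
}
```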

After a series of Apache Calcite transformations (the details of that process will be covered in a later post), we eventually arrive at the CatalogSourceTable class. It extends FlinkPreparingTableBase and is responsible for converting Calcite's RelOptTable into Flink's TableSourceTable.

```java
    @Override
    // Entry point, invoked from SqlToRelConverter#toRel
    public RelNode toRel(ToRelContext toRelContext) {
        final RelOptCluster cluster = toRelContext.getCluster();
        final List<RelHint> hints = toRelContext.getTableHints(); // SQL hints (e.g. OPTIONS)
        final FlinkContext context = ShortcutUtils.unwrapContext(cluster);
        final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(cluster, relOptSchema);

        // 0. finalize catalog table
        final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints);
        final CatalogTable catalogTable = createFinalCatalogTable(context, hintedOptions);

        // 1. create and prepare table source
        final DynamicTableSource tableSource = createDynamicTableSource(context, catalogTable);
        prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());

        // 2. push table scan
        pushTableScan(relBuilder, cluster, catalogTable, tableSource, typeFactory, hints);

        // 3. push project for non-physical columns
        final TableSchema schema = catalogTable.getSchema();
        if (!TableSchemaUtils.containsPhysicalColumnsOnly(schema)) {
            pushMetadataProjection(relBuilder, typeFactory, schema);
            pushGeneratedProjection(context, relBuilder, schema);
        }

        // 4. push watermark assigner
        if (schemaTable.isStreamingMode() && !schema.getWatermarkSpecs().isEmpty()) {
            pushWatermarkAssigner(context, relBuilder, schema);
        }

        return relBuilder.build();
    }
```
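Step 0 deserves a quick illustration: the hints returned by toRelContext.getTableHints() come from SQL OPTIONS hints, which FlinkHints.getHintedOptions merges over the WITH-clause options before the table source is created. A sketch, continuing with the tEnv from the earlier driver and assuming dynamic table options are enabled:

```java
// Continuing with the tEnv created above. OPTIONS hints are rejected
// unless this flag is turned on first.
tEnv.getConfig().getConfiguration()
        .setString("table.dynamic-table-options.enabled", "true");

// The per-query override surfaces in toRel() via toRelContext.getTableHints()
// and is merged into the catalog table by FlinkHints.getHintedOptions.
tEnv.executeSql(
        "SELECT id, status FROM orders"
      + " /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */").print();
```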

With steps 0-4 the conversion is complete. This post is mainly concerned with step 1, so let's follow the call into the FactoryUtil.createTableSource method:

```java
public static DynamicTableSource createTableSource(
            @Nullable Catalog catalog, // GenericInMemoryCatalog
            ObjectIdentifier objectIdentifier, // `default_catalog`.`default_database`.`orders`
            CatalogTable catalogTable, // CatalogTableImpl
            ReadableConfig configuration,
            ClassLoader classLoader,
            boolean isTemporary) {
        final DefaultDynamicTableContext context =
                new DefaultDynamicTableContext(
                        objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
        try {
            final DynamicTableSourceFactory factory = // resolves to KafkaDynamicTableFactory
                    getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
            return factory.createDynamicTableSource(context);
        } catch (Throwable t) {
            throw new ValidationException(
                    String.format(
                            "Unable to create a source for reading table '%s'.nn"
                                      "Table options are:nn"
                                      "%s",
                            objectIdentifier.asSummaryString(),
                            catalogTable.getOptions().entrySet().stream()
                                    .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                    .sorted()
                                    .collect(Collectors.joining("n"))),
                    t);
        }
    }
```
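The factory lookup in getDynamicTableFactory is plain Java SPI: each connector jar registers its factories under META-INF/services/org.apache.flink.table.factories.Factory, and the one whose factoryIdentifier() equals the 'connector' option wins. A simplified sketch of that matching (the real FactoryUtil also consults the catalog's optional factory and produces much friendlier error messages):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;

public final class FactoryDiscoverySketch {

    // Simplified stand-in for FactoryUtil#getDynamicTableFactory.
    static DynamicTableSourceFactory discoverSourceFactory(String connector, ClassLoader cl) {
        List<DynamicTableSourceFactory> matches = new ArrayList<>();
        // ServiceLoader scans META-INF/services/org.apache.flink.table.factories.Factory.
        for (Factory factory : ServiceLoader.load(Factory.class, cl)) {
            if (factory instanceof DynamicTableSourceFactory
                    && factory.factoryIdentifier().equals(connector)) {
                matches.add((DynamicTableSourceFactory) factory);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    matches.size() + " factories found for identifier '" + connector + "'");
        }
        return matches.get(0); // for 'kafka' this is KafkaDynamicTableFactory
    }
}
```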

This lands in KafkaDynamicTableFactory's createDynamicTableSource method:

```java
@Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig tableOptions = helper.getOptions(); // options from the WITH clause

        // resolve the (optional) key format via SPI
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);

        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = // SSCanalJsonFormatFactory
                getValueDecodingFormat(helper);

        // validate the options
        helper.validateExcept(PROPERTIES_PREFIX);

        validateTableSourceOptions(tableOptions);

        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        final StartupOptions startupOptions = getStartupOptions(tableOptions);

        // extract the native Kafka client properties: bootstrap.servers, group.id, etc.
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());

        // add topic-partition discovery
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

        final DataType physicalDataType = // ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
                context.getCatalogTable().getSchema().toPhysicalRowDataType();

        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);

        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }
```
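Two details above are easy to gloss over. First, getKafkaProperties collects every option carrying the properties. prefix and strips the prefix; a minimal sketch of that behavior (the wrapper class is illustrative, the method mirrors KafkaOptions#getKafkaProperties):

```java
import java.util.Map;
import java.util.Properties;

public final class KafkaPropsSketch {
    private static final String PROPERTIES_PREFIX = "properties.";

    // Sketch of the prefix stripping done by KafkaOptions#getKafkaProperties.
    static Properties getKafkaProperties(Map<String, String> tableOptions) {
        Properties props = new Properties();
        tableOptions.forEach((key, value) -> {
            if (key.startsWith(PROPERTIES_PREFIX)) {
                // e.g. 'properties.bootstrap.servers' -> 'bootstrap.servers'
                props.put(key.substring(PROPERTIES_PREFIX.length()), value);
            }
        });
        return props;
    }
}
```

Second, the partition-discovery block simply translates the optional scan.topic-partition-discovery.interval into a consumer property, defaulting to disabled when the option is absent.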

Back in createDynamicTableSource: after the validations, the assembled configuration is handed on to create the table source, as follows:

```java
protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType, // fields to read: ROW<`status` INT, `courier_id` BIGINT, `id` BIGINT, `finish_time` BIGINT> NOT NULL
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, // SSCanalJsonFormatFactory
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics, // topics to subscribe to
            @Nullable Pattern topicPattern, // or a topic pattern
            Properties properties, // native Kafka properties: bootstrap.servers, group.id, etc.
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false); // upsertMode: false for a plain (non-upsert) Kafka source
    }
```
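The startupOptions passed to the constructor come from getStartupOptions(tableOptions). Roughly, the 'scan.startup.mode' string maps onto the StartupMode enum as below; this sketch ignores the extra parsing that specific offsets and timestamps require:

```java
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

public final class StartupModeSketch {

    // Sketch of the mode mapping inside KafkaOptions#getStartupOptions.
    static StartupMode toStartupMode(String scanStartupMode) {
        switch (scanStartupMode) {
            case "earliest-offset":  return StartupMode.EARLIEST;
            case "latest-offset":    return StartupMode.LATEST; // our DDL
            case "group-offsets":    return StartupMode.GROUP_OFFSETS;
            case "specific-offsets": return StartupMode.SPECIFIC_OFFSETS;
            case "timestamp":        return StartupMode.TIMESTAMP;
            default:
                throw new IllegalArgumentException("Unknown mode: " + scanStartupMode);
        }
    }
}
```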

Execution then continues with:

```java
 prepareDynamicSource(
                schemaTable.getTableIdentifier(),
                catalogTable,
                tableSource,
                schemaTable.isStreamingMode(),
                context.getTableConfig());
```

This in turn calls the KafkaDynamicSource.getScanRuntimeProvider method, which successfully creates the FlinkKafkaConsumer. At that point the FlinkSQL table is wired to the familiar Flink Streaming Kafka consumer.
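In other words, what the SQL connector assembles under the hood is the same consumer you would write by hand with the DataStream API. A minimal hand-written equivalent of our table (values mirror the DDL; the SQL path of course uses a RowData deserializer produced by the ss-canal-json format instead of SimpleStringSchema):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class HandWrittenEquivalent {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxx"); // placeholder, as in the DDL
        props.setProperty("group.id", "testGroup");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
        consumer.setStartFromLatest(); // 'scan.startup.mode' = 'latest-offset'

        env.addSource(consumer).print();
        env.execute("hand-written kafka source");
    }
}
```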

Other notes

For more about 'format' = 'ss-canal-json', see the FlinkSQL platform post.
