记一次Flink写入Kafka坑点

2022-04-18 11:53:23 浏览数 (1)

最近做了一个将结果数据写入到Kafka的需求,sink部分代码如下:

代码语言:javascript复制
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

        sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)

      ds.addSink(kafkaProducer).setParallelism(sinkParallelism)
 

其中StringKeyedSerializationSchema是自定义的实现KeyedSerializationSchema的序列化器,用于序列化写入kafka的key/value, 任务也上线了,在flink web页面看到任务各项指标一切正常,也测试消费写入kafka的数据,得到的结果也如预期一样,想着万事大吉了,so easy~ 过了一会kafka中间件的同事找过来说:你这个写入topic的数据怎么只有这几个分区,其他分区都没有数据写入~

什么情况?任务看着一切都ok啊,怎么就有分区没有数据写入呢?马上google一下数据写入kafka的分区策略:

  1. 如果指定写入分区,就将数据写入分区
  2. 如果没有指定分区,指定了key, 那么就会按照key hash对分区取模方式发送
  3. 如果既没指定分区又没指定key,那么就会以轮序的方式发送

而实际情况是有几个分区一条数据都没有写入,并且在StringKeyedSerializationSchema也指定了每条写入数据的key, 那么就一定是第一种情况了,在FlinkKafkaProducer011中指定了数据写入的分区,马上翻看源码,在FlinkKafkaProducer011的invoke方法里面有这么一个逻辑:

代码语言:javascript复制
if (flinkKafkaPartitioner != null) {

            record = new ProducerRecord<>(

                targetTopic,

                flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),

                timestamp,

                serializedKey,

                serializedValue);

        } else {

            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);

        }
 

很明显就是执行了if逻辑,也是就flinkKafkaPartitioner不为空,在构建ProducerRecord时调用了flinkKafkaPartitioner.partition的方法,指定写入的partition,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的时候给的默认值FlinkFixedPartitioner,在看下其partition方式:

代码语言:javascript复制
partitions[parallelInstanceId % partitions.length]

parallelInstanceId表示当前task的index,partitions表示kafka的topic的分区,该逻辑求得的分区就是根据当前task index 对partition取余得到的,而我设置的sinkParallelism是4,topic的分区数是6,到这里就比较明朗,取余永远不会得到4、5,所以就导致分区4、5一直没有数据写入。如果设置的parallism设置比kafka的分区数还要大,就会导致得到的partition值大于topic实际partition。 那么解决方式有一下几种:

  1. parallism设置成为与kafka topic 分区数一致大小
  2. 将flinkKafkaPartitioner指定为空,并且制定写入kafka的key
  3. 将flinkKafkaPartitioner与写入的key都置为空
  4. 自定义一个FlinkKafkaPartitioner,重写partition方法

最终选择第三种较为简单的方案,修改代码:

代码语言:javascript复制
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

        sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)
 

同时将StringKeyedSerializationSchema的serializeKey返回值设置为null. 再次运行任务,查看kafka 数据写入情况,所有分区都有数据写入。最终破案。

a little note : 如果需要将数据写入多个topic, 重写KeyedSerializationSchema中getTargetTopic方法根据数据判断写入的topic,默认写入的topic是 FlinkKafkaProducer011 传入的topic。

END

0 人点赞