最近做了一个将结果数据写入到Kafka的需求,sink部分代码如下:
代码语言:javascript复制val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)
ds.addSink(kafkaProducer).setParallelism(sinkParallelism)
其中StringKeyedSerializationSchema是自定义的实现KeyedSerializationSchema的序列化器,用于序列化写入kafka的key/value, 任务也上线了,在flink web页面看到任务各项指标一切正常,也测试消费写入kafka的数据,得到的结果也如预期一样,想着万事大吉了,so easy~ 过了一会kafka中间件的同事找过来说:你这个写入topic的数据怎么只有这几个分区,其他分区都没有数据写入~
什么情况?任务看着一切都ok啊,怎么就有分区没有数据写入呢?马上google一下数据写入kafka的分区策略:
- 如果指定写入分区,就将数据写入分区
- 如果没有指定分区,指定了key, 那么就会按照key hash对分区取模方式发送
- 如果既没指定分区又没指定key,那么就会以轮序的方式发送
而实际情况是有几个分区一条数据都没有写入,并且在StringKeyedSerializationSchema也指定了每条写入数据的key, 那么就一定是第一种情况了,在FlinkKafkaProducer011中指定了数据写入的分区,马上翻看源码,在FlinkKafkaProducer011的invoke方法里面有这么一个逻辑:
代码语言:javascript复制if (flinkKafkaPartitioner != null) {
record = new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
很明显就是执行了if逻辑,也是就flinkKafkaPartitioner不为空,在构建ProducerRecord时调用了flinkKafkaPartitioner.partition的方法,指定写入的partition,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的时候给的默认值FlinkFixedPartitioner,在看下其partition方式:
代码语言:javascript复制partitions[parallelInstanceId % partitions.length]
parallelInstanceId表示当前task的index,partitions表示kafka的topic的分区,该逻辑求得的分区就是根据当前task index 对partition取余得到的,而我设置的sinkParallelism是4,topic的分区数是6,到这里就比较明朗,取余永远不会得到4、5,所以就导致分区4、5一直没有数据写入。如果设置的parallism设置比kafka的分区数还要大,就会导致得到的partition值大于topic实际partition。 那么解决方式有一下几种:
- parallism设置成为与kafka topic 分区数一致大小
- 将flinkKafkaPartitioner指定为空,并且制定写入kafka的key
- 将flinkKafkaPartitioner与写入的key都置为空
- 自定义一个FlinkKafkaPartitioner,重写partition方法
最终选择第三种较为简单的方案,修改代码:
代码语言:javascript复制val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)
同时将StringKeyedSerializationSchema的serializeKey返回值设置为null. 再次运行任务,查看kafka 数据写入情况,所有分区都有数据写入。最终破案。
a little note : 如果需要将数据写入多个topic, 重写KeyedSerializationSchema中getTargetTopic方法根据数据判断写入的topic,默认写入的topic是 FlinkKafkaProducer011 传入的topic。
END