1. 依赖
Flink版本:1.11.2
Apache Flink 内置了多个 Kafka Connector:通用、0.10、0.11等。这个通用的 Kafka Connector 会尝试追踪最新版本的 Kafka 客户端。不同 Flink 发行版之间其使用的客户端版本可能会发生改变。现在的 Kafka 客户端可以向后兼容 0.10.0 或更高版本的 Broker。对于大多数用户使用通用的 Kafka Connector 就可以了。但对于 0.11.x 和 0.10.x 版本的 Kafka 用户,我们建议分别使用专用的 0.11 和 0.10 Connector。有关 Kafka 兼容性的详细信息,请参阅 Kafka官方文档。
通用 Connector:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
0.11 Connector:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-011_2.11</artifactId>
<version>1.11.2</version>
</dependency>
0.10 Connector:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-010_2.11</artifactId>
<version>1.11.2</version>
</dependency>
0.10 Connector 不支持对 Kafka 的 Exactly-once 写入。
下面是老版本的 Connector 介绍:
Maven | 开始支持版本 | 消费者与生产者类名 | Kafka版本 | 备注 |
---|---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08、FlinkKafkaProducer08 | 0.8.x | 使用 SimpleConsumer API。偏移量被提交到ZooKeeper上。 |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09、FlinkKafkaProducer09 | 0.9.x | 使用新版 Consumer API。 |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010、FlinkKafkaProducer010 | 0.10.x | 这个连接器支持生产与消费的带时间戳的Kafka消息。 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011、FlinkKafkaProducer011 | 0.11.x | Kafka 0.11.x 版本不支持 scala 2.10 版本。此连接器支持 Kafka 事务消息 可以为生产者提供 Exactly-Once 语义。 |
flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer、FlinkKafkaProducer | >= 1.0.0 | 这是一个通用的 Kafka 连接器,会追踪最新版本的 Kafka 客户端。 |
2. Kafka消费者
Flink 的 Kafka 消费者:FlinkKafkaConsumer(对于 Kafka 0.11.x 版本为 FlinkKafkaConsumer011,对于 Kafka 0.10.x 版本为 FlinkKafkaConsumer010) 提供了可以访问一个或多个 Kafka Topic 的功能。
Kafka 消费者的构造函数接受如下参数:
- Kafka Topic 名称或者 Kafka Topic 名称列表
- 用于反序列化 Kafka 数据的 DeserializationSchema / KafkaDeserializationSchema
- Kafka 消费者的配置。需要以下属性:
bootstrap.servers
(逗号分隔的 Kafka broker 列表、zookeeper.connect
(逗号分隔的 Zookeeper 服务器)(对于 Kafka 0.8 是必需的)、group.id
(消费组的Id)。
Java版本:
代码语言:javascript复制Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 根据版本判断是否使用
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
Scala版本:
代码语言:javascript复制val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
stream = env
.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
2.1 DeserializationSchema
Flink Kafka 消费者需要知道如何将 Kafka 中的二进制数据转换为 Java/Scala 对象。DeserializationSchema 可以允许用户指定这样的一个 Schema。每个 Kafka 消息都会调用 T deserialize(ConsumerRecord<byte[], byte[]> record)
方法。
为了使用方便,Flink 提供如下开箱即用的 Schema:
- TypeInformationSerializationSchema(以及 TypeInformationKeyValueSerializationSchema) 会基于 Flink 的 TypeInformation 创建 Schema。对 Flink 读写数据会非常有用。这个 Schema 是其他通用序列化方法的高性能替代方案。
- JsonDeserializationSchema(以及 JSONKeyValueDeserializationSchema)将序列化的 JSON 转换为 ObjectNode 对象,可以使用
objectNode.get("field").as(Int/String/...)()
方法访问某个字段。KeyValue objectNode 包含一个”key”和”value”字段,这包含了所有字段,以及一个可选的”metadata”字段,可以用来查询此消息的偏移量/分区/主题。 - AvroDeserializationSchema 使用静态 Schema 读取 Avro 格式的序列化的数据。可以从 Avro 生成的类(
AvroDeserializationSchema.forSpecific(...)
) 中推断出 Schema,也可以使用 GenericRecords 手动提供 Schema(AvroDeserializationSchema.forGeneric(...)
)。这个反序列化 Schema 要求序列化记录不能包含嵌套 Schema。
如果要使用 Avro 这种 Schema,必须添加如下依赖:
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.11.2</version>
</dependency>
当遇到由于某种原因无法反序列化某个损坏消息时,反序列化 Schema 会返回 null,这会导致这条记录被跳过。由于 Consumer 的容错能力,如果在损坏的消息上让作业失败,那么 Consumer 会再次尝试反序列化该消息。如果反序列化仍然失败,则 Consumer 会陷入该消息的不断重启与失败的循环中。
2.2 起始位置配置
Flink Kafka Consumer 可以配置如何确定 Kafka 分区的起始位置。
Java版本:
代码语言:javascript复制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
// 从最早的记录开始消费
myConsumer.setStartFromEarliest();
// 从最近的记录开始消费
myConsumer.setStartFromLatest();
// 从指定时间戳(毫秒)开始消费
myConsumer.setStartFromTimestamp(...);
// 默认行为 从指定消费组偏移量开始消费
myConsumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(myConsumer);
...
Scala版本:
代码语言:javascript复制val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
Flink 所有版本的 Kafka Consumer 都具有上述配置起始位置的方法:
- setStartFromGroupOffsets(默认行为):从消费者组(通过消费者属性
group.id
配置)提交到 Kafka Broker(Kafka 0.8 版本提交到 ZooKeeper)的偏移量开始读取分区。如果找不到分区的偏移量,会使用auto.offset.reset
属性中的配置。 - setStartFromEarliest()/setStartFromLatest():读取最早/最新记录。在这个模式下,提交到 Kafka 偏移量可以忽略,不用作起始位置。
- setStartFromTimestamp(long):从指定的时间戳开始读取。对于每个分区,第一个大于或者等于指定时间戳的记录会被用作起始位置。如果分区的最新记录早于时间戳,则分区简单的读取最新记录即可。在这个模式下,提交到 Kafka 偏移量可以忽略,不用作起始位置。
你还可以为 Consumer 指定每个分区应该开始的确切偏移量:
Java版本:
代码语言:javascript复制Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
Scala版本:
代码语言:javascript复制val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上面的示例配置 Consumer 从 myTopic 主题的 0、1 和 2 分区的指定偏移量开始消费。偏移量是 Consumer 读取每个分区的下一条记录。需要注意的是如果 Consumer 需要读取的分区在提供的偏移量 Map 中没有指定偏移量,那么自动转换为默认的消费组偏移量。
当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在保存点或检查点中的偏移量确定。
2.3 容错
当 Flink 启动检查点时,Consumer 会从 Topic 中消费记录,并定期对 Kafka 偏移量以及其他算子的状态进行 Checkpoint。如果作业失败,Flink 会从最新检查点的状态恢复流处理程序,并从保存在检查点中的偏移量重新开始消费来自 Kafka 的记录。
因此,检查点间隔定义了程序在发生故障时最多可以回退多少。要使用容错的 Kafka Consumer,需要在作业中开启拓扑的检查点。如果禁用了检查点,Kafka Consumer 会定期将偏移量提交给 Zookeeper。
Java版本:
代码语言:javascript复制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每5s进行一次checkpoint
env.enableCheckpointing(5000);
Scala版本:
代码语言:javascript复制val env = StreamExecutionEnvironment.getExecutionEnvironment()
// 每5s进行一次checkpoint
env.enableCheckpointing(5000)
如果有足够的 slot 可用于重新启动拓扑,那么 Flink 才能重新启动拓扑。因此,如果拓扑由于与 TaskManager 断开而失败,那么必须有足够的可用 slot。
2.4 分区与主题发现
2.4.1 分区发现
Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用 Exactly-Once 语义来消费。当作业开始运行,首次检索分区元数据后发现的所有分区会从最早的偏移量开始消费。
默认情况下,分区发现是禁用的。如果要启用它,需要设置 flink.partition-discovery.interval-millis
为一个非负值,表示发现间隔(以毫秒为单位的)。
当使用 Flink 1.3.x 之前的版本,消费者从保存点恢复时,无法在恢复的运行启用分区发现。如果要启用,恢复将失败并抛出异常。在这种情况下,为了使用分区发现,需要在 Flink 1.3.x 版本中生成保存点,然后再从中恢复。
2.4.2 主题发现
Flink Kafka Consumer 还能够使用正则表达式匹配 Topic 名称来自动发现 Topic。
Java 版本:
代码语言:javascript复制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
...
Scala版本:
代码语言:javascript复制val env = StreamExecutionEnvironment.getExecutionEnvironment()
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val myConsumer = new FlinkKafkaConsumer[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
val stream = env.addSource(myConsumer)
...
在上面的示例中,当作业开始运行时,Consumer 会订阅名称与正则表达式相匹配的所有主题(以 test-topic- 开头并以一位数字结尾)。
2.5 偏移量提交
Flink Kafka Consumer 可以配置如何将偏移量提交回 Kafka Broker。需要注意的是 Flink Kafka Consumer 不需要依赖提交的偏移量来提供容错保证。提交的偏移量仅是用来展示消费者的进度。
有不同的方式配置偏移量提交,具体取决于作业是否启用了检查点:
- 禁用检查点:如果禁用了检查点,那么 Flink Kafka Consumer 依赖于 Kafka 客户端的定期自动提交偏移量的功能。因此,要禁用或启用偏移量提交,只需在 Properties 配置中将
enable.auto.commit
/auto.commit.interval.ms
设置为适当的值。 - 启用检查点:如果启用检查点,那么 Flink Kafka Consumer 会在检查点完成时提交偏移量存储在检查点状态中。这样可以确保 Kafka Broker 中的已提交偏移量与检查点状态中的偏移量一致。用户可以通过调用
setCommitOffsetsOnCheckpoints(boolean)
方法来选择禁用或启用偏移提交(默认情况下为true)。请注意,在这种情况下,Properties 配置中的自动定期提交偏移设置将被忽略。
2.6 时间戳提取与Watermark输出
在许多情况下,记录的时间戳会存在记录本身中或在 ConsumerRecord 的元数据中。另外,用户可能希望周期性地或不定期地发出 Watermark。对于这些情况,Flink Kafka Consumer 可以指定 Watermark 策略。我们可以按照如下所述指定自定义策略,也可以使用内置策略。
Java版本:
代码语言:javascript复制Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer =
new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<String> stream = env.addSource(myConsumer);
3. Kafka生产者
Flink 的 Kafka 生产者:FlinkKafkaProducer(对于 Kafka 0.11.x 版本为 FlinkKafkaProducer011,对于 Kafka 0.10.x 版本为 FlinkKafkaProducer010) 提供了可以写入一个或多个 Kafka Topic 的功能。
Kafka 生产者的构造函数接受如下参数:
- 一个默认的输出Topic
- 用于序列数据到 Kafka 的 SerializationSchema / KafkaSerializationSchema
- Kafka 生产者的配置。需要以下属性:
bootstrap.servers
(逗号分隔的 Kafka broker 列表、zookeeper.connect
(逗号分隔的 Zookeeper 服务器)(对于 Kafka 0.8 是必需的) - 容错语义
Java版本:
代码语言:javascript复制DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
Scala版本:
代码语言:javascript复制val stream: DataStream[String] = ...
Properties properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[String](
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
stream.addSink(myProducer)
3.1 SerializationSchema
Flink Kafka 生产者需要知道如何将 Java/Scala 对象转换为 Kafka 中的二进制数据。KafkaSerializationSchema 可以允许用户指定这样的一个 Schema。每个 Kafka 消息都会调用 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)
方法,生成一个 ProducerRecord 写入 Kafka。
用户可以对如何将数据写到 Kafka 进行细粒度的控制。通过生产者记录,我们可以:
- 设置标题值
- 为每个记录定义Key
- 指定数据的自定义分区
3.2 容错
当启用 Flink 的检查点后,FlinkKafkaProducer 与 FlinkKafkaProducer011(适用于Kafka >= 1.0.0 版本的 FlinkKafkaProducer)都可以提供 Exactly-once 的语义保证。FlinkKafkaProducer010 只能提供 At-Least-once 语义的保证。
除了启用 Flink 的检查点之外,我们还可以通过将语义参数传递给 FlinkKafkaProducer 与 FlinkKafkaProducer011(适用于Kafka >= 1.0.0 版本的FlinkKafkaProducer)来选择三种不同的操作模式:
- Semantic.NONE:Flink 不做任何保证。产生的记录可能会丢失或重复。
- Semantic.AT_LEAST_ONCE(默认设置):保证了不会丢失任何记录(可能重复)。
- Semantic.EXACTLY_ONCE:通过 Kafka 事务提供 Exactly-once 的语义。每当我们使用事务写入 Kafka 时,请不要忘记为所有使用 Kafka 记录的应用程序设置所需的隔离等级(read_committed 或 read_uncommitted,后者为默认值)。