Flink学习笔记(8) -- Flink Kafka-Connector详解

2021-04-13 14:40:39 浏览数 (1)

  Kafka中的partition机制和Flink的并行度机制深度结合

  Kafka可以作为Flink的source和sink

  任务失败,通过设置kafka的offset来恢复应用

setStartFromGroupOffsets()【默认的消费策略】

代码语言:javascript复制
	默认读取上次保存的offset信息;
	如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据;
	
	setStartFromSpecificOffsets(Map)

setStartFromEarliest()

代码语言:javascript复制
	从最早的数据开始进行消费,忽略存储的offset信息

setStartFromLatest()

代码语言:javascript复制
	从最新的数据进行消费,忽略存储的offset信息

  当checkpoint机制开启的时候,Kafka Consumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。

  为了能够使用支持容错的kafka Consumer,需要开启checkpoint

代码语言:javascript复制
	env.enableCheckpointing(5000); // 每5s checkpoint一次

  针对job是否开启checkpoint来区分;

  Checkpoint关闭时: 可以通过下面两个参数配置

代码语言:javascript复制
	enable.auto.commit   设置是否开启自动提交Offset,默认为true
	
	auto.commit.interval.ms  设置自动提交的时间间隔,以毫秒为单位

  Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。

代码语言:javascript复制
	可以通过这个参数设置setCommitOffsetsOnCheckpoints(boolean),这个参数默认就是true。表示在checkpoint的时候提交offset,
此时,kafka中的自动提交机制就会被忽略

  如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数:

代码语言:javascript复制
setLogFailuresOnly(false)
setFlushOnCheckpoint(true)

注意:建议修改kafka 生产者的重试次数
	  retries【这个参数的值默认是0】

  如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义,但是需要选择具体的语义:

代码语言:javascript复制
	Semantic.NONE
	Semantic.AT_LEAST_ONCE【默认】
	Semantic.EXACTLY_ONCE

	注意:在这里我们使用的kafka是基于0.11这个版本,如果是低版本的话,有一些新特性是不支持的。具体的可以参考官方文档
	https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kafka.html

0 人点赞