诞生背景
- 老版本的Kafka会把位移信息保存在Zk中,当Consumer重启后,自动从Zk中读取位移信息。这种设计使Kafka Broker不需要保存位移数据,可减少Broker端需要持有的状态空间,有利于实现高伸缩性。
- 但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将消费者的位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存。实现高持久性和高频写操作。特点:
- 位移主题是一个普通主题,同样可以被手动创建,修改,删除。
- 位移主题的消息格式是kafka定义的,不可以被手动修改,若修改格式不正确,kafka将会崩溃。
- 位移主题保存了三部分内容:Group ID,主题名,分区号。 创建:
- 当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建
- 分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50
- 副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3 使用:
- 当Kafka提交位移消息时会使用这个主题
- 位移提交得分方式有两种:手动和自动提交位移。
- 推荐使用手动提交位移,自动提交位移会存在问题:只有consumer一直启动设置,他就会无限期地向主题写入消息。清理:
- Kafka使用Compact策略来删除位移主题中的过期消息,避免位移主题无限膨胀。
- kafka提供专门的后台线程定期巡检待compcat的主题,查看是否存在满足条件的可删除数据。
注意事项:
- 建议不要修改默认分区数,在kafka中有些许功能写死的是50个分区
- 建议不要使用自动提交模式,采用手动提交,避免消费者无限制的写入消息。
- 后台定期巡检线程叫Log Cleaner,若线上遇到位移主题无限膨胀占用过多磁盘,应该检查此线程的工作状态。
消费者提了异步 commit 实际还没更新完offset,消费者再不断地poll,其实会有重复消费的情况?
- 只要consumer没有重启,不会发生重复消费。因为在运行过程中consumer会记录已获取的消息位移
Topic是由Partition构成的。构成Partition的Segment文件里有以下两个文件
- *.index:这个文件记录了Message Offset,可以让Kafka通过Message Offset快速定位到Message。
- *.timeindex:这个文件记录了Message的时间戳,可以让Kafka通过绝对时间定位到Message。
评论再过一遍
消费端可以通过设置参数 enable.auto.commit 来控制是自动提交还是手动,如果值为 true 则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由 auto.commit.interval.ms(默认为5秒)。可能存在重复的位移数据提交到消费位移主题中,因为每隔5秒会往主题中写入一条消息,不管是否有新的消费记录,这样就会产生大量的同 key 消息,其实只需要一条,因此需要依赖前面提到日志压缩策略来清理数据。重复消费,假设位移提交的时间间隔为5秒,那么在5秒内如果发生了 rebalance,则所有的消费者会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费。我们来看看集中 Delivery Guarantee:读完消息先 commit 再处理消息。这种模式下,如果 Consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once。读完消息先处理再 commit。这种模式下,如果在处理完消息之后 commit 之前 Consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于At least once。
1 概念区分 A :Consumer端的位移概念和消息分区的位移概念不是一回事。 B :Consumer的消费位移,记录的是Consumer要消费的下一条消息的位移。 2 提交位移 A :Consumer 要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。 B :Consumer需要为分配给它的每个分区提交各自的位移数据。 3提交位移的作用 A :提交位移主要是为了表征Consumer的消费进度,这样当Consumer发生故障重启后,能够从kafka中读取之前提交的位移值,从相应的位置继续消费,避免从头在消费一遍。 4 位移提交的特点 A :位移提交的语义保障是由你来负责的,Kafka只会“无脑”地接受你提交的位移。位移提交错误,就会消息消费错误。 5 位移提交方式 A :从用户的角度讲,位移提交分为自动提交和手动提交;从Consumer端的角度而言,位移提交分为同步提交和异步提交。 B :自动提交:由Kafka consumer在后台默默的执行提交位移,用户不用管。开启简单,使用方便,但可能会出现重复消费。 C :手动提交:好处在更加灵活,完全能够把控位移提交的时机和频率。 (1)同步提交:在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端Broker返回提交结果,这个状态才会结束。对TPS影响显著 (2)异步提交:在调用commitAsync()时,会立即给响应,但是出问题了它不会自动重试。 (3)手动提交最好是同步和异步结合使用,正常用异步提交,如果异步提交失败,用同步提交方式补偿提交。 D :批次提交:对于一次要处理很多消费的Consumer而言,将一个大事务分割成若干个小事务分别提交。这可以有效减少错误恢复的时间,避免大批量的消息重新消费。 (1)使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)。
Consumer Exactly Once
Flink 提供的 checkpoint 机制,结合 Source/Sink 端配合支持 Exactly Once 语义,以 Hive 为例:从 Kafka 消费数据,写入到临时目录ck snapshot 阶段,将 Offset 存储到 State 中,Sink 端关闭写入的文件句柄,以及保存 ckid 到 State 中ck complete 阶段,commit kafka offset,将临时目录中的数据移到正式目录ck recover 阶段,恢复 state 信息,reset kafka offset;恢复 last ckid,将临时目录的数据移动到正式目录
CommitFailedException异常
本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。
社区给出了两个相应的解决办法
- 增加期望的时间间隔 max.poll.interval.ms 参数值。
- 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。
场景1
当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。
办法
- 缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
- 增加 Consumer 端允许下游系统消费一批消息的最大时长。
- 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
- 下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsu
除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。
重置位移策略
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!