如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。
在解释Kafka重复消费出现原因之前,列举一下Kafka中与消费者有关的几个重要配置参数。
enable.auto.commit
:表示消费者会周期性自动提交消费的offset。默认值true。auto.commit.interval.ms
:在enable.auto.commit
为true的情况下, 自动提交的间隔。默认值5秒。max.poll.records
:单次消费者拉取的最大数据条数,默认值500。max.poll.interval.ms
:表示若在阈值时间之内消费者没有消费完上一次poll的消息,consumer client会主动向coordinator发起LeaveGroup请求,触发Rebalance;然后consumer重新发送JoinGroup请求。session.timeout.ms
:group Coordinator检测consumer发生崩溃所需的时间。在这个时间内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。默认值10秒。heartbeat.interval.ms
:标识Consumer给Coordinator发一个心跳包的时间间隔。heartbeat.interval.ms
越小,发的心跳包越多。默认值3秒。
Group Coordinator
基于Zookeeper的Rebalance存在不可避免的羊群效应和脑裂问题,如何不用Zookeeper来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决。因此Kafka0.9的版本重新设计了Consumer端,设计了Coordinator机制,大大减少了Zookeeper负载。
对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:
- 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。
- 协调Group成员的行为。
羊群效应:以Zookeeper为例,由于一个被watch的znode变化,导致大量的通知需要被发送,将会导致在这个通知期间的其他操作提交的延迟。
重复消费的原因
原因1:消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。
原因2:设置enable.auto.commit
为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。
原因3:消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms
时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。
重复消费的解决方法
- 提高消费者的处理速度。例如:对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的同时,根据实际场景可将
max.poll.interval.ms
值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records
的值。 - 引入消息去重机制。例如:生成消息时,在消息中加入唯一标识符如消息id等。在消费端,可以保存最近的
max.poll.records
条消息id到redis或mysql表中,这样在消费消息时先通过查询去重后,再进行消息的处理。 - 保证消费者逻辑幂等。可以查看博客《一文理解如何实现接口的幂等性》