事情原委:由于业务量暴涨,kafka硬盘不够保存7天的数据,所以希望升级一下硬盘,能保存7天的日志,之后确认某一天进行升级,升级完了之后发现两三天之前的数据也被重新消费提交到数据库。发现有此问题之后,然后只能修复数据了,苦逼的修复线上最近3天的超过400G的数据。
关于上面出现的问题,需要查证为什么三天前已经提交过offset的数据还被重新消费?和阿里云的技术支持对接之后,他们让我给一下Kafka消费客户端配置。
代码语言:javascript复制clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
他们给的解释是OffsetOldest会拉到最旧的消息进行消费,而我们业务代码中的配置是拉取最旧的消息,并没有做幂等处理,所以造成重复消费。但是其实我这两三天前的被提交的offset也被消费了,我就很不接受他说的了,后面那个阿里云的技术工程师给了一些可能出现的情况会造成这个问题,具体我就这里先不阐述了。
为什么我两三天的数据已经被提交过offset的也被重复消费了?这个我总结了大概几个原因
- kafka磁盘扩容,kafka客户端没有正确获取到__consumer_offsets的位移,造成被重新消费。
- kafka客户端有bug,kafka重启触发了消费被重置。
我们先来看下kafka关于
代码语言:javascript复制auto.offset.reset
的参数值:latest和earliest的详解。
latest和earliest区别
代码语言:javascript复制1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
下面我们来验证一下:
这个是生成zzh_test的topic,生成a1到a11,然后我通过设置
代码语言:javascript复制clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
和
代码语言:javascript复制clusterCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
两个去消费,根据不同的情况打印一下结果,结果如下。
- 两个消费端都未提交offset,latest启动之后只会读新产生该分区下的数据,earliest会从头开始消费
- 两个消费端都提交了offset,并且offset是有序提交的,latest和earliest都是从提交的offset开始消费
- 两个消费端都提交offset,但是两个端都存在a1,a2,a9的offset被提交了,但是a3,a4,a5,a6,a7,a8的offset没有被提交,latest和earliest配置也是从a10开始消费。中间的提交的也会被忽略。
关于offset提交的位移配置我们清楚了,所以上面的问题肯定不是这个参数造成的。我们重新回到"为什么我两三天的数据已经被提交过offset的也被重复消费了"这个问题。这个其实主要是阿里云kafka的硬盘升级涉及到数据的迁移,kafka机器是一台一台升级然后重启,造成大量的rebalance,触发到了Sarama Go客户端的OutOfRange机制,然后消费位点重置。
吐槽一下:关于阿里云的文档,这样的注意事项相当于什么都没说。
总结:
- 提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。
- 服务的升级还是应该采取保守的方式,不要片面听信某云的理论,人总是会犯错的。比如Kafka磁盘扩容可以把消费服务停止,然后扩容完成再启动消费端,可能就不会触发到Sarama Go客户端的bug了。