对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。在Kafka中其实有关于offset有两个含义。我这里主要分享消费者提交offset问题。首先我们来看一下issue:https://github.com/confluentinc/confluent-kafka-go/issues/195,涉及到问题:Why CommitOffsets() does not increment the offset like Commit()【为什么CommitOffsets和Commit方法一样不能增加offset】。里面有个正确答案:提交的offset等于msg.offset 1。为什么是这样的呢?我们来深入了解一下。
对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的。在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。
参考图1的消费位移,x表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,图中也用了lastConsumedOffset这个单词来标识它。
不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x 1,对应于图中的position,它表示下一条需要拉取的消息的位置。读者可能看过一些相关资料,里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移,即提交的是 x,这明显是错误的。在消费者中还有一个committed offset的概念,它表示已经提交过的消费位移。要确认提交的消费位移不是当前所消费到的位移我们要看一下kafka的日志设计。
如图2所示,在存储结构上每个分区副本对应一个目录,每个分区副本由一个或多个日志段(LogSegment)组成。每个日志段在物理结构上对应一个以“.index”为文件名后缀的偏移量索引文件、一个以“.timeindex”为文件名后缀的时间戳索引文件以及一个以“.log”为文件名后缀的消息集文件(FileMessageSet),消息集文件即日志文件或数据文件。
由图也可以看出,分区所对应目录的命名规则为:主题名-分区编号,分区编号从0开始,顺序递增,分区编号最大值为分区总数减1,例如,对“log-format”主题,其分区目录依次为log-format-0、log-format-1和log-format-2。数据文件命名规则为:由数据文件的第一条消息偏移量,也称为基准偏移量(BaseOffset),左补0构成20位数字字符组成,每个分区第一个数据文件的基准偏移量为0,因此每个分区第一个数据文件对应的日志文件为0000000000000000000.log,两个索引文件分别为0000000000000000000.index和0000000000000000000.timeindex。后续每个数据文件的基准偏移量为上一个数据文件最后一条消息对应的偏移量(log end offset,LEO)值加1。
有了上面的解释,我们可以知道position=committed offset=lastConsumedOffset 1,当然position和committed offset并不会一直相同,这一点会在下面的示例中有所体现。
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。参考图3,当前一次poll()操作所拉取的消息集为[x 2,x 7],x 2代表上一次提交的消费位移,说明已经完成了x 1之前(包括x 1在内)的所有消息的消费,x 5表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x 8,那么当前消费x 5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x 8开始的。也就是说,x 5至x 7之间的消消息并未能被消费,如此便发生了消息丢失的现象。
再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x 5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x 2开始的。也就是说,x 2至x 4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。
出现这两类的情况,这个时候position=committed offset=lastConsumedOffset 1这个等式是不完全相同的。
总结:
1.提交的offset是msg.offset 1
2.对于位移的提交的时机需要把控好,防止重复消费或者消费丢失的问题。