厉害了!一文撕开Kafka Compact Topic神秘面纱

2024-08-06 15:18:06 浏览数 (2)

背景

随着平台Kafka的对接客户越来越多,我发现很多人只知道Kafka Topic可以根据设置保存大小和保存时间触发数据清理机制,但是并不熟悉Kafka Topic另一种清理策略compact。遂有此文,本文主要介绍compact原理、相关配置、实践案例操作记录、相关源码分析等内容。欢迎关注微信公众号:大数据从业者

Compact原理

Kafka数据清理策略是由log.cleanup.policy参数决定的,当前支持两种策略:delete(普通主题默认)、compact(系统主题默认)。两种策略可以同时使用,互不冲突。所以,log.cleanup.policy可以设置为delete或compact或delete,compact。本文暂不涉及delete清理策略,只讲述compact清理策略。Kafka系统主题__consumer_offsets默认清理策略就是compact。

强调一点:compact策略仅对Topic内同时携带key和value的消息有效。换句话说,如果需要使用compact策略,那么producer发送的消息需要同时携带key和value。

我们知道Topic是由Partition组成的,producer将消息写入Partition,每条消息都会被分配一个唯一且不可变的offset。如下图所示:

如果清理策略是delete,那么当满足保存大小或者保存时间的条件时,触发数据清理机制。指定offset之前的消息都将被删除,也就是Delete Retention Point之前的消息,如下图所示:

换句话说,delete策略不会考虑消息的key或value是什么,更不考虑有没有相同的Key消息存在。而compact策略则会考虑同一分区内的相同key的消息,最终只保留相同key的消息中最新的value对应的消息。如下图所示,原始数据中K1有三条消息,经过compact处理,只保留K1:V4这一条消息。个人感觉,该过程称为compact并不是很贴切,应该称为update之类的。

Compact策略适用于只想保留当前快照而不是完整修改历史的场景。比如:为了保存员工工资信息,可以创建主题employee-salary且设置compact策略,如下图所示:

Compact关键保证

代码语言:javascript复制
1.不影响没有消费延迟的consumer获取所有消息。换句话说,compact只会操作非active segment,而没有消费延迟的consumer正在消费active segment。    

2.Compact不会改变消息的offset值、key值、partition值、前后顺序。只是删除一些消息。

3.在log.cleaner.delete.retention.ms(默认24H)时间内,消费者仍能消费到待删除的消息。 

除了正常携带key和value的消息之外,compact还有一种特殊消息:key正常但value=null,这种消息称为tombstone消息。tombstone消息进行合没有意义,所以compact会删除这类消息。欢迎关注公众号:大数据从业者

Compact配置

代码语言:javascript复制
log.cleanup.policy:清理策略(delete或compact或delete,compact)

log.cleaner.enable:是否启用compact清理任务

log.cleaner.threads:compact清理任务的线程数

log.segment.bytes:segmemnt文件的最大字节

log.segment.ms:segment保持active的最大时间

log.cleaner.backoff.ms: 清理任务闲时休眠时间

log.cleaner.min.compaction.lag.ms:触发compact的最小延迟时间

log.cleaner.max.compaction.lag.ms:触发compact的最大延迟时间

log.cleaner.dedupe.buffer.size:清理任务线程用于去重的内存

log.cleaner.delete.retention.ms:compact删除消息延迟删除时间    

log.cleaner.io.buffer.load.factor:清理任务线程 IO buffer负载率

log.cleaner.io.buffer.size:清理任务线程IO buffer内存

log.cleaner.io.max.bytes.per.second:清理任务线程IO限速

log.cleaner.min.cleanable.ratio:触发compact的脏数据比例

实践案例

1.创建测试主题

为便于测试,设置min.cleanable.dirty.ratio=0.001、segment.ms=5000以保证compact清理任务尽快执行,设置partitions=1以保证测试消息写入相同分区。

2.描述测试主题

3.启动消费者

4.启动生产者,发送测试消息

消息内容故意加入重复key,如下:

代码语言:javascript复制
Patrick,salary: 10000

Lucy,salary: 20000

Bob,salary: 20000

Patrick,salary: 25000

Lucy,salary: 30000

Patrick,salary: 30000

5.查看第3步消费消息

可以看到消费到所有消息,证实上文Compact关键保证之一:不影响没有消费延迟的consumer获取所有消息。

6.等待一分钟,继续生产消息,如:Stephane,salary: 0

7.启动新的消费者

可以看到,经过compact清理,上述第4步发送的重复消息只保留最新value。

源码剖析

KafkaServer.startup会启动LogManager,LogManager.startup会启动一个Schedule线程池和一个LogCleaner(内部启动CleanerThread)。Schedule线程池中有一个任务为kafka-log-retention,对应于delete清理策略;而LogCleaner对应于compact清理策略。

本文只讲述compact相关的LogCleaner,其startup方法如下:

代码语言:javascript复制
/**

   * Start the background cleaning

   */

  def startup() {

    info("Starting the log cleaner")

// cleaner线程数通过参数log.cleaner.threads配置,默认为1

    (0 until config.numThreads).foreach { i =>

      val cleaner = new CleanerThread(i)

      cleaners  = cleaner

      cleaner.start()

    }

  }

接下来主要看下CleanerThread类主流程,位置在LogCleaner.scala文件。

主流程如下:

代码语言:javascript复制
doWork -> cleanFilthiestLog ->  grabFilthiestCompactedLog -> cleanLog -> clean -> doClean

篇幅有限,doClean方法内容不再介绍。感兴趣请自行阅读。如下:

代码语言:javascript复制
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala#L594

另外LogCleaner提供metric,方便问题排查和性能调优,如下所示:

总结

通过阅读本文,可以掌握compact原理、配置、实践操作、源码分析等内容。至此,Kafka Compact Topic使用与调优轻松拿捏!

0 人点赞