背景
随着平台Kafka的对接客户越来越多,我发现很多人只知道Kafka Topic可以根据设置保存大小和保存时间触发数据清理机制,但是并不熟悉Kafka Topic另一种清理策略compact。遂有此文,本文主要介绍compact原理、相关配置、实践案例操作记录、相关源码分析等内容。欢迎关注微信公众号:大数据从业者
Compact原理
Kafka数据清理策略是由log.cleanup.policy参数决定的,当前支持两种策略:delete(普通主题默认)、compact(系统主题默认)。两种策略可以同时使用,互不冲突。所以,log.cleanup.policy可以设置为delete或compact或delete,compact。本文暂不涉及delete清理策略,只讲述compact清理策略。Kafka系统主题__consumer_offsets默认清理策略就是compact。
强调一点:compact策略仅对Topic内同时携带key和value的消息有效。换句话说,如果需要使用compact策略,那么producer发送的消息需要同时携带key和value。
我们知道Topic是由Partition组成的,producer将消息写入Partition,每条消息都会被分配一个唯一且不可变的offset。如下图所示:
如果清理策略是delete,那么当满足保存大小或者保存时间的条件时,触发数据清理机制。指定offset之前的消息都将被删除,也就是Delete Retention Point之前的消息,如下图所示:
换句话说,delete策略不会考虑消息的key或value是什么,更不考虑有没有相同的Key消息存在。而compact策略则会考虑同一分区内的相同key的消息,最终只保留相同key的消息中最新的value对应的消息。如下图所示,原始数据中K1有三条消息,经过compact处理,只保留K1:V4这一条消息。个人感觉,该过程称为compact并不是很贴切,应该称为update之类的。
Compact策略适用于只想保留当前快照而不是完整修改历史的场景。比如:为了保存员工工资信息,可以创建主题employee-salary且设置compact策略,如下图所示:
Compact关键保证
代码语言:javascript复制1.不影响没有消费延迟的consumer获取所有消息。换句话说,compact只会操作非active segment,而没有消费延迟的consumer正在消费active segment。
2.Compact不会改变消息的offset值、key值、partition值、前后顺序。只是删除一些消息。
3.在log.cleaner.delete.retention.ms(默认24H)时间内,消费者仍能消费到待删除的消息。
除了正常携带key和value的消息之外,compact还有一种特殊消息:key正常但value=null,这种消息称为tombstone消息。tombstone消息进行合没有意义,所以compact会删除这类消息。欢迎关注公众号:大数据从业者
Compact配置
代码语言:javascript复制log.cleanup.policy:清理策略(delete或compact或delete,compact)
log.cleaner.enable:是否启用compact清理任务
log.cleaner.threads:compact清理任务的线程数
log.segment.bytes:segmemnt文件的最大字节
log.segment.ms:segment保持active的最大时间
log.cleaner.backoff.ms: 清理任务闲时休眠时间
log.cleaner.min.compaction.lag.ms:触发compact的最小延迟时间
log.cleaner.max.compaction.lag.ms:触发compact的最大延迟时间
log.cleaner.dedupe.buffer.size:清理任务线程用于去重的内存
log.cleaner.delete.retention.ms:compact删除消息延迟删除时间
log.cleaner.io.buffer.load.factor:清理任务线程 IO buffer负载率
log.cleaner.io.buffer.size:清理任务线程IO buffer内存
log.cleaner.io.max.bytes.per.second:清理任务线程IO限速
log.cleaner.min.cleanable.ratio:触发compact的脏数据比例
实践案例
1.创建测试主题
为便于测试,设置min.cleanable.dirty.ratio=0.001、segment.ms=5000以保证compact清理任务尽快执行,设置partitions=1以保证测试消息写入相同分区。
2.描述测试主题
3.启动消费者
4.启动生产者,发送测试消息
消息内容故意加入重复key,如下:
代码语言:javascript复制Patrick,salary: 10000
Lucy,salary: 20000
Bob,salary: 20000
Patrick,salary: 25000
Lucy,salary: 30000
Patrick,salary: 30000
5.查看第3步消费消息
可以看到消费到所有消息,证实上文Compact关键保证之一:不影响没有消费延迟的consumer获取所有消息。
6.等待一分钟,继续生产消息,如:Stephane,salary: 0
7.启动新的消费者
可以看到,经过compact清理,上述第4步发送的重复消息只保留最新value。
源码剖析
KafkaServer.startup会启动LogManager,LogManager.startup会启动一个Schedule线程池和一个LogCleaner(内部启动CleanerThread)。Schedule线程池中有一个任务为kafka-log-retention,对应于delete清理策略;而LogCleaner对应于compact清理策略。
本文只讲述compact相关的LogCleaner,其startup方法如下:
代码语言:javascript复制/**
* Start the background cleaning
*/
def startup() {
info("Starting the log cleaner")
// cleaner线程数通过参数log.cleaner.threads配置,默认为1
(0 until config.numThreads).foreach { i =>
val cleaner = new CleanerThread(i)
cleaners = cleaner
cleaner.start()
}
}
接下来主要看下CleanerThread类主流程,位置在LogCleaner.scala文件。
主流程如下:
代码语言:javascript复制doWork -> cleanFilthiestLog -> grabFilthiestCompactedLog -> cleanLog -> clean -> doClean
篇幅有限,doClean方法内容不再介绍。感兴趣请自行阅读。如下:
代码语言:javascript复制https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala#L594
另外LogCleaner提供metric,方便问题排查和性能调优,如下所示:
总结
通过阅读本文,可以掌握compact原理、配置、实践操作、源码分析等内容。至此,Kafka Compact Topic使用与调优轻松拿捏!