Kafka消息存储格式
存储位置及存储文件划分
文件存储概述
Kafka作为一个高性能的消息队列中间件,有着高效的消息存储方式。我们知道在Kafka中,消息是以topic的形式进行逻辑上的隔离,一个topic又可以分为多个分区,当我们发送消息的时候,会根据某种规则(可以是默认规则,也可以是自定义规则),把消息存储到某个分区当中,同时消息会被分配一个序列号,也就是我们常说的offset,这个offset是一个不断递增的数值。
一个topic对应多个分区,一个分区对应一个日志目录,一个日志目录里面,又分为多个日志片段,日志片段存储的就是我们的消息内容,我们叫日志片段为LogSegment。那这里就有个问题了,为什么日志还要分为LogSegment呢,首先这么区分是为了方便清理数据,对于过期的数据清理,这样划分为一个个片段,比在一个大文件中去寻找过期的数据方便多了。其次还方便管理,比如我要查找消息,从片段中查找比一个大文件里查找容易多了。LogSegment并不是一个文件,而是指多个,在kafka中,每个LogSegment对应一个日志文件和两个索引文件,以及可能存在的其他文件,比如.txnindex后缀的事务日志索引文件。下面的图片描述了kafka的文件存储的构成:
举个例子说明一个,比如我们通过命令行创建了一个topic,名字叫做topic-log-format,这个topic有两个分区,那么就会在消息存储文件目录中,有两个文件夹,分别叫做topic-log-format-0和topic-log-format-1,命名规则其实就是${topic}-${partition},这两个文件夹存储的就是两个分区的消息,如果我们往topic生产了消息,那么这消息文件目录里就会有segment产生,这个segment包含三个文件,分别为日志文件,偏移量索引文件和时间索引文件。
当我们使用生产者不断完topic里面写数据的时候,消息数据就会不断往这几个文件里面写数据,这里的写操作是一个顺序写。segment文件是可能会有多个的,举个例子,如果当前segment的大小大于我们配置的最大大小,就会产生一个新的segment(当然产生新的segemnt不仅仅这一种情况),消息只会往最新一个segment的文件末尾写数据,这个segment我们叫做activeSegment(当前活跃分片),至于之前的segment就变为了只读的文件了。
每个segment中,.log后缀表示的是日志文件,为了便于检索日志,会有两个配套的索引文件,分别为偏移量索引文件(.inde后缀)和时间戳索引文件(.timeindex后缀),这三个文件的文件名都是一样的,各位为基准偏移量 文件后缀,这个基准偏移量是一个64位的长整形,为什么叫做基准偏移量呢,因为文件里面会有相对偏移量,这个我们后面详细说明。
如果到这里对存储文件的划分还是不清楚也没关系,后面实际操作讲解中,看一遍就知道是怎么回事了。
消息文件存储示例展示
1.下载kafka,本文下载的是kafka_2.11-1.1.1,然后放置在/opt/目录。接着执行以下命令:
代码语言:javascript复制[root@VM-232-122-centos /opt]# cd kafka_2.11-1.1.1/config/
// 修改启动配置
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/config]# vi server.properties
// 修改log.segment.bytes,这个参数表上segment的大小,默认1G,我们这里为了观察,修改为1M
// 修改log.dirs,该参数表示日志文件存储路径,我们这里修改为/tmp/kafka-logs,这里可以配置多个根目录,如果配置多个的情况下,broker会选择分区数最小的根目录创建topic的日志存储文件。
// 启动kafka
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-server-start.sh ../config/server.properties &
// 创建topic
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --topic lhm-log-format-test --partitions 2 --replication-factor 1
// 往topic写消息
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic lhm-log-format-test
>a
>b
>c
>d
>...// 尝试写入更多的消息
// 查看日志文件
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# cd /tmp/kafka-logs-1/
// 可以看到我们创建的topic下面有两个对应的分区的日志目录
[root@VM-232-122-centos /tmp/kafka-logs-1]# ls
lhm-log-format-test-1
lhm-log-format-test-0
// 进入lhm-log-format-test-1就可以看到我们的segment文件,随着后面日志的文件增多,
// 会出现基础偏移量更大的segment文件
[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# ls
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint
消息文件存储格式
注意这里我们讲解的是最新版的消息日志格式,老版本的消息格式这里我们先不关注。kafka_2.x版本使用的都是这种消息类型。消息的存储是以消息集为单位的,称为record batch,每个record batch含有一条或多条消息,这里的消息称为record,record batch和record都有自己的header。
我们先来看看RecordBatch的数据结构,需要注意的是,即使开启消息压缩,header部分是不会被压缩的(baseOffset到baseSequence,被压缩部分只有records),生产者客户端中的ProducerBatch对应这里的RecordBatch,ProducerRecord对应这里的Record:
代码语言:javascript复制 baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
这里看看RecordBatch的字段含义:
- baseOffset:当前RecordBatch起始位移
- batchLength:partitionLeaderEpoch开始的整个RecordBatch长度
- firstTimestamp:第一条record的时间戳
- producerId,producerEpoch,baseSequence:用于支持幂等和事务的字段
再来看看Record的格式:
代码语言:javascript复制 length: varint
attributes: int8
bit 0~7: unused
timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
再看看Record里面的Header:
代码语言:javascript复制 headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
这里可以看到有大量的字段使用到了varint类型,这是一种可变长整型,这种类型就是Protocol Buffers使用的可变长整型,可以参考https://developers.google.com/protocol-buffers/docs/encoding#varints,或者查阅本系列后续文章。这里主要讲讲Record这个类的的字段:
- length:消息总长度
- attributes:保留字段,保留一个字节以备后续使用
- timestampDelta:增量时间戳,这里的增量是和Record Batch的firstTimestamp相比的增量,只保存增量的目的是为了减少存储空间
- offsetDelta:增量位移,这里的增量是和Record Batch的baseOffset相比的增量,为了节省占用空间
- Headers:这个字段用于支持应用级别的扩展,一个Record可以包含0到多个Header
索引文件存储格式
偏移量索引
在.index为后缀的偏移量索引文件中,一个偏移量索引项占8个字节,偏移量索引的格式为:4字节相对偏移量(relativeOffset) 4字节消息在日志文件中的物理位置(position)。我们如何根据目标偏移targetOffset查找消息内容呢?通用的寻找办法是,先找到baseOffset不大于我们要查找的targetOffset的日志分片,这里kafka是通过一个跳跃表的数据结构查询的,kafka会在内部使用concurrentSkipListMap缓存了所有日志分片的数据,key为文件名(baseOffset)value为分片数据,这样查找的时候就可以快速找到需要的分片。然后通过targetOffset - baseOffset得到目标相对偏移量targetRelativeOffset,然后在偏移量索引文件中,使用二分查询,快速找到不大于targetRelativeOffset的最大索引项,然后就能得到一个position,根据position顺序查找日志分段文件,就能寻找到消息了。
举个例子,在我们上面演示的例子中,00000000000000000000.index,我们查看下内容:
代码语言:javascript复制[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# hexdump -C 00000000000000000000.index
00000000 00 00 00 81 00 00 10 32 00 00 00 00 00 00 00 00 |.......2........|
可以看到前4个字节是relativeOffset,转换为十进制为129,后面四个字节为position,转回为十进制为4146,那么假如我们需要查找offset为140的消息如何查找呢?首先找到不大于130的最大的baseOffset的日志分片,假如我们除了00000000000000000000.index的segment,还有00000000000000100000.index,那么不大于130的最大baseOffset分片就是00000000000000000000.index,然后计算目标相对偏移量:140 - 129 = 11,那么我们就可以从00000000000000000000.log定位到4146的position,然后顺序查找11条消息即可得到目标消息了。
时间戳索引
在.timeindex为后缀的时间戳索引文件中,一个时间戳索引项占用12个字节,格式为:8字节时间戳(timestamp) 4字节时间戳对应的消息的相对偏移量(relativeOffset)。消息发送到服务端的时候,可以指定时间戳,也可以使用服务端的时间戳,这个时间戳就会记录到时间戳索引当中,所以时间戳索引里面的索引项的时间戳是不断增大的。时间戳索引并没有像偏移量索引那样缓存在kafka的内存,所以需要遍历时间戳索引,时间戳索引定位消息的步骤如下:
1.遍历所有时间戳索引,查询时间戳索引文件最后的时间戳索引项,和目标时间戳对比,找到第一个大于目标时间戳的索引,得到relativeOffset
2.根据relativeOffset,去偏移量索引查找消息即可,步骤参考偏移量索引
根据时间戳索引查找消息的主要代码逻辑如下:
代码语言:javascript复制def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
val targetSeg = {
val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
}
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
}
def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
// Get the index entry with a timestamp less than or equal to the target timestamp
val timestampOffset = timeIndex.lookup(timestamp)
val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
// Search the timestamp
Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
}
}