Kafka消息存储原理

2021-03-23 11:23:11 浏览数 (1)

Kafka消息存储格式

存储位置及存储文件划分

文件存储概述

  Kafka作为一个高性能的消息队列中间件,有着高效的消息存储方式。我们知道在Kafka中,消息是以topic的形式进行逻辑上的隔离,一个topic又可以分为多个分区,当我们发送消息的时候,会根据某种规则(可以是默认规则,也可以是自定义规则),把消息存储到某个分区当中,同时消息会被分配一个序列号,也就是我们常说的offset,这个offset是一个不断递增的数值。

  一个topic对应多个分区,一个分区对应一个日志目录,一个日志目录里面,又分为多个日志片段,日志片段存储的就是我们的消息内容,我们叫日志片段为LogSegment。那这里就有个问题了,为什么日志还要分为LogSegment呢,首先这么区分是为了方便清理数据,对于过期的数据清理,这样划分为一个个片段,比在一个大文件中去寻找过期的数据方便多了。其次还方便管理,比如我要查找消息,从片段中查找比一个大文件里查找容易多了。LogSegment并不是一个文件,而是指多个,在kafka中,每个LogSegment对应一个日志文件和两个索引文件,以及可能存在的其他文件,比如.txnindex后缀的事务日志索引文件。下面的图片描述了kafka的文件存储的构成:

举个例子说明一个,比如我们通过命令行创建了一个topic,名字叫做topic-log-format,这个topic有两个分区,那么就会在消息存储文件目录中,有两个文件夹,分别叫做topic-log-format-0和topic-log-format-1,命名规则其实就是${topic}-${partition},这两个文件夹存储的就是两个分区的消息,如果我们往topic生产了消息,那么这消息文件目录里就会有segment产生,这个segment包含三个文件,分别为日志文件,偏移量索引文件和时间索引文件。

当我们使用生产者不断完topic里面写数据的时候,消息数据就会不断往这几个文件里面写数据,这里的写操作是一个顺序写。segment文件是可能会有多个的,举个例子,如果当前segment的大小大于我们配置的最大大小,就会产生一个新的segment(当然产生新的segemnt不仅仅这一种情况),消息只会往最新一个segment的文件末尾写数据,这个segment我们叫做activeSegment(当前活跃分片),至于之前的segment就变为了只读的文件了。

  每个segment中,.log后缀表示的是日志文件,为了便于检索日志,会有两个配套的索引文件,分别为偏移量索引文件(.inde后缀)和时间戳索引文件(.timeindex后缀),这三个文件的文件名都是一样的,各位为基准偏移量 文件后缀,这个基准偏移量是一个64位的长整形,为什么叫做基准偏移量呢,因为文件里面会有相对偏移量,这个我们后面详细说明。

如果到这里对存储文件的划分还是不清楚也没关系,后面实际操作讲解中,看一遍就知道是怎么回事了。

消息文件存储示例展示

1.下载kafka,本文下载的是kafka_2.11-1.1.1,然后放置在/opt/目录。接着执行以下命令:

代码语言:javascript复制
[root@VM-232-122-centos /opt]# cd kafka_2.11-1.1.1/config/

// 修改启动配置
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/config]# vi server.properties 
// 修改log.segment.bytes,这个参数表上segment的大小,默认1G,我们这里为了观察,修改为1M
// 修改log.dirs,该参数表示日志文件存储路径,我们这里修改为/tmp/kafka-logs,这里可以配置多个根目录,如果配置多个的情况下,broker会选择分区数最小的根目录创建topic的日志存储文件。

// 启动kafka
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-server-start.sh ../config/server.properties &

// 创建topic
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-topics.sh --zookeeper localhost:2181 --create --topic lhm-log-format-test --partitions 2  --replication-factor 1

// 往topic写消息
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# ./kafka-console-producer.sh --broker-list localhost:9093 --topic lhm-log-format-test
>a
>b
>c
>d
>...// 尝试写入更多的消息

// 查看日志文件
[root@VM-232-122-centos /opt/kafka_2.11-1.1.1/bin]# cd /tmp/kafka-logs-1/

// 可以看到我们创建的topic下面有两个对应的分区的日志目录
[root@VM-232-122-centos /tmp/kafka-logs-1]# ls
lhm-log-format-test-1
lhm-log-format-test-0 

// 进入lhm-log-format-test-1就可以看到我们的segment文件,随着后面日志的文件增多,
// 会出现基础偏移量更大的segment文件
[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

消息文件存储格式

  注意这里我们讲解的是最新版的消息日志格式,老版本的消息格式这里我们先不关注。kafka_2.x版本使用的都是这种消息类型。消息的存储是以消息集为单位的,称为record batch,每个record batch含有一条或多条消息,这里的消息称为record,record batch和record都有自己的header。

  我们先来看看RecordBatch的数据结构,需要注意的是,即使开启消息压缩,header部分是不会被压缩的(baseOffset到baseSequence,被压缩部分只有records),生产者客户端中的ProducerBatch对应这里的RecordBatch,ProducerRecord对应这里的Record:

代码语言:javascript复制
		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]

这里看看RecordBatch的字段含义:

  • baseOffset:当前RecordBatch起始位移
  • batchLength:partitionLeaderEpoch开始的整个RecordBatch长度
  • firstTimestamp:第一条record的时间戳
  • producerId,producerEpoch,baseSequence:用于支持幂等和事务的字段

再来看看Record的格式:

代码语言:javascript复制
        length: varint
		attributes: int8
			bit 0~7: unused
		timestampDelta: varint
		offsetDelta: varint
		keyLength: varint
		key: byte[]
		valueLen: varint
		value: byte[]
		Headers => [Header]

再看看Record里面的Header:

代码语言:javascript复制
        headerKeyLength: varint
		headerKey: String
		headerValueLength: varint
		Value: byte[]

这里可以看到有大量的字段使用到了varint类型,这是一种可变长整型,这种类型就是Protocol Buffers使用的可变长整型,可以参考https://developers.google.com/protocol-buffers/docs/encoding#varints,或者查阅本系列后续文章。这里主要讲讲Record这个类的的字段:

  • length:消息总长度
  • attributes:保留字段,保留一个字节以备后续使用
  • timestampDelta:增量时间戳,这里的增量是和Record Batch的firstTimestamp相比的增量,只保存增量的目的是为了减少存储空间
  • offsetDelta:增量位移,这里的增量是和Record Batch的baseOffset相比的增量,为了节省占用空间
  • Headers:这个字段用于支持应用级别的扩展,一个Record可以包含0到多个Header

索引文件存储格式

偏移量索引

在.index为后缀的偏移量索引文件中,一个偏移量索引项占8个字节,偏移量索引的格式为:4字节相对偏移量(relativeOffset) 4字节消息在日志文件中的物理位置(position)。我们如何根据目标偏移targetOffset查找消息内容呢?通用的寻找办法是,先找到baseOffset不大于我们要查找的targetOffset的日志分片,这里kafka是通过一个跳跃表的数据结构查询的,kafka会在内部使用concurrentSkipListMap缓存了所有日志分片的数据,key为文件名(baseOffset)value为分片数据,这样查找的时候就可以快速找到需要的分片。然后通过targetOffset - baseOffset得到目标相对偏移量targetRelativeOffset,然后在偏移量索引文件中,使用二分查询,快速找到不大于targetRelativeOffset的最大索引项,然后就能得到一个position,根据position顺序查找日志分段文件,就能寻找到消息了。

举个例子,在我们上面演示的例子中,00000000000000000000.index,我们查看下内容:

代码语言:javascript复制
[root@VM-232-122-centos /tmp/kafka-logs-1/lhm-log-format-test-1]# hexdump -C 00000000000000000000.index 
00000000  00 00 00 81 00 00 10 32  00 00 00 00 00 00 00 00  |.......2........|

可以看到前4个字节是relativeOffset,转换为十进制为129,后面四个字节为position,转回为十进制为4146,那么假如我们需要查找offset为140的消息如何查找呢?首先找到不大于130的最大的baseOffset的日志分片,假如我们除了00000000000000000000.index的segment,还有00000000000000100000.index,那么不大于130的最大baseOffset分片就是00000000000000000000.index,然后计算目标相对偏移量:140 - 129 = 11,那么我们就可以从00000000000000000000.log定位到4146的position,然后顺序查找11条消息即可得到目标消息了。

时间戳索引

在.timeindex为后缀的时间戳索引文件中,一个时间戳索引项占用12个字节,格式为:8字节时间戳(timestamp) 4字节时间戳对应的消息的相对偏移量(relativeOffset)。消息发送到服务端的时候,可以指定时间戳,也可以使用服务端的时间戳,这个时间戳就会记录到时间戳索引当中,所以时间戳索引里面的索引项的时间戳是不断增大的。时间戳索引并没有像偏移量索引那样缓存在kafka的内存,所以需要遍历时间戳索引,时间戳索引定位消息的步骤如下:

1.遍历所有时间戳索引,查询时间戳索引文件最后的时间戳索引项,和目标时间戳对比,找到第一个大于目标时间戳的索引,得到relativeOffset

2.根据relativeOffset,去偏移量索引查找消息即可,步骤参考偏移量索引

根据时间戳索引查找消息的主要代码逻辑如下:

代码语言:javascript复制
def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
  val targetSeg = {
    val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  }

  targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))

}

def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
  // Get the index entry with a timestamp less than or equal to the target timestamp
  val timestampOffset = timeIndex.lookup(timestamp)
  val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position

  // Search the timestamp
  Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
    TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
  }
}

0 人点赞