目录
- 1. kafka存储结构
- 1.1 topic
- 1.2 partition
- 1.3 segment
- 1.4 message结构
- 1.5 查找 message过程
- 2 Kafka存储原理
- 2.1 顺序写
- 2.2 零拷贝
- 2.3 缓存机制
- 3 Kafka维护消费偏移
- 3.1 消费偏移的更新方式
- 3.1.1 自动提交(默认方式)
- 3.1.2 手动提交
- 3.1 消费偏移的更新方式
- 4 日志的清除策略以及压缩策略
- 4.1 日志删除
- 4.2 日志压缩策略
1. kafka存储结构
kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是_。 比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic_0~3 多个分区在集群中多个broker上的分配方法
- 将所有 N Broker 和待分配的 i 个 Partition 排序
- 将第 i 个 Partition 分配到第(i mod n)个 Broker 上
1.1 topic topic可以对应多个partition,而topic只是逻辑概念,不涉及到存储,partition才是物理概念,同一topic的不同partition可能分布在不同机器上1.2 partition partition是一个文件夹,其中包含多个segment,如果其中有n个segment,则共有2*n个文件,每个partition是一个有序的队列,partition中的每条消息都会分配一个有序的id,即offset1.3 segment 由一对文件组成,一个索引文件,一个数据文件
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
每个LogSegment 的大小可以在server.properties 中log.segment.bytes=1073741824 (设置分段大小,默认是1gb)选项进行设置。
文件命名规则,上一个segment在partition中的最大offset数值,即,比如00000000000000345678.log文件中的第一条消息的offset为345679 最大为64位的long,不足位数补0,如00000000000000345678.log 索引文件后缀.index,日志数据文件后缀.log
kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。
日志文件的树型结构如下图所示
segment索引文件
- 存储一系列的元数据,每条元数据就是一条索引
- 元数据条数小于消息条数,是稀疏索引
- 元数据(即索引)构成
- 索引所指向的message在数据日志文件中的相对序号,即相对的offset,从1开始,该相对offset加上文件名当中的值就是该message在整个partition中的绝对offset了
- 索引所指向的message在数据日志文件中的位置,即文件游标,从0开始,方便直接指定游标打开数据文件
1.4 message结构
- offset 偏移量,消息的唯一标识,通过offset能找到唯一消息,类型long 8bytes
- MessageSize 消息长度,类型int32 4bytes
- crc32校验码, 4bytes,校验message
- magic, 表示本次发布kafka服务程序协议版本号 1byte
- attributes 独立版本,标识压缩类型,编码类型 1byte
- key length 4bytes 当key length=-1时,key字段可不写
- key 可选
- value byte payload 实际消息内容
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex 和 TimeIndex,分别对应.index以及.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
索引文件和日志文件内容关系如下:
message特点
- message是无状态的,即不会标识是否已被消费过
- message不能同时被多个consumer来消费,可以等前一个消费完成,下一个继续消费。
- consumer可以同时消费多条message
1.5 查找 message过程
- 根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件
- 找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量position。(kafka 采用稀疏索引的方式来提高查找性能)
- 得到 position 以后,再到对应的 log 文件中,从 position处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490,最后查找到对应的消息以后返回。
注意:
- 可以利用offset在partition中查找,不能在整个topic中查找的,因为offset只保证在partition中唯一,有序
2 Kafka存储原理
2.1 顺序写
Kafka利用分段、追加日志的方式,在很大程度上将读写限制为顺序I/O(sequential I/O),这在大多数的存储介质上都很快。
人们普遍错误地认为硬盘很慢。然而,存储介质的性能,很大程度上依赖于数据被访问的模式。同样在一块普通的7200 RPM SATA硬盘上,随机I/O(random I/O)与顺序I/O相比,随机I/O的性能要比顺序I/O慢3到4个数量级。
此外,现代的操作系统提供了预先读和延迟写的技术,这些技术可以以块为单位,预先读取大量数据,并将较小的逻辑写操作合并成较大的物理写操作。
因此,顺序I/O和随机I/O之间的性能差异在闪存和其他固态非易失性介质中仍然很明显,不过它们在旋转存储,比如固态硬盘中的性能差异就没有那么明显。
页缓存 页缓存是操作系统用来作为磁盘的一种缓存,减少磁盘的I/O操作。
在写入磁盘的时候其实是写入页缓存中,使得对磁盘的写入变成对内存的写入。写入的页变成脏页,然后操作系统会在合适的时候将脏页写入磁盘中。
在读取的时候如果页缓存命中则直接返回,如果页缓存 miss 则产生缺页中断,从磁盘加载数据至页缓存中,然后返回数据。
并且在读的时候会预读,根据局部性原理当读取的时候会把相邻的磁盘块读入页缓存中。在写入的时候会后写,写入的也是页缓存,这样存着可以将一些小的写入操作合并成大的写入,然后再刷盘。
而且根据磁盘的构造,顺序 I/O 的时候,磁头几乎不用换道,或者换道的时间很短。
缺点: 这样的写入存在数据丢失的风险,例如机器突然断电,那些还未刷盘的脏页就丢失了。不过可以调用 fsync 强制刷盘,但是这样对于性能的损耗较大。
因此一般建议通过多副本机制来保证消息的可靠,而不是同步刷盘。
2.2 零拷贝
即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。
在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
操作系统将数据从磁盘读入到内核空间的页缓存 ▪ 应用程序将数据从内核空间读入到用户空间缓存中 ▪ 应用程序将数据写回到内核空间到 socket 缓存中 ▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。
但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。
通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;
在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
2.3 缓存机制
Kafka实际上是一个由磁盘支持的内存队列(受缓冲区/页面缓存大小的限制)。
Kafka在确认写操作之前并没有调用fsync,ACK的唯一要求是记录已经写入I/O缓冲区。
但是,这种形式的写入是不安全的,因为副本的出错可能导致数据丢失,即使记录似乎已经被ACK。换句话说,与关系型数据库不同,仅写入缓冲区并不意味着持久性。保证Kafka持久性的是运行几个同步的副本。即使其中一个出错了,其他的(假设不止一个)将继续运行——假设出错的原因不会导致其他的副本也出错。因此,无fsync的非阻塞I/O方法和冗余的同步副本组合为Kafka提供了高吞吐、持久性和可用性。
3 Kafka维护消费偏移
如果是用kafka默认的api【org.apache.kafka.clients.consumer.KafkaConsumer】来消费,其消费者的offset会更新到一个kafka自带的topic下:consumer_offsets,默认有50个分区
计算指定group在consumer_offsets的哪个分区:Math.abs(groupID.hashCode()) % numPartitions
3.1 消费偏移的更新方式
无论是kafka默认api,还是java的api,offset的更新方式都有两种:自动提交和手动提交
3.1.1 自动提交(默认方式)
Kafka中偏移量的自动提交是由参数enable_auto_commit
和auto_commit_interval_ms
控制的,当enable_auto_commit=True
时,Kafka在消费的过程中会以频率为auto_commit_interval_ms
向Kafka自带的topic(__consumer_offsets
)进行偏移量提交,具体提交到哪个Partation:Math.abs(groupID.hashCode()) % numPartitions。
这种方式也被称为at most once,fetch到消息后就可以更新offset,无论是否消费成功。
3.1.2 手动提交
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:
- 同步提交
- 异步提交
- 异步 同步 组合的方式提交
- 同步提交 提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
- 异步提交 异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在broker做出响应的时候记录错误信息。 对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。
- 异步 同步 针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。 通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失
4 日志的清除策略以及压缩策略
4.1 日志删除
在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms
来配置,默认值为300000ms
,即5分钟。
当前日志分段的保留策略有2种:
- 基于时间的保留策略
- 基于日志大小的保留策略。
4.1.1 基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments)
retentionMs可以通过broker端参数log.retention.hours
、log.retention.minutes和log.retention.ms
来配置,其中 log.retention.ms
的优先级最高,log.retention.minutes
次之,log.retention.hours
最低。
默认情况下只配置了log.retention.hours
参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间lastModifiedTime
来计算的,而是根据日志分段中最大的时间戳largestTimeStamp
来计算的。因为日志分段的lastModifiedTime
可以被有意或无意地修改,比如执行了touch
操作,或者分区副本进行了重新分配,lastModifiedTime
并不能真实地反映出日志分段在磁盘的保留时间。
要获取日志分段中的最大时间戳 largestTimeStamp
的值
首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取其值,否则才设置为最近修改时间lastModifiedTime
。
4.1.2 基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。retentionSize可以通过broker端参数log.retention.bytes
来配置,默认值为-1,表示无穷大。注意log.retention.bytes
配置的是Log中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log日志文件)的大小。单个日志分段的大小由broker 端参数 log.segment.bytes
来限制,默认值为1073741824,即1GB。
基于日志大小的保留策略与基于时间的保留策略类似,首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。
4.2 日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
注意: 当启用压缩时,对批处理的影响特别明显,因为随着数据大小的增加,压缩通常会变得更有效。特别是在使用基于文本的格式时,比如JSON,压缩的效果会非常明显,压缩比通常在5x到7x之间。
此外,记录的批处理主要作为一个客户端操作,负载在传递的过程中,不仅对网络带宽有积极影响,而且对服务端的磁盘I/O利用率也有积极影响。