使用kafka可以对系统解耦、流量削峰、缓冲,可以实现系统间的异步通信等。在活动追踪、消息传递、度量指标、日志记录和流式处理等场景中非常适合使用kafka。这篇文章主要介绍下kafka中的基本概念。
1. kafka的整体结构
下图展示了很多关于kafka的细节,暂时先不用关注:
图中展示出了kafka的一些重要组件,接下来逐个介绍一下。
1.1 Broker
服务代理节点。其实就是一个kafka实例或服务节点,多个broker构成了kafka cluster。
1.2 Producer
生产者。也就是写入消息的一方,将消息写入broker中。
1.3 Consumer
消费者。也就是读取消息的一方,从broker中读取消息。
1.4 Consumer Group
消费组。一个或多个消费者构成一个消费组,不同的消费组可以订阅同一个主题的消息且互不影响。
1.5 ZooKeeper
kafka使用zookeeper来管理集群的元数据,以及控制器的选举等操作。
1.6 Topic
主题。每一个消息都属于某个主题,kafka通过主题来划分消息,是一个逻辑上的分类。
1.7 Partition
分区。同一个主题下的消息还可以继续分成多个分区,一个分区只属于一个主题。
1.8 Replica
副本。一个分区可以有多个副本来提高容灾性。
1.9 Leader and Follower
分区有了多个副本,那么就需要有同步方式。kafka使用一主多从进行消息同步,主副本提供读写的能力,而从副本不提供读写,仅仅作为主副本的备份。
1.10 Offset
偏移。分区中的每一条消息都有一个所在分区的偏移量,这个偏移量唯一标识了该消息在当前这个分区的位置,并保证了在这个分区的顺序性,不过不保证跨分区的顺序性。
简单来说,作为消息系统的kafka本质上还是一个数据系统。既然是一个数据系统,那么就要解决两个根本问题:
- 当我们把数据交给kafka的时候,kafka怎么存储;
- 当我们向kafka要回数据的时候,kafka怎么返回。
2. 消息如何存储(逻辑层面)
目前大多数数据系统将数据存储在磁盘的格式有追加日志型以及B 树型。而kafka采用了追加日志的格式将数据存储在磁盘上,整体的结构如下图:
追加日志的格式可以带来写性能的提升(毕竟只需要往日志文件后面追加就可以了),但是同时对读的支持不是很友好。为了提升读性能,kafka需要额外的操作。
关于kafka的数据是如何存储的是一个比较大的问题,这里先从逻辑层面开始。
2.1 Topic Partition的两层结构
kafka对消息进行了两个层级的分类,分别是topic主题和partition分区。
将一个主题划分成多个分区的好处是显而易见的。多个分区可以为kafka提供可伸缩性、水平扩展的能力,同时对分区进行冗余还可以提高数据可靠性。
不同的分区还可以部署在不同的broker上,加上冗余副本就提高了可靠性。
2.2 Offset
对于追加日志格式,新来的数据只需要往文件末尾追加即可。
对于有多个分区的主题来说,每一个消息都有对应需要追加到的分区(分区器),这个消息在所在的分区中都有一个唯一标识,就是offset偏移量:
这样的结构具有如下的特点:
- 分区提高了写性能,和数据可靠性;
- 消息在分区内保证顺序性,但跨分区不保证。
逻辑层面上知道了kafka是如何存储消息之后,再来看看作为使用者,如何写入以及读取数据。
3. 如何写入数据
接下来从使用者的角度来看看,如何将数据写入kafka。
3.1 整体流程
生产者将消息写入kafka的整体流程如下图:
在生产端主要有两个线程:main和sender,两者通过共享内存RecordAccumulator通信。
各步骤如下:
- KafkaProducer创建消息;
- 生产者拦截器在消息发送之前做一些准备工作,比如过滤不符合要求的消息、修改消息的内容等;
- 序列化器将消息转换成字节数组的形式;
- 分区器计算该消息的目标分区,然后数据会存储在RecordAccumulator中;
- 发送线程获取数据进行发送;
- 创建具体的请求;
- 如果请求过多,会将部分请求缓存起来;
- 将准备好的请求进行发送;
- 发送到kafka集群;
- 接收响应;
- 清理数据。
在消息累加器RecordAccumulator中来进行缓存,缓存大小通过参数buffer.memory
配置,默认32MB。累加器根据分区来管理每一个消息,其中消息又被组织成ProducerBatch的形式(通过batch.size
控制大小,默认1MB),为了提高吞吐量降低网络请求次数,ProducerBatch中可能包含一个或多个消息。
当消息不多时一个Batch可能没有填满,但不会等待太长时间,可以通过linger.ms
控制等待时间,默认0。增大这个值可以提高吞吐量,但是会增加延迟。
当生产消息的速度过快导致缓存满了的时候,继续发送消息可能会有阻塞或异常,通过参数max.block.ms
控制,默认60秒。
数据到达发送线程创建好请求之后,需要对其进行重新组合,根据需要发送到的broker节点分组,每个节点就是一个连接,每个连接可以缓存的请求数通过max.in.flight.requests.per.connection
控制,默认5。每个请求的大小通过max.reqeust.size
控制,默认1MB。
3.2 发送方式
消息的发送有三种方式:
- 发后即忘(fire and forget):只管发送不管结果,性能最高,可靠性也最差;
- 同步(sync):等集群确认消息写入成功再返回,可靠性最高,性能差很多;
- 异步(async):指定一个callback,kafka返回响应后调用来实现异步发送的确认。
其中前两个是同步发送,后一个是异步发送。不过这里的异步发送没有提供callback的能力。
那么生产者发送消息之后kafka怎么才算确认呢?这涉及到acks
参数:
acks = 1
, 默认值1,表示只要分区的leader副本成功写入就算成功;acks=0
,生产者不需要等待任何服务端的响应,可能会丢失数据;acks=-1
或acks=all
,需要全部处于同步状态的副本确认写入成功,可靠性最强,性能也差。
3.3 生产者重要参数
4. 如何读取消息
4.1 消费消息
4.1.1 消费模式
消息的消费一般来说有两种模式:推模式和拉模式,而kafka中的消费是基于拉模式的。消费者通过不断地调用poll
来获取消息进行消费,基本模式如下(伪代码):
while(true) {
records := consumer.Pull()
for record := range records {
// do something with record
}
}
4.1.2 位移提交
kafka中的消息都有一个offset唯一标识,对于消费者来说,每消费完一个消息需要通知kafka,这样下次拉取消息的时候才不会拉到已消费的数据(不考虑重复消费的情况)。这个消费者已消费的消息位置就是消费位移,比如:
假设9527当前拉取到消息的最大偏移量且已经消费完,那么这个消费者的消费位移就是9527,而要提交的消费位移是9528,表示下一条需要拉取的消息的位置。
消费者一次可能拉取到多条消息,那么就会有一个提交的方式问题。kafka默认使用的是自动提交,即五秒自动将拉到的每个分区中最大的消息位移(相关参数是enable.auto.commit
和auto.commit.interval.ms
)。不过这可能导致重复消费以及数据丢失的问题。
先看重复消费:
上一次提交的消费位移是9527,说明9526及之前的消息都已经被消费了;当前这次pull拉取到的消息是9527、0528和9529,因此,这次消费成功后要提交的唯一就是9530;消费者当前正在处理消息9528,如果此时消费者挂掉,如果此时还没有提交9530,那么9527到9529之间的消息都会被分配到下一个消费者,导致消息9527重复处理。
下面看一下消息丢失。还是上面的图,如果消费者刚拉取到9527到9529这三个消息,刚好自动提交了9530,而此时消费者挂了,那么还没有处理就提交了,导致这三条消息丢失。
4.2 分区分配策略
消息在kafka是的存储是分多个分区的,那么消费者消息分区的消息也就有一个分区分配策略。拿最开始的图来说就是下面consumer group这部分:
一共有三个分区,消费组1有四个消费组,所以有一个处于空闲状态;消费组2有两个消费组,所以有一个消费组需要处理两个分区。
kafka消费者的分区分配策略通过参数partition.assigment.strategy
来配置,有如下几种:
- Range:按照消费者的总数和分区总数进行整除运算来分配,不过是按照主题粒度的,所以可能会不均匀。比如:
- RoundRobin:将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式这个将分区一次分配给每个消费者。比如:
- Sticky:这个策略比较复杂,目的是分区的分配尽可能均匀,以及分配要尽可能和上次保持一致。
4.3 再均衡
消费者之间的协调是通过消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的。其中一项就是消费者的再均衡。
下面几种情况会导致消费者再均衡的发生:
- 有新的消费者加入;
- 有消费者下线;
- 有消费者主动退出;
- 消费组对应的组协调器节点发生变化;
- 订阅的主题或分区发生数量变化。
再均衡会经过下面几个步骤:
- FindCoordinator:消费者查找组协调器所在的机器,然后建立连接;
- JoinGroup:消费者向组协调器发起加入组的请求;
- SyncGroup:组协调器将分区分配方案同步给所有的消费者;
- Heartbeat:消费者进入正常状态,开始心跳。
5. 如何存储消息(物理层面)
在前面介绍了逻辑层面kafka是如何存储数据的,接下来在物理层面继续。还是这张图:
5.1 日志文件
kafka使用日志追加的方式来存储数据,新来的数据只要往日志文件的末尾追加即可,这样的方式提高了写的性能。
但是文件也不能一直追加吧,因此,kafka中的log文件对应着多个日志分段LogSegment。
采用分段的方式方便对其进行清理。而kafka有两种日志清理策略:
- 日志删除(Log Retention):按照一定策略直接删除日志分段;
- 日志压缩(Log Compaction):对每个消息的key进行整合,只保留同一个key下最新的value。
5.1.1 日志删除
日志删除策略有过期时间和日志大小。默认保留时间是7天,默认大小是1GB。
虽然默认保留时间是7天,但是也有可能保留时间更长。因为当前活跃的日志分段是不会删除的,如果数据量很少,当前活跃日志分段一直没能继续拆分,那么就不会删除。
kafka会有一个任务周期性地执行,对满足删除条件的日志进行删除。
5.1.2 日志压缩
日志压缩针对的是key,具有相同key的多个value值只保留最近的一个。
同时,日志压缩会产生小文件,为了避免小文件过多,kafka在清理的时候还会对其进行合并:
5.2 日志索引
日志追加提高了写的性能,但是对于读就不是很友好了。为了提高读的性能,就需要降低一点写的性能,在读写之间做一点平衡。也就是在写的时候维护一个索引。
kafka维护了两种索引:偏移量索引和时间戳索引。
5.2.1 偏移量索引
为了能够快速定位给定消息在日志文件中的位置,一个简单的办法就是维护一个映射,key就是消息的偏移量,value就是在日志文件中的偏移量,这样只需要一次文件读取就可以找到对应的消息了。
不过当消息量巨大的时候这个映射也会变很大,kafka维护的是一个稀疏索引(sparse index),即不是所有的消息都有一个对应的位置,对于没有位置映射的消息来说,一个二分查找就可以解决了。
下图就是偏移量索引的原理:
比如要找offset是37的消息所在的位置,先看索引中没有对应的记录,就找不大于37的最大offset是31,然后在日志中从1050开始按序查找37的消息。
5.2.2 时间戳索引
时间戳索引就是可以根据时间戳找到对应的偏移量。时间戳索引是一个二级索引,现根据时间戳找到偏移量,然后就可以使用偏移量索引找到对应的消息位置了。原理如下图:
5.3 零拷贝
kafka将数据存储在磁盘上,同时使用日志追加的方式来提升性能。为了进一步提升性能,kafka使用了零拷贝的技术。
零拷贝简单来说就是在内核态直接将文件内容复制到网卡设备上,减少了内核态与用户态之间的切换。
非零拷贝:
零拷贝:
6. kafka的可靠性
kafka通过多副本的方式实现水平扩展,提高容灾性以及可靠性等。这里看看kafka的多副本机制。
6.1 一些概念
下图展示了副本同步的一些重要概念(单个分区视角):
6.1.1 AR: Assigned Replicas
所有的副本统称为AR。
6.1.2 ISR: In-Sync Replicas
ISR是AR的一个子集,即所有和主副本保持同步的副本集合
6.1.3 OSR: Out-of-Sync Replicas
OSR也是AR的一个子集,所有和主副本未保持一致的副本集合。所以AR=ISR OSR。
kafka通过一些算法来判定从副本是否保持同步,处于失效的副本也可以通过追上主副本来重新进入ISR。
6.1.4 LEO: Log End Offset
LEO是下一个消息将要写入的offset偏移,在LEO之前的消息都已经写入日志了,每一个副本都有一个自己的LEO。
6.1.5 HW: High Watermark
所有和主副本保持同步的副本中,最小的那个LEO就是HW,这个offset意味着在这之前的消息都已经被所有的ISR写入日志了,消费者可以拉取了,这时即使主副本失效其中一个ISR副本成为主副本消息也不会丢失。
6.2 主副本HW与LEO的更新
LEO和HW都是消息的偏移量,其中HW是所有ISR中最小的那个LEO。下图展示了消息从生产者到主副本再同步到从副本的过程:
- 生产者将消息发送给leader;
- leader追加消息到日志中,并更新自己的偏移量信息,同时leader也维护着follower的信息(比如LEO等);
- follower向leader请求同步,同时携带自己的LEO等信息;
- leader读取日志,拉取保存的每个follower的信息(LEO);
- leader将数据返回给follower,同时还有自己的HW;
- follower拿到数据之后追加到自己的日志中,同时根据返回的HW更新自己的HW,方法就是取自己的LEO和HW的最小值。
从上面这个过程可以看出,一次同步过程之后leader的HW并没有增长,只有再经历一次同步,follower携带上一次更新的LEO给leader之后,leader才能更新HW,这个时候村能确认消息确实是被所有的ISR副本写入成功了。
leader的HW很重要,因为这个值直接决定了消费者可以消费的数据。
6.3 Leader Epoch
考虑下面的场景,初始时leader以保存了两条消息,此时LEO=2,HW=1:
正在上传图片...
在sync 1中follower拉取数据,追加之后还需要再请求leader一次(sync 2)才能更新leader和follower的HW。
这样在更新HW中就会有一个间隙,当sync 1成功之后sync 2之前follower挂掉了,那么重启之后的HW还是1,follower就会截断日志导致m2丢失,而此时leader也挂掉的话这个follower就会成为leader,m2就彻底丢失了(即使原来的leader重启之后也改变不了)。
为了解决这个问题,kafka引入了leader epoch的概念,其实这就是一个版本号,在follower同步请求中不仅仅传递自己的LEO,还会带上当前的LE,当leader变更一次,这个值就会增1。
由于有了LE的信息,follower在崩溃重启之后就不会轻易截断日志,而是会请求最新的信息,避免了上述情况下数据丢失的问题。
这篇文章通过简单的语言、简单的图,简单地描述了kafka中的一些重要概念。其实kafka是一个复杂的系统,需要更多的学习才能深入了解kafka。