kafka的架构及常见面试题
一、介绍
Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目 。
Kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用 。
二、架构
1)生产、消费
首先得了解这个,比较简单的一个集群图
- 生产者(Producer):生产消息,发送消息的服务
- 消费者(Comsumer):消费消息,处理消息的服务
2)每一个kafka实例中有什么
如上图,只画了其中一个,具体看看里面是什么
- broker:一个kafka进程就是一个
broker
,也就可以这样理解,集群中每一台kafka服务就是broker
- 主题(topic):在发布订阅的模式下,我们需要对消息进行一个区分,同一个功能的消息,我们发往同一个主题下
- 分区(Partition):可以看到每一个主题topic下,有多个分区。消息会推送到这些分区里面,可以增加生产者,消费者的对消息生产处理的吞吐量
在没有画出来了另外两个kafka中,我们会推选出领导
Leader
,以及追随者Follower
,这个我们后面再聊
3)简单的消费
如上图,对于消费者如何消费分片中的消息的,其中有下面几点的解释
- 一个
Partition
只能由一个Consumer
来消费,一个Consumer
可以消费多个不同的Partition
。所以,我们应该保证每个主题的Partition
的数量大于Consumer
的数量 -
Consumer
越多,则吞吐量越高,消费得越快。当然,要结合第一点 -
Consumer
增加或减少时,Partition
和Consumer
的消费关系会自动调整
4)带group的消费
在上一节看到了简单的消费,那只不过是同一个group
下,接下来引入group
这个概念
上面第三节说的都不错,在这里就是要加一个前提条件,一个Partition
的消息同一个group
中的一个Consumer
来消费,
交给了同组的某个Consumer
,就不能交给同组的其他Consumer
了
每一个group
都可以完整消费主题中的所有消息
5)消费partition
里面的消息
如上图,有以下几点特性
- 一个Partition内部的消息是有序的,越新的消息
offset
越大。不同partition
的消息根据offset
无法比较新l旧 -
Consumer
顺序地消费partition
里的每一条消息,可以每读一条就向kafka上报(commit),当前读到了哪个位置(offset),也可以间隔性上报- 可以读几条再上报
offset
,比如说每读5条,上报更新一下offset
- 也可以时间间隔的方式上报
offset
,比如说每隔5s
上报更新
- 可以读几条再上报
-
Consumer
重启时kafka
根据该group
上一次提交的最大offset
来决定从哪个地方开始消费。- 这里就会出现重复消费的问题,而解决这个重复消费的问题,是面试中的高频问题。
- 不同
group
之间,记录的offset
是不同的,这也是上一节每个group
独立消费topic
的消息的原因
6)生产消息,写入Partition
关于生产者生产消息至Partition
,有三种情况,按照优先级这样排序
- 生产者可以指定
Partition
进行写入 - 通过消息携带的
key
,再通过hash
分发器计算得到结果,来决定去哪个Partition
- 按照时间片轮动选择
Partition
。比如说当前5
分钟,往Partition 0
中写入;下一个5
分钟,往Partition 1
中写入
7)生产消息,写入Partition
应答ack
在上面一节,我们确定了partition
存储是哪个,接下来还有一个问题,就是如果是kafka
集群架构的话,我们会出现同个Partition
,有一个Leader
,多个Follower
。
- 在上面确定
partition
后,我们要去寻找它的Leader
-
Leader partition
将消息写入本地磁盘- 当写入完成后,向
Producer
进行应答响应
- 当写入完成后,向
-
Follower partition
会将消息从Leader
那拉回来,写入自己的本地磁盘- 当写入完成后,向
Leader
进行应答响应 - 当
leader
收到所有的Follower
应答后,再向Producer
应答
- 当写入完成后,向
那么在此刻,生产消息的应答ack
有三种策略
- 完全不管
ack
应答 -
Producer
只需要Leader Partition
应答即可,不用管Follower Partition
是否写入成功 -
partition
需要保证所有的Follower
才进行应答
8)Partition
备份机制
在kafka
集群中,我们有Partition
的备份机制,如下
同一个主题下,集群中的每个broker
,都会维护自己的Partition
。
- 其中,他们会选出
Leader
、Follower
,生产者的数据优先推送给Leader
- 每一个
Partition
都有自己的Leader
- 同一个
Topic
下的,不同Partition
尽量分布在不同的broker
当有leader
的broker
宕机后,kafka
集群会重新竞选那台broker
上原本是leader
的Partition
,和下面ISR
队列有关。
9)消息的磁盘存储文件结构
- 分区
Partition
,一个Topic
中有多个Partition
,可以有效地避免了消息的堆积 - 分段
segment
,消息在Partition
里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt
一个segment
包括一个log
文件,两个index
文件,三个文件成套出现。前面数字的文件名代表着offset
偏移量开始索引位置- 000000101.log:存储具体消息的数据文件
- 000000101.index:存储
Consumer
的offset
便宜量的索引文件 - 000000101.timeindex:存储消息时间戳的索引文件
- 索引
index
,kafka
分段后的数据建立的索引文件
如下图
可以看到上面有两个索引文件,
-
index
文件是记录offset
消息和log
文件中消息位置的映射关系的文件 -
timeindex
文件时记录时间戳和offset
关系的文件
请注意,这边的索引并不会记录每一条消息的索引,而是采取稀疏索引,也就是隔一段消息才会记录消息的索引。
这个消息索引的稠密程度,影响
kafka
存储读取的速度 索引越稠密,则读取的速度越快 索引越稀疏,则文件存储的空间越大
由于上面存储文件都是采用offset
偏移量来命名,所以kafka
会采取二分查找方法,可以大大提交检索效率。
三、面试题
1)如何避免kafka消息丢失
1.1)出现消息丢失的原因
从上面架构上来看,kafka
丢失消息的原因主要可以分为下面几个场景
-
Producer
在把消息发送给kafka
集群时,中间网络出现问题,导致消息无法到达- 网络抖动原因
Producer
消息超出大小限制,broker
收到以后没法进行存储
-
kafka
集群接收到消息后,保存消息至本地磁盘出现异常- 集群接收到数据后会将数据进行持久化存储到磁盘,消息都是先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘。如果此时Broker宕机,且选举了一个落后
leader
副本限多的follower
副本成为新的leader
副本,那么落后的消息数据就会丢失。
- 集群接收到数据后会将数据进行持久化存储到磁盘,消息都是先写入到页缓存,然后由操作系统负责具体的刷盘任务或者使用fsync强制刷盘。如果此时Broker宕机,且选举了一个落后
-
Consumer
在消费消息时发生异常,导致Consumer
端消费失败- 消费者配置了
offset
自动提交参数,enable.auto.commit=true
。消费者接受到了消息,进行了自动提交。但其实消费者并没有处理完成,就宕机了,此时kafka
认为Consumer
已经消费了这条消息了,后续便不再分配,造成了消息的丢失
- 消费者配置了
1.2)解决方法——Producer
消息发送消息失败
关于上面Producer
消息发送消息失败的解决方法,总结归纳出五种,可以结合使用
- 生产者调用异步回调消息。伪代码如下:
producer.send(msg, callback)
- 生产者增加消息确认机制,设置生产者参数:
acks=all
。partition
的leader
接收到消息,等待所有的follower
副本都同步到了消息之后,才认为本次生产者发送消息成功了。 - 生产者设置重试次数。比如:
retrie>=3
,增加重试次数以保证消息的不丢失 - 定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。
- 后台提供一个补偿消息的工具,可以手工补偿。
1.3)解决方法——broker
写入磁盘失败
- 同步刷盘(不太建议)。同步刷盘可以提高消息的可靠性,防止由于机器没有及时写入磁盘的消息丢失。但是会严重影响性能
- 利用
Partition
的多副本机制(建议)。使用下面的这段配置,unclean.leader.election.enable=false
:表示不允许非ISR
中的副本被选举为leader
,以免数据丢失replication.factor>=3
:消息分区的副本个数,建议设置大于等于3
个min.insync.replicas>1
:这个值大于1
,要求leader
至少能和一个Follower
副本保证联系
1.4)解决方法——Consumer
消费异常
消费者需要关闭自动提交,采用手动提交offset
,enable.auto.commit=false
,并在代码中写入
// 同步提交
consumer.commitSync();
// 异步提交
consumer.commitAsync();
2)如何避免重复消费消息
这实际上是一个消息的幂等性问题
幂等性是指一个操作可以被重复执行,但结果不会改变的特性。在消息队列中,幂等性是指在消息消费过程中,保证消息的唯一性,不会出现重复消费的情况 。
我们有以下几个方案可以解决
- 对于一些业务相关的消息,我们通常有需要处理的消息业务主键。比如说,发送短信的发送流水号,支付业务的订单流水号等。
- 当消费者接受到消息后,使用这个消息主键建立获取分布式锁,同时将消息业务主键写入库。
- 如果第一步成功,消费者进行消费
- 当消费者处理完成后,释放分布式锁
- 如果有一条重复的消息进入,那么在第一步中就会失败,要么是分布式锁,要么是数据库主键冲突
- 针对没有业务的消息,可以再生产消息的时候给予一个分布式全局ID,后面的处理方法与第一条类似
- 在有状态流转的业务当中,一个消费者只消费一种业务状态,当这个消息的业务状态已经更新、已经处理。那么直接丢弃掉此次消息即可
- 乐观锁,消息在生产的时候携带业务上一次查询出的版本号,在消费时携带版本号去更新数据库。如果乐观锁原因导致失败,那么不需要进行后续处理
-
insert ... on duplicate key update
,消费插入数据时,数据已存在则进行更新
3)kafka的零拷贝是什么原理
- 第一次:将磁盘文件,读取到操作系统内核缓冲区;
- 第二次:将内核缓冲区的数据,copy 到 application 应用程序的 buffer;
- 第三步:将 application 应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区(属于操作系统内核的缓冲区);
- 第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。
如下图
取消掉两次CPU
的拷贝,从而减小CPU
的消耗。
零拷贝是操作系统提供的,如Linux
上的sendfile
命令,是将读到内核空间的数据,转到 socket buffer
,进行网络发送
还有Java NIO
中的transferTo()
方法
4)kafka如何在分布式的情况下保证顺序消费
在kafka
的broker
中,主题下可以设置多个不同的partition
,而kafka
只能保证Partition
中的消息时有序的,但没法保证不同Partition
的消息顺序性。
比如说,有一个主题Topic A
,里面有两个Partition
,但消费端只有一个Consumer
。根据上面的架构可以知道,这个Consumer
会消费两个Partition
中的消息,这样就肯定会出现消费乱序的情况。
那么针对上面这种乱序的情况,我们可以这样进行设置
- 一个主题只建立一个
Partition
,这样所有的消息也就只会发送到一个Partition
中,也就保证了消息的顺序性。Producer
也可以指定往一个partition
中发送消息。具体可以查看第二章第6节
- 可以保证一个
Partition
只能被一个Consumer
消费,也可以保证消息的有序性消费。但也要避免Rebalance
,原本一对一好好的,Consumer
宕机或者下线导致Rebalance
就会导致消费的乱序。
5)kafka为什么这么快
主要原因有下面几个
- 磁盘写入采用了顺序读写,保证了消息的堆积
- 顺序读写,磁盘会预读,预读即在读取的起始地址连续读取多个页面,主要时间花费在了传输时间,而这个时间两种读写可以认为是一样的。
- 随机读写,因为数据没有在一起,将预读浪费掉了。需要多次寻道和旋转延迟。而这个时间可能是传输时间的许多倍。
- 零拷贝:第3节提到过,避免了两次
CPU
拷贝,减少了CPU
的消耗 - 分区、分段、索引,再配合二分查找检索,提高消息的检索效率
- 分区
Partition
,有效避免了消息的堆积 - 分段
segment
,消息在Partition
里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt
- 索引
index
,kafka
分段后的数据建立的索引文件,就是第二章第9节的文件存储结构
- 分区
- 批量压缩读写
- 多条数据一起压缩,存储,读取
kafka
是直接操作的page cache
,而不是堆内对象,读写速度更高。且进程重启后,缓存也不会丢失
6)什么是ISR
,它有什么用
在kafka
中,除了有ISR
,还有OSR
,AR
,功能如下
- ISR(InSyncRepli):在
kafka
中,当一个broker
宕机挂掉的时候,原本在其broker
的Leader Partition
会重新进行竞选。这个竞选基本从ISR
队列中选举。那么现在可以这样说,ISR
是一个维护了Follower Partition
的队列,其中的Partition
都与Leader Partition
消息保持一致。 - OSR(OutSyncRepli):没在
ISR
队列中的其他Follower Partition
组成的队列 - AR(AllRepli):全部分区的
Follower Partition
,也就是ISR
和OSR
的Partition
总和
7)kafka中的Rebalance是什么,什么时候会触发
Rebalance
是指Partition
与Consumer
之间的关系需要重新调整分配,这个重新调整分配的动作称为Rebalance
。
那么当出现下面几种情况的时候,会触发Rebalance
- 当一个
Group
中的Consumer
新增后 - 当一个
Group
中的Consumer
离开后,比如说宕机 - 当
Topic
下的Partition
数量发生变化后
总之,两边的关系数量发生变化的话,都会触发Rebalance
8)当kafka出现消息积压时,该怎么办
当出现上面这种情况的时候,要么就是Consumer
挂掉了或者消费水平太低,要么就是Producer
消息太多,间接导致Consumer
消费不及时。
针对上面这种情况,我们可以有以下的解决方案,可以结合使用
- 提高
Consumer
的数量,可以通过增加消费者组中的Consumer
数量或者增加Consumer
实例来实现。这样每个Consumer
可以并行处理消息,提高整体消费能力。 - 增加
Partition
分区数量,在kafka
中,可以设置主题下的Partition
,将消息分散至更多的Partition
中,配合第一点方案提高整体的消费能力 - 提高
Consumer
的消费能力,优化消费者的处理能力,确保Consumer
能够快速处理每条消息。将Consumer
处理消息的速度优化至高于Producer
生产消息的速度。在不破坏代码业务逻辑的情况下,也可以使用异步处理来消费消息。
在面试过程中,第三点方案是至关重要的,很多企业由于硬件资源的原因,没有增加Consumer
的数量,没有增加Partition
数量的空间。故此,Consumer
优秀的消费能力,就成了他们考察的目标了。
四、最后
我是半月,你我一同共勉!!!