kafka全面解析(一)

2021-04-23 11:12:27 浏览数 (1)

kafka基本结构

主题

kafka将消息抽象归纳一个主题,一个主题就是对消息的一个分类,生产发送消息到特定主题,消费者订阅主题进行消费

消息

消息是kafka通信的基本单位,由一个固定长度的消息头和一个可变长的消息体构成

分区和副本

kafka经一组消息归纳为一个主题,每个主题有被分为多个分区,每个分区在物理上对应为一个文件夹,分区编号从0开始,每个分区又有一到多个副本,分区的副本分布在集群的不同代理,以提高可用性,

kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息有序性,每个消息是顺序写到磁盘中的,因此效率很高

leader副本和follower副本

由于副本的存在,就需要保证一个分区的多个副本之间数据的一致性,kafka会选择该分区的一个副本作为Leader副本,而该分区其他副本为follower副本,只有leader副本处理客户端的读写请求,follower副本从leader副本同步数据.

偏移量

任何发布到分区的消息会直接追加到日志文件的尾部,每条消息在日志文件的位置都会有一个按序递增的偏移量,偏移量是一个在分区下严格有序的逻辑,但是并不代表在磁盘上有序,消费者可以通过控制偏移量来对消息进行消费,如消费者可以指定起始偏移量,为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也许要保存。需要注意的是消费者对消费偏移量的操作并不会影响消息本身的偏移量。

日志段

一个日志被划分为多个日志端,日志段是kafka日志对象分片的最小单位,与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件,日志文件是以.log文件后缀的数据文件,用于保存消息实际数据,两个索引文件分别以.index,和.timeindex作为文件名后缀,分别代表消息偏移量索引文件,和消息时间戳索引文件.

代理

在kafka基本体系架构中我们提到了kafka集群,kafka集群就是有一个或多个kafka实例构成,我们把每一个kafka实例称为代理,也叫代理kafka服务器,在生产环境中kafka集群一般包括一台或者多台,每一台服务器上配置一个或多个代理,每一个代理都有唯一的表示id,这个id是非负整数,

生产者

就是生产消息,向代理发送消息,也就是向kafka代理发送消息的客户端

消费者和消费组

消费者拉取的方式拉取数据,他是消费的客户端,每一个消费者都属于一个消费组,我们可以为每个消费者指定一个消费组,如果没有指定就会属于一个默认的消费组,每个消费者也会有一个全局唯一的id,如果没有指定就kafka默认指定一个,同一个主题的一条消息只能被同一个消费组的某一个消费者消费,但不同的消费组的消费者可以同时消费消息,消费组是kafka实现对一个主题消费进行广播和单播的手段,实现广播只需指定各个消费者属于不同的消费组,消费单播则只需让各个消费者属于一个消费组就行

ISR

kafka在zookeepr中动态维护一个ISR,即保存的是同步的副本列表,该列表中保存的是与leader副本保持消息同步的所有副本对应的代理id,如果副本宕机或者落后太多,就会动态的从ISR列表中移除.

zookeeper

kafka利用zookeeper保存响应元数据信息,kafka元数据信息包括如代理节点信息,kafka集群信息,旧版消费者信息及其消费偏移量信息,主题信息,分区状态信息,分区副本分配方案信息.动态配置信息等,kafka通过监听机制监听节点元数据的变化,从而由zookeeper负责管理维护kafka集群,同时可以通过zookeeper很方便对kafak集群进行水平扩展以及数据迁移

kafka特性

  1. 消息持久化

kafak高度依赖于文件系统来存储和缓存消息,我们普遍认为磁盘读写慢,其实并不一定,关键是我们如何使用他,且操作系统提供了预读和延迟写技术,使得磁盘并不是很慢,由于与kafka是基于JVM的,而java对象内存消耗较大,却java对象增加jvm的垃圾回收也频繁和繁琐,基于上面原因kafka使用文件系统和依赖页缓存的存储比维护一个内存的存储或其他结构来存储消息更有优势,因此kafka选择文件系统存储数据。

基于消息系统本身的作用考虑,数据的持久化可以建立在简单对文件进行追加的实现方案上,因此顺序追加,所以kafka在设计上是采用时间复杂度O(1)的磁盘结构,他提供常量时间的性能,即使数据存储TB级数据,性能和数据的大小关系也不大

正如kafka将消息持久化,当机器宕机重启的时候,消息不会丢失

  1. 高吞吐量 kafka将数据写到磁盘,充分利用磁盘的顺序读写,同时kafka在数据写入及数据同步采用零拷贝技术,使用sendFile()函数,在两个文件描述符之间直接传递数据,完全在内核中操作,从而避免了内核缓冲区和用户区之间的拷贝,操作效率极高,还支持数据压缩以及批量发送,同时又有多个分区,因此kafka具有很高的吞吐量
  2. 扩展性

依赖zookeeper来对集群进行协调管理,使得kafka更加容易扩展

  1. 多客户端支持

支持java,scala,c ,c# ,Erlang等多种客户端

  1. kafka streams

在0.10版本引入了kafka stream,他是一个基于java实现用于流处理jar文件,

  1. 安全机制
    1. 通过SSL和SAAL验证生产者和消费者和代理连接验证
    2. 支持代理和zookeeper链接验证
    3. 通信数据加密
    4. 客户端读写权限认证
    5. 支持外部其他认证授权服务继承
  2. 数据备份

为每个主题建立分区,每个分区有一个或多个副本,对数据进行持久化备份

  1. 轻量级

kafka代理无状态,即代理不记录消息是否消费,消息偏移量的管理由于消费者自己或组协调器来维护,集群本身几乎不需要生产者和消费者的状态信息

  1. 消息压缩

支持Gzip,Snappy,LZ4这3种压缩方式,可以将多条消息放到一起组成messageSet,然后放到一条消息里面去,从而提高压缩比率而提高吞吐量。

kafka使用场景

  1. 消息系统
  2. 应用监控
  3. 网站用户行为跟踪
  4. 流处理
  5. 持久化日志

延迟操作组件

kafka将一些不立刻执行而要等待满足一定条件才触发完成的操作称为延迟操作,这类操作抽象为一个抽象类DelayedOperation,他是一个基于事件启动有失效时间的TimeTask,TimeTask实现了Runable接口,维护了一个TimerTaskEntry对象,TimerTaskEntry绑定了一个TimerTask,TimerTaskEntry被添加到TimerTaskList中,TimerTaskList是一个环形双向链表,按失效时间排序

  • tryComplete是一个抽象方法,由子类检测执行条件是否满足,如果没有满足则调用forceComplete完成延迟操作
  • forceComplete在条件满足时候,检测延迟任务是否为被执行,若没有执行则调用onComplete
  • oncomplete也是一个抽象方法,由子类实现,执行延迟操作满足条件后需要执行的时间业务逻辑
  • safeTryComplete方法,以synchronzed同步锁调用onComplete方法,供外部调用
  • onExpiration,也是一个抽象方法,有子类来实现延迟操作已经失效时间的相应逻辑处理。这操作是有kafka通过systemTimer来定时检测请求是否超时,内部维护一个线程池,用于提交响应的线程执行,例如当检测延迟操作已失效则将延迟操作提交到该线程值,即执行线程的run方法,DepalyedOperation覆盖了TimeTask的run方法,该方法先执行foreComplete方法,当该方法返回true在调用onExpiration。

DelayedOperationPurgatory

DelayedOperationPurgatory是一个对DelayOperation管理的辅佐类,他以泛型的形式将一个DelayedOperation添加到内部维护的Pool[Any,Watchers]类型watchersForKey对象中,同时将DelayedOperation添加到SystemTimer中,

其中watchers是Purgatory的内部类,底层是一个ConcurrentlinkedQueue,定义了ConcurrentLinkQueue类型的属性operation属性,用于保存DelayedOperation,其实就是对DelayedOperation进行监控,

Purgatory就是根据watchers对DelayedOperation进行管理,通过watchers调用DelayedOperation响应的完成,让DelayedOperation在delsyMs时间内完成,或者超时

DelayedProduce

代码语言:javascript复制
DelayedProduce(delayMs:long,produceMetadata:ProduceMetedata,
                replicaManager:ReplicaManager
                responseCallback:Map[TopicPartition,PartitionResponse]=>Unit)

DeplayedProduce是协助ReplicaManager完成相应延迟操作的,而ReplicaManager的主要功能负责生产者将消息写入Leader副本,管理Follower副本与Leader副本之间的同步以及副本之间的角色之间转换,DeplayedProduce显然是在生产者发送消息相关的延迟操作,因此只可能在消息写入到Leader副本时需要DelayedProduce的协助

当ProduceRequest的ack为-1的情况下,会创建一个DelayedProduce对象,而生产者在发送消息的时候,会调用ReplicaManager.appendMessage().追加消息到相应的leader分区之中,此时我们知道ack=-1。意味着生产者需要等待该分区的所有消息都与Leader副本同步之后才会进行下一条消息的发送,若要控制在分区个Follower副本和Leader副本同步完成只有在向生产者应答,就要发挥DelayedProduce的作用了

因此就可以看出DelayedProduce的作用就是协助副本管理器在ack=-1的场景下,延迟回调responseCallBack向生产者做出响应,具体就是当消息在分区的各个Follower副本完成了和Leader副本消息同步之后回调responCallBack给生产者。

DelayedFetch

DelayedFetch就是在FetchRequest处理时进行的延迟操作,而在kafka中只有消费者或Follower副本会发起FetchRequest,FetchRequest是由KafkaApis,handleFetchRequest方法处理,其中会调用ReplicaManager.fetchMessage()方法从相应分区的Leader副本拉去消息,在fetchMessage方法中创建DelayedFetch延迟操作。在拉取消息时候需要延迟操作,是为了本次拉取足够的数据。

DelayedJoin

DelayedJoin是组协调器在消费组准备平衡操作时候进行相应的处理,当消费组的状态转换为PreparingRebalance时候,即准备平衡操作,在组协调器的prepareRebalance(),方法中会创建一个DelayedJoin对象,并交给DelayedOperaionJoin负责监控

之所有加入DelayedJoin,是为了组协调器等待当前消费组下所有的消费者都请求加入了消费组,即发起JoinGroupRequest请求,每次组协调器处理完JoinGroupRequest时都会检测DelayJoin是否满足了完成执行条件

DelayedJoin响应方法的实现是调用GroupCoordnator相关方法来完成,Delayed.tryComplete调用GroupCoordinator.tryCompleteJoin方法,该方法判断是否还有未申请加入消费组的消费者,若所有消费者均申请加入了消费组,则表示DelayedJoin满足le完成执行的条件,否则继续等待,直到满足执行条件或超时。

DelayedHeartbeat

DelayedHeartbeat用于协助消费者与组协调器心跳检测相关的延迟操作。DelayedHeartbeat相关功能的实现是调用GroupCoordinator的响应方法来完成.

DelayedCreateTopics

在创建主题时候,需要为主题的每个分区分配到Leader之后,才会回调函数将创建主题结果返回客户端,DelayCreateTopic延迟操作等待主题的所有分区副本分配到Leader或是等待超时后调用回调函数返回到客户端;

控制器

在启动kafka集群中,每一个代理都会实例化并启动一个kafkaController,并将代理的brokerId注册到zookeeper相应的节点中,kafka集群会在各个代理中选举出一个代理作为Leader控制器,即Leader控制器,当控制器宕机的时候会重新选举新的leader控制器,控制器只要作用就是主题的创建和删除,分区和副本的管理以及代理故障转移,当一个代理被选举成控制器的时候,该代理的kafkacontroller就会注册控制器相应的权限,同时标记自己就是leader,当代理不再是控制器的时候,就要注销掉相应的权限,

  • controller_epoch,用于记录控制器更改的次数,每个向控制器发送请求都会带上这个字段,当controller_epoch的值小于内存中controller_epochd的值,则认为这个请求已经过期,当大于内存中的controller_epoch的值,则说明已经有新的控制器了,这个值是为了保证集群控制器的唯一性,进而保证操作的一致性
  • zkVersion,类似于乐观锁,用于更新zookeeper路径下的相应元数据信息,如controller_epoch,ISR信息等
  • leadre_epoch,分区leader更新次数,controller_epoch是对应代理的,而这个是对应分区的来说的,
  • 已分配副本,每个分区的所有副本的集合,简称AR
  • LeaderAndIsr,Kafka将Leader对应的brokerId和ISR列表封装成LeaderAndIsr类,Json串表示
代码语言:javascript复制
{
"leader":"leader对应brokerId",
  "leader_epoch":"leader更新次数",
  “isr”:"isr列表"
}

优先副本,就是AR中第一个副本称为preferred replica,也就是我们说的优先副本.理想情况下,优先副本即该分区的leader。

控制器初始化

  1. 创建controllerContext对象实例,他的作用就是缓存控制器各种处理操作所需要的数据结构,controllerContext会初始化控制器选举的次数,zkVersion,同时设置主题,副本,AR等,声明控制器与其他代理通信对象controllerChannelManager
  2. 实例化用于维护和管理分区的状态机且会给分区状态机注册两个监听器,topicChangeListener,用于监听/brokers/topic路径子节点的变化,DeleteTopicsListener用于监听/admin/delete_topic子节点的变更,
  3. 实例化一个对副本状态管理的状态机,且会定义一个内部BrolerChangeListener监听器,他监听zookeeper的/brokers/ids路径下的代理对应brokerids节点的变化,当该路径下的节点变化,就会触发监听器,该监听器就会调用ConrollerContext,controllerChannelManager对节点变化进行相应的处理
  4. 创建用于当前代理选举为控制器的zookeeperleaderElector选举对象,且会有两个回调函数,一个是初始化操作的oncontrollerFailover,一个是当新的控制器当选的时候,让当前控制注册的onConrollerResignation.kafak选举的策略就是在zookeeper创建一个临时节点,并注册一个leaderChangeListener,监听临时节点,当该节点变化时候,就会触发监听器,当新当选的控制器保存信息的时候,就会调用监听器的handleDataChange方法进行响应的处理,当监听到节点被删除的是,将触发oncontrollerResigination方法,同时触发选举机制.
  5. 创建一个独立定时任务kafakScheduler,该定时任务用于控制进行平衡操作,但仅仅在当前代理是leader控制器期间有效,当当前代理不在是leader控制器的时候,就会调用oncontrollerResignation方法进行关闭
  6. 声明一个对主题操作管理TopicDeletionManage对象
  7. 创建一个用户分区状态变化时,为分区选举leader副本的分区选举器PartitionLeaderSelector,
  8. 实例化ControllerBrokerRequestBatch.封装了 leaderAndRequestMap,stopReplicaRequestMap,updateMetadataRequestMap这3个集合,用于记录和缓存hadleStateChange方法产生的request.控制器将这些request交由ControllerbrokerRequestBatch.sendRequestToBrokers方法批量发出去,交由kafakApis调用响应的handle方法进行处理
  9. 实例化三个监听器,
    1. 用于监听分区重分配的partitionRessignedListener.
    2. 用于监听当分区状态变化时触发 preferredRelicaPartitionLeaderSeletcor选举器将优先副本选举为leader的preferredReplicaElectionListener.
    3. 用于监听ISR发生变化时将ISR变化通知给zookeeper进行更新操作,同时向多有代理发送元数据修改请求的isrChangeNotificationListener.

到此,控制器实例化过程结束,当一个代理启动就会创建一个kafkaController实例并启动,在启动kafakcontroller时,先注册一个用于监听zookeeper回话超时的监听器,sessionExpirationListener,然后启动控制器选举,让当前代理试图竞选控制器。

控制器选举过程

每个代理首先会从zookeeper中获取leaderid的信息,解析当前leader的LeaderId,若leaderId=-1,表示还没有节点成功当选leader,则将自身节点信息写入zookeeper中,如果leader!=-1表示已经有代理成为leader,

在抢占/controller节点时候,若出现已存在异常,就获取leader且更新内存的leader的值,否则将leader设置为-1,且删除临时节点.这个时候就会触发重新选举leader.同时节点变化就会触发leaderChangeListener.handleDataChange方法,这时其他代理将通过当前的leaderId和自己的brokerid比较,如果自己是之前的leader,而现在leaderId和自己的brokerid不一样,则自己退位,回调onControllerResignation函数.

故障转移

可以触发控制器进行选举的有三种情况

  1. 在控制器启动的时候
  2. 控制器发生故障转移的时候
  3. 当心跳检测超时的时候

故障转移其实就是控制权的转移就是重新选出新的控制器,而控制器实例化创建一个zookeeperLeaderElector对象,实例化需要对象需要回调两个函数,分贝是是当选的控制器进行初始化,以及注册响应权限的onControllerFailVover方法,和注销相应权限的onControllerResignation方法,

onControllerFailover操作

  1. 从zookeeper的/controller_epoch路径读取当前控制器轮值次数,并更新到当前cntrollerContext中。
  2. 将控制器的轮值次数加1,并尝试去更新zookeeper中/controller_epoch中记录的轮值次数的值,若更新失败则表示当前的控制器也被其他控制器替代,因此当前代理成为控制器相关初始化处理将以异常而告终,若更新失败是因为不存在/controller_epoch节点,则表明控制器是第一次启动,第一个控制器当选,因此zookeeper中创建该节点并写入控制器轮值次数,同时更新ControllerContext中缓存的与轮值次数相关的数据。
  3. 注册分区管理相关的监听器,例如分区重分配操作监听器,isr发生变化的监听器,以及将优先副本选为分区leader操作
  4. 注册主题管理的监听器,例如主题变化的监听器,删除主题监听器
  5. 注册代理变化处理的监听器,例如代理变化进行相应的处理
  6. 初始化controllerContext,即当代理成为控制器后,原控制器所持有的controllerContext将被重新赋值.
  7. 启动分区状态机和副本状态机。
  8. 从controllerContext中读取所有主题,轮询每个主题,为每个主题添加用于监听分区变化的partitionModificationListener.
  9. 检测当前是否分区需要触发分区重分配操作。若需要,则进行一次分区重分配操作
  10. 检测当前是否需要从优先副本选举为leader的分区,并进行相应的操作
  11. 向kafak集群发送元数据更新操作
  12. 根据配置决定是否创建用于分区平衡操作的定时任务
  13. 启动删除主题管理的topicDeletionManage组件

以上工作主要是对完成相应元数据的初始化以及对代理,主题,分区等变化感知的监听器的注册,和启动相应管理组件

oncontrollerResignation操作

  1. 取消该控制器在zookeeper中注册的用于对分区及副本变化感知的监听器的监听,关闭删除主题操作的TopicDeletionManager
  2. 在获取controllerContext维护的重入锁的条件下取消对分区ISR变化监听关闭分区状态机和副本状态机,关闭控制器与其他代理进行通讯的controllerChannelManager
  3. 最后将controllerContext中用于记录控制器轮值次数及轮值树对应的epochzkVersion字段置零,将当前代理状态设置成RunningAsBroker,即当前代理不再是控制器的角色。

分区平衡

分区自动平衡是通过分区的优先副本选为分区的leader,通常当分区副本是通过kafka自动分配,会保证分区副本分配在不同的代理节点,即使用优先副本的第一个副本当做leader,这样的分配是一个相对平衡的状态,但是随着时间推移,部分节点变化导致重新选举分区leader,此时优先副本发生故障,然而使用其他副本选举为leader,之后即使优先副本恢复,也不能成为leader.因此我需要进行自动平衡,具体步骤如下

  1. 从controllerContext的partitionReplicationAssignment数据结构中查询出当前所有可用的副本,根据分区AR的头结点进行分组
  2. 轮询所有代理节点,判断该节点是否要进行优先副本选举,判断的依据是根据分区不平衡率,如果超过指定的平衡率且分区没有进行分区重分配和优先副本选举操作以及当前没有删除主题操作,则调用onPreferredRepllicaElection,执行优先副本选举,让优先副本成为分区leader

协调器

kafka有三种协调器,消费者协调器,组协调器,任务管理协调器,

kafka高级消费者是强依赖zookeeper,每一个消费者在启动的时候都会在zookeeper对应的路径下,注册监听器,当节点发生变化的时候,消费者进行平衡操作,由于这种方式,当消费组的任何一个消费者发生变化,同一个组的消费者都会进行平衡操作,而消费者之间并不知道其他消费者的状态,回导致kafka工作在一个不正确的状态,同时这种方式完全依赖zookeeper,以监听的方式管理消费者存在两个缺陷

  1. 羊群效应,任何代理或消费者变化,所有消费者同时进行平衡操作,因为对同一个路径监听,就可能导致死锁
  2. 脑裂问题,平衡操作的时候每个消费者都和zookeeper进行通讯,判断消费者状态不一致,zookeeper本身特性同一时刻各个消费者的状态可能不一致,导致kafka运行在不正确状态下

由于上面原因,新版kafka引入了消费者协调器,负责同一个消费组下各个消费者与服务端组协调器之间的通信,服务端引入了组协调器,用于管理部分消费组和该消费组下每个消费者的消费偏移量

消费者协调器

消费者协调器是kafkaconsumer的成员变量,使用他和组协调器进行通信,且是消费者私有的,因此只有对应的消费者才可见,不同消费者不可见,可以简单理解是消费者的一个代理,但是又不是代理,消费者协调器是消费组管理相关请求的发起者

消费者协调器主要责任如下

  1. 负责本消费者加入消费者前后的处理
  2. 负责消费者离开消费组
  3. 负责向组协调器发送提交的偏移量
  4. 让组协调器感知自己的状态、
  5. 负责执行分区的分配

当消费者协调器向组协调器请求加入的消费组后,组协调者会为同一个组下选出一个leader,leader的消费者的协调器负责消费者与分区的分配,会再给组协调器一个请求,这个请求回到有分配的结果,组协调器会把分配的结果再返回给follower消费者的协调器,而非leader也会有一个请求,但是这个请求中的分配结果是空的, 这种的方式,将分区分配的职责交个客户端自己处理,从而减轻服务端的负担

组协调器

组协调器负责对管理的组员提交的相关请求进行处理,组员即消费者,他负责管理与消费者之间建立连接,并从与之连接的消费者之中选出一个消费者作为leader消费者,同时管理与之连接的消费者偏移量的提交,每个消费者消费偏移量保存到kafka的内部主题中,并通过心跳来检测消费者与自己的连接状态。

消费者加入组的过程

  1. 消费者被创建后通过消费者协调器选择一个负载最小的节点,然后向该节点发送查找组协调器的请求
  2. 调用该节点的组协调器方法,通过请求带的groupid,取其hashCode值与kafka内部主题分区总数取模获得一个分区,该分区的leader副本所在节点及消费者的组协调器。
  3. 找到组协调器之后申请(joingrouprequest)加入消费组
  4. 首先根据groupid信息获取或构造该消费者的GroupMetadate信息
  5. 然后根据消费者的clientid和一个uuid拼接组成字符串作为消费组的memeberid,并构造MemeberMetadata信息,然后将信息注册到GroupMetadata中
  6. 如果这个消费者是第一个消费者,就把这个memberid作为leaderid,即leader消费者,同时指定分区分配策略,最后构造JoinGroupResult对象,回调responseCallBack返回消费者.
  7. 选出消费者leader后,消费组成员发送SyncGroupRequest请求,leader消费者为消费者分配分区,并构造sysnGroupRequest请求时候上传分区分配结果,非leader消费者的分区分配结果为空
  8. 组协调器收到请求后,一直等待leader消费者的请求处理完后,在进行回到处理,向消费者组的所有消费者做出响应,在返回响应时候会将分区分配结果发送给各个消费者
  9. 最后将消费者与分区的对应关系写入kafka内部主题

消费偏移量管理

新版kafka将消费偏移量保存到kafka一个内部主题中,当消费者正常运行或者进行平衡操作时候向组协调器提交当前的消费偏移量.组协调器负责消费组的管理和消费偏移量管理,但客户端可以仅仅选择让组协调器管理偏移量,如客户端指定了分区的时候,就不需要kafka负责分区的分配了

当组协调器收到偏移量的提交请求时候,会检查是否满足以下条件

  • 是该消费者组的成员提交的偏移量
  • 仅选择让组协调器负责消费便宜来那个的管理的消费者提交的请求

如果都不满足提交的条件就会调用回到函数返回响应的错误码

具体过程如下

  1. 调用GroupCoordinator.docommitOffsets,如果是第一种需要租协调器管理消费组,因此比第二种条件多一步操作,即让延迟的心跳检测执行完成,更新收到的心跳时间戳,
  2. 之后的逻辑操作保持一致,调用GroupMetadataManage.prepareStoreOffset方法,主要是构造消费者偏移量相关信息,以及封装一个在偏移量对应的消息成功追加到kafka内部主题之后回到方法putCacheBack
  3. 执行完GroupMetadataManage.prepareStoreOffset后返回一个delayedstore对象,该对象交由GroupMetadataManage.store方法处理
  4. 在store方法中调用relicaManager.appendMessages方法将偏移量追加到kafka内部主题中,
  5. 之后回调的putcacheCallback方法会更新缓存中记录的分区与offsetAndMetadata的映射信息,并回调responseCallback方法

网路通信

kafkaserver启动时候,初始化一个socketServer服务,用于接受客户端连接,处理客户端请求,发送响应,同时创建一个kafkaRequestHandlerPool用于管理kafkaRequestHander,

SockerServer是基于java NIO实现的网络通信组件,线程模型是一个Acceptor线程负责接受客户端所有的连接,N个processor线程,每个processor有多个selector,负责从每个连接中读取请求,M个hander线程处理请求,并将产生的请求返回给processor,而handler是由kafkaRequestHanderPool管理,在processor和hander之间通过requestChannel来缓存请求,每个hander从requestChannel.requestQueue接受RequestChannel.Request.把Request交由KafkaApi的hander处理,然后处理后把对应的response存进RequestChannel.responseQueue队列。

Accept

他主要是监听并接受客户端连接的请求,建立和客户端的数据传输通道serverSockerChannel,然后为客户端指定一个processor

他接受客户端新的连接,创建sockerChanel,以轮询的方式交由processor处理,添加到processor的newConnections队列并唤醒processor线程,这样就建立了客户端与kafkaserver之间的通信通道

Processor

他也是一个线程类,主要用于客户端读取请求数据和将响应结果返回给客户端RequestChannel

他是为了给processor线程与handler线程之间通信提供数据缓存,是通信过程中Request和Response缓存的通道,是processor线程与hander线程交换数据的地方

SocketServer

socketServer启动就可以通过Acceptor接口客户端请求,交由Acceptor对应的Processor处理,Processor线程将请求添加到RequestQueue队列中,Hander从RequestQueue取出请求分发处理,然后将结果存入responseQueue队列中,添加response时会唤醒与之对应的processor,Processor从RequestChannel.responseQueues队列中取出自己对应的responseQueue队列根据ResponseAction进行相应处理。

日志管理器

日志管理器是kafka用来管理所有日志,包括日志的创建,删除,日志检索,日志加载和恢复,检查点,以及日志文件刷写磁盘,日志清理。

在kafka中,每个主题之间互相独立,每个主题在逻辑上由一个或多个分区构成,分区树可以在创建主题的时候创建,也可以在主题创建后在修改,但只能增加一个主题的分区数,而不能减少分区数,

存储结构上分区的每个副本在逻辑上对应一个log对象,每个log有划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,一个索引文件是偏移量索引文件,另一个是时间戳索引文件,每个log对象中维护了一个concurrentSkipListMap,其底层是一个跳跃表,保存主题的每个分区对应的所有logsegment,kafka将日志文件封装成一个FileMessageSet对象,两个索引文件分别封装成OffsetIndex,和TimeIndex对象,

如最上面图显示,分区对应的目录的命名规则为主题名-分区编号,分区编号从0开始顺序递增,分区编号最大值为分区总数键1,数据文件的命令规则是由数据文件第一条消息的偏移量(基准偏移量),左补0构成20位数字字符组成,每个分区第一个数据文件的基准偏移量为0,因此每个分区的第一个日志文件为000000000000000000000.log,索引文件的命名是一样,如000000000000000000.index,0000000000000000000.timeindex.

如我创建一个主题log-format,且分区为3,副本为2,其分布如下图

消息组成部分

每条消息有一个固定长度的消息头和一个可变长度的payload组成,payload也称为消息体,上图是消息结构的说明,在实际存储的消息总长度还包括12字节额外的开销,这个12字节包括两部分,其中8个字节代表消息的偏移量,另外4字节代表消息的总长度,因此一个当前版本kafka一条消息固定长度为34字节

数据文件

下图为一个数据文件的部分内容

可以看到,offset=1与offset=0的消息的position只差为35,用35减1(payloadSize)为消息的固定长度34,

数据文件存储的消息,由于是第一个数据文件,因此起始偏移量为0,上面内容的postition代表的是在数据文件中的实际位置,之后每条消息的position为前一条消息的postion于消息固定长度和消息总长度之和,CreateTime表示消息时间类型为消息创建时间,isValid表示消息CRC校验是否合法,payloadSize表示消息实际长度,CompressCodeec表示消息压缩方式,crc表示消息的crc32校验和,payload表示消息的实际内容

偏移量索引文件

本来在kafka是将消息分段保存在不同的文件中,同时每条消息都一个唯一的偏移量,数据文件已该文件基准偏移量左补0命名,并将每个日志段以基准偏移量key保存到concurrentSkipListMap中,这样查找指定偏移量的消息时候,用二分法找到消息所在的段文件,但是为了进一步提高查找效率,kafka为每个数据文件创建了一个基于偏移量的索引文件,该索引文件文件名和数据文件相同,后缀为index,

如果我们要查找指定偏移量为23消息,如下步骤

  1. 根据二分法到map中找到对应的日志段
  2. 日志段包含对应的index,和log,如图发现对应的0000000.index,和000000.log
  3. 在通过二分法在偏移量索引文件中找到不大于23的最大索引项,即offset=20那一栏
  4. 然后根据索引项的position=320,到日志文件中具体的物理位置为320的位置开始寻找
  5. 直到找到offset=23的消息

时间戳索引文件

该时间戳索引文件和对应的数据文件名称一样,以timeindex为后缀,该索引文件包括一个8字节的时间戳字段,和一个4字节的偏移量字段

如果我们要查找时间戳为1557554753430的消息

  1. 1557554753430和每个日志段中最大的时间戳对比,取出第一个不小于1557554753430所对应的日志分段,日志分段中最大时间戳的计算是先查询日志分段对应时间戳索引文件,找到最后一条索引项,如果最后一条索引时间戳字段值大于0,取该值,否则去该日志分段的最近修改时间
  2. 找到对应的日志分段之后,使用二分法定位,找到不大于1557554753430最大索引项,也就是[1557554753420,430]
  3. 拿着偏移量为430到偏移量索引文件中使用二分法找到不大于430的最大索引项,即[20,320]
  4. 日志文件中从320的物理位置开始找不小于1557554753430的消息

日志清理

kafka提供了两种策略,日志删除,和日志压缩,通过参数cleanUp.policy指定日志清除策略,可以控制到主题级别,主题级别策略会覆盖代理级别的配置策略

日志删除

在日志管理器启动有一个定时任务线程用于定时的删除日志段文件,默认是5分钟执行一次,可以通过${log.retention.check.interval.ms}设置

kafka提供了基于日志保留时长和日志段大小两种日志删除配置方式

日志保留时长

默认是7天,可以通过log.retention.minutes设置,要主要的是查找保留时长的日志段文件,并不是剪短的依据日志单最晚更新时间,他并不能代表真正反映日志单在瓷片的保留时间,如分区副班重分配是后该日止更新时间会被修改

因此最长时间是通过查询日志分段的时间戳所以你文件,查到到时间戳索引文件中最后一项索引项,若索引项的时间戳字段大于0,就取改值,否则去最近修改时间

在计算出日志最长时间后,从最早日志段文件依次扫描直到第一个不满足超时条件的段文件,查找到要删除的文件,若删除的日志单总数等于该分区日志段的总数,说明所有日志段均过期,但该分区下至少要有一个日志段接受消息的写入,因此,需要切分一个新的日志段,然后迭代删除待删除的日志段文件,

删除的过程如下

  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志段进行读取操作
  2. 这些日志分段所有文件添加.delete后缀
  3. 后天有一个名为delete-filed的定时任务进行删除

基于日志大小

日志删除任务会检查当前日志大小是否超过设定值,通过log.retention.bytes设置,

删除过程如下

  1. 计算需要被删除的日志总大小(当前日志文件大小-retention值)
  2. 从日志文件第一个logSegment开始查找的日志段的文件集合
  3. 执行删除

日志压缩

这种策略是一种更细粒度的清理策略,他是基于消息的key,通过压缩每个key对应的消息只保留最后一个版本的数据,该key对应其他版本在压缩时会被清除,类似数据库的更新操作,压缩策略将可key对应值为空的消息,认为是直接删除该消息,为了不影响日志追加操作,日志压缩并不会在活跃段进行操作,同时对非活跃段压缩也不是一次性执行,而是分批进行

需要注意将日志清理与日志删除区分开,日志删除是删除整个日志段,而日志清理是将相同key的日志进行合并,只保留key最后一个值,将合并后的数据构成新的日志段,同时删除原来的日志段,

副本管理器

kafka 0.8 版本引入了副本管理器,引入副本机制使得kafka能够在整个集群中只要保证至少一个代理存活就不会影响整个集群

kafka存活的条件

  1. 一个存活的节点必须和zookeeper保持连接,维护zookeeper的session(通过zookeeper的心跳机制实现)
  2. 如果一个节点作为follower副本,该节点必须能及时与分区leader副本保持消息同步,不能落后太多

leader副本会跟踪所有同步的节点,一旦一个节点宕机,卡主,或者延迟太久,leader副本就会将该节点从同步副本集合列表中移除,

如何判断代理卡主或者下线

  1. kafka0.9版本根据配置型${replica.lag.time.max.mx}决定,默认是10秒,
  2. kafka0.9之前的版本是通过配置项replica.lag.max.messages,配置follower落后leader消息条数来定义某个代理是否落后太多,但是0.9版本移除了,是因为消息的条数并不能正式反应代理的落后太多。

副本的数量可以通过以下方式配置(默认副本数是1)

  1. server.properties中通过配置default.replication.factor=n
  2. 也可以在创建主题的时候通过设置--replication-factore n 指定副本数

副本管理器负责副本管理,主要包括对控制器发送的leaderAndIsrRequest指定,stopReplicaRequest指令以及UpdateMetedataRequest指令进行处理,维护副本ISR变化,以及Follower与Leader数据同步的管理

首先我们现在先介绍一下分区和副本的相关知识

分区

kafka将一个主题在逻辑上分成一个或多个分区,每个分区在物理存储上对应一个目录,目录名为${topicName}-${partionId}其中topicName为主题的名称,partionId是分区编号,每个主题的分区都有一个唯一的编号,从0递增.分区数可以大于节点数,但是副本数不能大于节点数,因为副本需要分布在不同的节点上,这样才能达到本分的目的。

如上图所示每个分区,在存储结构上有LEO和HW两个重要的概念

LEO是log end offset的缩写,表示每个分区最后一条消息的位置,分区每个副班都有自己的LEO

HW是Hight Watermark的缩写,将一个分区对应的ISR中最小的LEO作为HW,HW之前的消息表示已提交的消息,对消费者是可见的,消费者最多只能消费到HW所在的位置,HW之后的消息表示还没有被Follower副本同步完成,每个副本都有自己的HW,副本leader和follower各自负责更新自己HighWatermark状态,Follow.hw<=leader.leo

对于分区leader副本,leo和hw的存储结构示意图如下

每个主题的一个分区只能被同一个消费者下的其中一个消费者消费,因此我们说分区是消费并行度的基本单位,

副本

一个分区可以有一个或多个副本,副本根据是否接受读写其请求,分为Leader副本和follower副本,一个分区有一个leader副本,有0个或多个Follower副本,leader副本处理分区的所有读写请求并维护自身及Follower副本的状态信息,如LEO,HW,等副本作为消费者从leader副本拉去消息进行同步,当leader失效时候,通过分区leader选举器从副本类表中选出一个副本作为新的leader.

如果brokerid与当前代理brokerId相同时候,我们将该副本叫为本地副本,否则叫远程副本,副本抽象一个Relica对象,这个对象的属性包含了主题,分区,代理编号,还有LEO,HW,副本追加数据的log,以及上次与leader同步的时间,因此还有logEndoffsetMetadata,higheWatermarkMetadata,Log和lastCaughtUpTimeMsUnderlying属性字段,对于远程副本而言log字段对应的值为null,因为远程部分log并不在当前代理上,logEndoffsetMetadata表示已追加到log的最新对应的偏移量,不过本地副本和远程副本获取此字段方式不同,远程副本由于log属性为空,因此并不能直接从本地获取,而该字段的值是由远程副本对应的代理发送请求进行更新,对于Follower副本highWatermarkMetadata的值是从Leader副本获取更新.

副本过期检查

follower把leader的LEO之前的日志全部同步完成时候,则认为follower已经赶上了leader,此时会以当前时间更新该副本的lastCaughtUpTimeMs字段,kafka副本管理器会启动一个过期检查的定时任务,这个任务会定期检查当前时间与副本lastCaughtUpTimeMs之差是否超过replica.lag.time.max.mx值,如果大于,则会把这个副本提出ISR副本集合

假设最右侧的foller副本被提出ISR集合,这个分区的HW就会发生变化变成3.

追加消息

副本管理器用appendmessage方法将消息写入副本的处理逻辑如下

代码语言:javascript复制
appendMessages(timeout:Log,
               requiredAcks:Short,
               internalTopicsAllowed:Bollean,
               messagePerPartition:Map[Topicpartition,MessageSet],
               responseCallback:Map[TopicPartition,PartitionResponse]=>Unit)
  1. 检查ack的的合法性,ack值为0,-1,1,如果不是这三个值,表示不合法,回调用responseCallBack,若合法,回调用appendToLocalLog将消息写入leader副本并得到个TopicPartition对应的消息追加操作状态。
  2. 检查是否满足延迟生产操作,若同时满足以下条件,则需要擦混管家爱你delayedProduce延迟操作
    1. acks=-1,即ISR列表中的所有follower副本要从leader副本将消息同步到本地
    2. messagePerPartition集合不为空,即消息与主题和分区映射关系不能为空,客户端本次请求需要有数据写入
    3. 至少要对一个分区的消息追加成功
  3. 若满足创建延迟操作的条件,则创建一个delayedProduce对象并交由delayedProducePurgatory管理,由DelayedProduce回调responseBack,向客户端返回追加操作结果状态,否则直接回调responsecallback将appendToLocalLog方法对各TopicPartition消息追加操作的状态返回给客户端

拉去消息

  1. 首先根据replicaid设置是isFromFollower值,用于区分是Follower拉去消息还是普通消费者拉去消息,如果是消费者拉去消息设置fetchOnlyCommitted表示为true,表示值消费已提交的消息,否则设置成false,还有fetchOnlyFromLeader的值,如果是消费者模式设置为false,其他场景为true
  2. 设置完表示后,从leader副本拉去消息
  3. 判断是否是Follower模式,则更新follower副本LEO,且检查是否要扩张ISR列表
  4. 检查DelayedProduce操作是否满足执行条件,让其执行完成
  5. 检查是否满足立刻对FetchRequest做出拉取响应的条件,若满足,则构造响应结果回到reponseBack,否则构造一个DelayedFetch对象,交由delayedFetchPurgatory进行管理,若满足条件后回调responseBack.

副本数据同步过程

  1. 生产者在发布消息到某个partition时,先通过zookeeper找到该分区的leader,然后生产者将该消息发送到该分区的leader.
  2. leader会将消息写到log,每个follower都从leader pull数据,这种方式Follower存储的数据顺序可以与leader保持一致,Follower在写入其log后,向leader发送ACK
  3. 一旦leader收到了isr中的所有副本的ack,就认为该消息已经被提交,leader将增加HW并向生产者发送ACK

在初始状态下,leader和follower的HW和LEO都是0,leader副本会保存remote leo,表示follower的LET为0,这个时候,producer没有发送消息,follower会不断向leader不断的向leader发送fetch请求,但是因为没有数据,这个请求就会被寄存,在指定时间内(replica.fetch.wait.max.ms)后会被强制完成,如果在指定时间内,有消息发送过来,就会唤醒Fetch请求,让leader继续处理

数据的同步分为两种情况

  1. leader处理完prducer请求之后,follower发送一个fetch请求过来
  2. follower阻塞在leader指定时间内,leader副本收到producer请求

第一种情况同步不走如下

  • leader处理完prducer请求 leader请求收到请求之后,会做如下事情
    1. 把消息追加到log文件,同步更新leader副本的LEO
    2. 尝试更新leader HW值,这个时间由于follower副本还没有发送fetch请求,那么leader保存的remote leo还是为0,leader会比较自己的leo和远程leo的值,发现最小值为0,与HW值相同,所以不会更新HW
  • follower发送fetch请求,leader副本处理fetch请求
    1. 读取log数据,更新remote leo=0,因为follwer还没有写入数据,这个值是根据follower的fetch请求中的offset来确定的
    2. 尝试更新HW,此时leader的leo,和remote leo不一致,因此leader HW=0
    3. leader把消息内容和当前分区的hw值发送给follower副本
    4. follower副本接收到response以后,将消息写入本地log,同时更新follower的leo
    5. 更新follower hw,本地的leo和返回的hw比较取小的值,所以仍然是0.
  • follower发第二次fetch请求,leader处理请求
    1. 读取log数据
    2. 更细remote le0=1,因为这次携带offset=1
    3. 更新当前分区HW,这个时候leader leo和remote leo都是1,因此更新hw=1
    4. 再把数据和当前分区的hw值返回follower副本,这个时候没有数据,返回为空,
    5. follower副本接到response的时候,如果有数据写入,并且更新leo
    6. 更新follower的hw值,hw=1

第二种情况

leader接受到新的数据,当leader收到请求会唤醒处于阻塞Fetch请求,处理过程和第一种基本一样,

  • leader接受请求写入log,更新leader的leo
  • 唤醒follower的fetch请求
  • 更新HW

我们知道min.insync.relicas=1,是表示需要多少个副本同步才能表示消息是提交的,默认值是1(server.properties中配置,并且acks=-1(表示需要所有副本确认,此值参数才生效),这个参数提高了数据了可靠性,

要说明的是acks对应的值说明

  1. 0,表示producer不需要等待broker消息确认,这个选项延迟最小,但风险最大,服务宕机,数据丢失
  2. 1.表示producer主要要得到kafka集群的leaser节点确认即可,这个选择延迟较小,确保了leader节点确认接收成功
  3. -1(all),需要isr中所有的副本接受确认,速度慢,安全性高,但是isr也会缩小到一个leader replica,所以设置参数all并不能一定避免丢失

当min.insync.replicas=1的时候,一旦消息写入leader端本地就认为已提交,而延迟一轮fetch更细HW值的设计是的follower的hw值是异步延迟更新的,如果此时leader宕机,成为新的leader的follower的hw就可能是过期的,则认为成功提交的消息被删除

那么kafka有没有解决数据丢失的解决方案呢

kafka0.11.0.0引入了leader epoch来解决这个问题

epoch实际上就是一对值(epoch,offset),epoch带包leader的版本,从0开始递增,当leader发送变化,epoch 1,而offset则对应这个epoch版本的leader写入第一条消息的offset

比如,有两个leader epoch<0,0>和<1,120>那么第一个leader epoch版本号是0,这个版本的leader从位移0开始保存消息,一共保存了120条消息,之后,leader发生变更,版本号增加到1,新版本的起始位移是120。

当引入leader epoch机制后,Follower副本B重启回来之后,需要向A发送一个特殊的请求获取leader 的leo值,follower发现leo值不比他自己的leo小,且缓存中epoch的起始位移没有大于这个leo的值,因此副本不需要截断日志,这样就会丢失数据

但是当leader此时宕机,follower副本成为了leader,同样的原leader重启回来之后,也会向原follower副本获取leo的值,发现获取的leo的值和不比自己的leo值小,因此原leader不会进行截短日志,同时在更新元副本的缓存leader epoch的值,这样就能保证数据的丢失。

0 人点赞