1.SocketServer SocketServer作为Broker对外提供Socket服务的模块,主要用于接收socket连接的请求,然后产生相应为之服务的SocketChannel对象。
内部主要包括三个模块:
Acceptor主要用于监听Socket连接; Processor主要用于转发Socket的请求和响应。 RequestChannel主要用于缓存Socket的请求和响应。 1.1Acceptor对象主要功能
(1)开启socket服务
(2)注册Accept事件
(3)监听此ServerChannel上的ACCEPT事件,当其发生时,将其以轮询的方式把对应的 SocketChannel转交给Processor处理线程。
1.2Processor对象主要功能 (1)当有新的SocketChannel对象进来的时候,注册其上的OP_READ事件以便接收客户端的请求。
(2)从RequestChannel中的响应队列获取对应客户端的请求,然后产生OP_WRITE事件。
(3)监听selector上的事件。如果是读事件,说明有新的request到来,需要转移给 RequestChannel的请求队列;如果是写事件,说明之前的request已经处理完毕,需要从 RequestChannel的响应队列获取响应并发送回客户端;如果是关闭事件,说明客户端已经关闭了 该Socket连接,此时服务端也应该释放相关资源。
1.3RequestChannel
本质上就是为了解耦SocketServer和KafkaApis两个模块,内部包含Request的阻塞队列和Response的阻塞队列。
注:SocketServer为了防止空闲连接大量存在,采用了LRU算法,即最近最少使用算法,会将长时间没有交互的SocketChannel对象关闭,及时释放资源。因此Processor仅仅是起到了接收Request,发送Response的作用,其处理Request的具体业务逻辑是由KafkaApis层负责的,并且两者之间是通过RequestChannel相互联系起来的。
总结可得,SocketServer负责下面三个方面:
(1)建立Socket,保持和客户端的通信;
(2)转发客户端的Request;
(3)返回Response给客户端。最后通过RequestChannel与其他模块解耦。
2.KafkaRequestHandlerPool KafkaRequestHandlerPool本质上就是一个线程池,里面包含了num.io.threads 个IO处理线程,默认 为8个。KafkaRequestHandlerPool在内部启动了若干个KafkaRequestHandler处理线程,并将RequestChannel对象和KafkaApis对象传递给了KafkaRequestHandler处理线程,因为KafkaRequestHandler需要从前者的requestQueue中取出Request,并且利用后者来完成具体的业务逻辑。
3.KafkaApis KafkaApis负责具体的业务逻辑,它主要和Producer、Consumer、Broker Server交互。 KafkaApis主要依赖以下四个组件来完成具体的业务逻辑:
LogManager提供针对Kafka的topic日志的读取和写入功能。 ReplicaManager提供针对topic分区副本数据的同步功能。 OffsetManager提供针对提交至Kafka偏移量的管理功能。 KafkaSchedule为其他模块提供定时的调度和管理功能。 3.1LogManager
LogManager负责提供Broker Server上topic的分区数据读取和写入功能,负责读取和写入位于Broker Server上的所有分区副本数据;如果Partition有多个Replica,则每个Broker Server不会存在相同Partition的Replica;如果存在的话,一旦遇到Broker Server下线,则会立刻丢失Partition的多份副本,失去 了一定的可靠性。
Topic、Partition和Replica三者之间的关联关系:
3.2ReplicaManager
ReplicaManager负责提供针对topic的分区副本数据的同步功能,需要针对不同的变化做出及时响应,例如Partition的Replicas发送Leader切换时,Partition的Replicas所在的Broker Server离线的时候,Partition的Replicas发生Follower同步Leader数据异常的时候,等等。
分区两个名词:AR和ISR
AR是Assign Replicas的缩写,代表已经分配给Partition的副本。 ISR是In-Sync Replicas的缩写,代表处于同步状态的副本。 并不是所有的AR都是ISR,尤其是当Broker Server离线的时候会导致对应TopicAndPartition的Replica没有及时同步Leader状态的Replica,从而该Replica不是ISR。
a.ReplicaManager是如何实现Replica数据的同步?
主要利用ReplicaFetcherThread(副本数据拉取线程)和Height Watermark Mechanism(高水位线机制)来实现数据的同步管理。
b.什么是高水位?
本质上代表的是ISR中的所有replicas的last commited message的最小起始偏移量,即在这偏移之前的数据都被ISR所有的replicas所接收,但是在这偏移之后的数据被ISR中的部分replicas所接收。
其中RecoverPoint代表的是recover-point-offset-checkpoint文件中记录的偏移量,LogEndOffset代表的是当前TopicAndPartition的replica所接收到消息的最大偏移量,HeightWatermark代表的是已经同步给所有ISR的最小偏移量。Replica的HeightWatermark发生更新在以下两种情况:
(1)Leader状态的Replica接收到其他Follower状态的Replica的FetchRequest请求时,会选择性得更新HeightWatermark。
(2)Follower状态的Replica接收到来自Leader状态的Replica的FetchResponse时,会选择性更新HeightWatermark,即ReplicaFetcherThread内部的processPartitionData流程。
4.OffsetManager 4.1Kafka提供两种保存Consumer偏移量的方法:
(1)将偏移量保存到Zookeeper中。
(2)将偏移量保存至Kafka内部一个名为_consumer_offsets的Topic里面。
将偏移量保存至Zookeeper中是kafka一直就支持的,但是考虑到zookeeper并不太适合大批量的频繁写入操作,大数据培训因此kafka开始支持将Consumer的偏移量保存再Kafka内部的topic中,即_consumer_offsets Topic。当用户配置offsets.storage=kafka时,高级消费者会将偏移量保存至Topic里面,同时通过OffsetManager提供对这些偏移量的管理。
4.2 OffsetManager主要功能
缓存最新的偏移量。 提供对偏移量的查询。 Compact,保留最新的偏移量,以此来控制Topic日志的大小。 Kafka如何将Consumer Group 产生的偏移量信息保存在_consumer_offsets的不同分区?
本质是通过计算不同Consumer Group的hash值和_consumer_offsets的分区数的模数,其结果作为指定分区的索引。
5.KafkaScheduler KafkaScheduler为其他模块提供定时任务的调度和管理,例如LogManager内部的cleanupLogs定时任务,flushDirtyLogs定时任务和checkpointRecoverPointOffsets定时任务;ReplicaManager模块内部的maybeShrinkIsr定时任务;OffsetManager内部的offsets-cache-compactor定时任务等等。KafkaScheduler内部是基于ScheduledThreadPoolExecutor实现的,对外封装了任务调度的接口schedule,线程个数由参数background.threads决定,默认值为10。
6.KafkaHealthcheck KafkaHealthcheck主要提供Broker Server健康状态的上报。Broker Server健康状态本质上就是指Broker Server是否在线,如果Broker Server在线,说明处于健康状态,如果Broker Server离线,说明处于死亡状态。
Broker Server如何上报健康状态?
BrokerChangeListener通过监听目录为/brokers/ids的zookeeper路径,当发生有数据变化时,则获取当前目录下的数据,从而获取当前集群的在线Broker Server列表。而KafkaHealthcheck正是提供了在目录为/brokers/ids的Zookeeper路径上注册节点的能力,该节点所在路径为EphemeralPath(非永久路径),当Broker Server由于异常情况导致下线时,此EphemeralPath随着Broker Server和zookeeper链接的断开而消失。
7.TopicConfigManager kafka提供对topic配置参数的在线修改能力,修改完成之后无需重新启动kafka集群,在线生效。Topic配置参数包括:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;
Topic的配置参数位于路径为/config/topics/[topic]的zookeeper上,Broker Server内部为了避免针对每个Topic都在相关路径上建立监听器,对外提供了一个被通知的路径,其位于/brokers/config_changes,如果检测到该路径 上发生变化,则读取该路径上的数据,获取配置文件待更新的Topic,然后再从/config/topics/[topic]上加载最新的配置文件。