kafka全面解析(二)

2021-05-11 15:09:34 浏览数 (1)

kafka启动流程

  1. 首先实例化用于限流的QuotaManagers.
  2. 启动任务调度器,是基于java.util.concurrent.ScheduledThreadPoolExecutor来实现,kafkaserver会构造一个线程池,这些线程以守候线程在kafkaserver启动时开始运行负责副本管理以及日志管理调度
  3. 创建和zookeeper的连接
  4. 生成一个随机数,然后进行base64处理得到集群的id,调用clusterResourceListener通知集群元数据信息发生变更操作
  5. 实例化日志管理器
  6. 实例化并启动socketServer服务
  7. 实例化并启动副本管理器
  8. 实例化并启动控制器,在kafkaController在实例化同时实例化分区状态机,副本状态机和控制器选择器zookeeperLeaderElector
  9. 实例化并启动组协调器
  10. 实例化权限认证组件以及handLer线程池
  11. 实例化动态配置管理器
  12. 实例化并启动kafka健康状态检查,主要是在zookeeper的brokers/ids路径下创建和代理一样的id节点,该节点是一个临时节点,当代理离线时候,就会被删除
  13. 向mete.properties文件中写入当前代理的id以及固定版本号为0的version信息
  14. 注册kafka的metric信息,在kafkaserver启动时将一些动态的JMX Beans 进行注册,以便对kafka进行监控跟踪

创建主题的流程

生产者

oldproducer是生产者scala版本的生产者,支持同步模式,和异步模式,通过实行producer.type进行配置。

oldProducer

prodcuerPool保存的是生产者和代理的连接,每一个SyncProducer对象,是一个hashMap接口,key是brokerId,value是syncproducer

EventHander,对消息进行发送前准备如下

  1. 进行序列化
  2. 获取主题元数据信息
  3. 管理缓存中的主题元数据信息和每个主题对应的要发送的分区元数据信息
  4. 是否要进行压缩
  5. 对处理后的数据分组分发

异步发送和同步发送的最大的区别就是异步模式会首先将消息存入消息队列,然后又一个独立的线程判断是否需要将数据向代理发送。

kafkaProducer实现原理

实例化过程

kafkaProducer在实例化首先会加载和解析生产者相关配置信息并封装成producerConfig对象,然后根据配置项主要完成以下对象或数据结构的实例化

  1. 重配置项解析出clientid,客户端指定配置型的值以便追踪程序运行情况,如果没有配置则clientId以prducer-前缀后加一个从1递增的数字
  2. 根据配置向创建和注册用于kafak metrics指标收集的相关对象
  3. 实例化分区器,用于消息指定分区
  4. 实例化消息key 和value进行序列化
  5. 根据配置实例化一组拦截器
  6. 实例化用于消息发送相关元数据信息的meteData对象
  7. 实例化用于存储消息的Recordccmulator,类似一个队列,也叫消息累加器
  8. 根据指定的安全协议创建一个channelBuilder然后创建networkclient实例,用于生产者与各个代理进行socket通信,

sender过程分析

kafka实例化之后,调用kafakProducer.send()方法进行消息发送

  1. 获取metaData,
  2. 序列化,分别对key和value进行序列化,将key和value转为byte数组类型
  3. 获取分区,根据指定的partitionId则直接返回返回,否则根据分区器定义的分区策略计算出partitionId
  4. ProducerRecord长度有效性检查,
  5. 创建topicPartition对象,根据ProdcuerRecord对应的topic及partitionId,创建一个TopicPartition对象,在RecordAccumulator中为每个TopicPartition创建一个双向队列
  6. 构造callback
  7. 写BufferPool操作,这一步是调用RecordAccumulator.append方法将ProducerRecord写入RecordAccumulator的BufferPool中。

其中send方法并没有直接发送消息给kafka server,而是存入了RecordAccumulator中,当达到一定条件,会唤醒sender线程获取消息并发送,send方法我们详细看看RecordAccumulator.append方法实现逻辑

  1. 首先将append操作的记录器appendInprogress加1,如果失败就减1,
  2. 通过producerRecord构造的TopicPartition获取对应的Deque<RecordBatch>,获取Deque之后调用RecordAccumulator.tryAppend()方法尝试进行消息的写入
  3. RecordAccumulator.tryAppend方法首先会获取对应队列中的RecordBatch,若RecordBatch不为null,则调用RecordBatch.tryAppend将recorf写入消息缓冲区
    1. RecordBatch.tryAppend尝试将消息写入当前批次
    2. 判断当前批次是否能够容纳当前消息
    3. 如果不能则返回null交个消息累加器继续处理
    4. 如果可以,将消息写入该批次内存中,构建FutureRecordMetaData返回结果,返回future
  4. 根据返回的结果判断是否为空,如果为空,说明还未追加成功,创建一个新的批次RecordBatch,然后调用ProdcuerBatch.tryAppend尝试将消息写入当前批次
  5. 最后实例化RecordAppendResult对象返回给kafkaProducer.

至此kafkaproducer发送Record的第一步操作将Record写入消息写入缓冲区过成分析完毕,第二步有sender线程从消息累加器中取出Record将请求发送到响应的kafak节点。

Send发送消息

首先要获取MetaData中获取集群信息,然后从RecondAccumulator中读取符合的消息,然后构造网络层请求交由NetworkClient去执行,这个过程取出每个TopicPartition对应的分区Leader,但可能存在TopicPartition的leader不存在,就会触发元数据更新操作,在发送的NetworkClient内部维护一个InFlightRequests类型的inflightRequest对象用于保存已发送但未收到响应的请求,这个流程sender更像一个调度器,而NetWorkClient是网路请求的真正执行这,sender不断的从RecordAccumulator获取符合条件的消息,构造请求交由NetworkClient执行。

详细分析sender将消息最终发送到kafka节点

sender是后台一个一直执行的线程,他是通过run方法一直会执行,但真正执行的是run(long now)方法,该方法入参是当前系统时间,具体逻辑如下

  1. 获取Cluster信息,从MetaData中获取集群Cluster信息
  2. 获取各个topicPartition分区的Leader节点集合
  3. 根据第2步返回的结果ReadyCheckResult对象,进行如下处理,若unknownLeaderTopic不为空,进存在没有找到分区leader的主题,则遍历unknownLeaderTopics集合将主题信息加入metaData中,然后调用metaData.requestUpdata()方法将needUpdata设置成true,请求更新metaData信息
  4. 检测与ReadyCheckResult.readyNodes集合中个节点连接状态,通过调用NetWorkClient.ready,方法完成检测工作,同时根据一定条件判断是否为还未建立连接的节点创建连接,若与某个节点连接还未就绪,则将该节点从readyNodes中移除,经过ready方法处理完之后,readyNodes集合中的所有节点均已和netWorkClient建立连接
  5. 根据readyNodes中各节点的id进行分组,每个Node对应一个List<RecordBatch> ready集合,经过去ready处理,构造了一个为readyNodes集合中保存的各节点构造一个Map<Integer,List<RecordBatch>>类型的集合batchs,该map的key是节点id,以该节点为leader节点的所有或部分分区对应双端队列的第一个RecordBatch构造的Lis集合作为value
  6. 如果要保证消息发送有序,我们可以把上一步的map取出value进行二次迭代,对每个RecordBatch,调用RecordAccumulator.mutePartition进行处理,实际上这里是提取所有RecordBatch的TopicPartion,然后去重
  7. 根据配置过滤掉请求已超时的RecordBatch,将过期的请求添加到过期对列中,并将该RecordBatch从双端队列中移除,同时释放空空间,然后将过去的recordBarch交由senderMetrics进行处理,更新和记录响应的元数据
  8. 根据第五步获取的batchs,根据batchs分组的node,将每个Node转化成一个ClientRequest,最后形成一个List<ClientRequest>
  9. 然后调用NetWork.send执行网络擦混输,向代理发送请求,在发送请求中首先将ClientRequest添加inFlightRequest队列中,该队列记录的正在发送或已经发送但还没有响应,此时还没有真正的放,存放到了selector内部相对应的kafkaChannel里面,每个node对应一个kafkachannel,一个kafkachannel一次只能存放一个send数据包,当前的send数据包没有完成发送出去之前,不能存放下一个send,否则抛出异常
  10. 调用NetworkClient.poll真正执行读写操作,该方法首先检查是否要进行更新元数据,然后执行selector.poll,方法真正执行I/O操作,最后对已经完成的请求对其相应结果response进行处理。

生产者的整体流程如图

kafkaConsumer各组件说明

ConsumerConfig,消费者级别的配置,将相应配置传递给其他组件

SubscriotionState,维护了消费者订阅和消费消息的情况

ConsumerCoodinator,负责消费者与服务端GroupCoodinator通信

ConsumerNetWorkClient,对网络层通信NetworkClient的封装,用于消费者与服务端通信

Fetcher,对CondumerNetworkClient进行包装,负责从服务端获取消息

消费订阅

kafkaConusmer提供了两种订阅消息的方法,一种通过KafkaConusmer,subscrible方法指定消息对应的主题,另一种是通过kafakConsumer.assign方法指定需要消费的分区,

subscribe有几个重载方法如下

代码语言:javascript复制
public void subscribe(Collection topics)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Collection topics, ConsumerRebalanceListener listener)

subcribe也可分为两种,一种是非模式匹配订阅主题的基本流程,一种是模式匹配订阅主题(正则的方式)

上图就是非模式订阅的基流程,而模式匹配订阅主题的方式与直接致电主题列表方式时间逻辑类似,也是调用Subscriptionstate.subscribe方式经订阅关系保存到SubscriptionState维护的用来保存订阅关系的数据结构中,即将订阅主题的模式赋值给subsctibedPatten,由于通过模式匹配来查找订阅的主题,所以接下来需要先设置Metadata.needMetadataForAllTopic表示设置为true,然后更细Metadata,最后交由消费者协调器ConsumerCoordinator从集群Cluter的当前所有主题中查找符合的主题,将主题添加到subsriptionState的subscription和groupSubscription集合中,并更新这些主题在Metadata中记录的过去时间

assign方法实现逻辑和subcribe类似,也是首先检测是否有并发操作,然后判断请求参数是否合法,即分区是否nul,以及是否为空集合,分别进行与subcribe方法相同的吹,然后遍历订阅的分区,构造一个与分区相对应的主题集合,

在将用户的消费者分区分配关系保存到subscriptionSatate.assignment之前,先调用Consumer.maybeAutoCommitOffsetsNow进行消费偏移量提交,保证同一个消费组的消费者对分区的消费偏移量已提交,防止重复消费,最后更新订阅的分区对应的主题过期时间。

两种订阅方式是互斥,客户端只能选择其中一种订阅方式。

消费消息

kafkaConsumer提供poll方法从服务端拉取消息,该方法是通过Fetcher类来完成消息的拉取及更新消费偏移量,因此我们首先讲解Fetcher.

Fetcher主要功能是负责构造拉取消息的FetchRequest请求,然后通过ConsumerNetWorkclient发送FetchRequest请求,最后对返回的结果进行处理更新缓存中记录的消费记录,

首先我们分析用于构造FetchRequest的Fetcher.createFetchRequest实现逻辑

  1. 根据medata.fetch方法获取集群信息cluster,然后从消费者所分配的分区subscriptions中查找该消费者可拉取消息的分区集合
  2. 查到到可拉取消息的分区集合之后,迭代集合分区,查找该分区的leader副本所在的节点,如果节点不存在则设置metadata更新表示为true,触发kafka元数据更新操作,由于leader分区节点不存在,因此本次拉取消息将忽略该分区
  3. 如果leader分区存在,同时unset队列不包括将要发往该leader节点的请求,并且inFligheRequests也不包括发往该节点的请求,则构造与该分区对应的FetchRequest,partitionData对象,并将该对象保存到fetchable集合中,fetchable是一个Map<Node,LinkedHashMap<TopicParittion,FetchRequest.partitionData>>类型的集合,这样就把分区leader节点进行了分组,最后遍历fetchable中的每个元素,根据每个元素的值构造FetchRequest,最终将fetchable转换成了Map<Node,FetchRequest>类型的Requests集合。
  4. 完成了FetchRequest构造之后,就可以执行FetchRequsest请求发送
  5. Fetcher.sendFetches就是负责将构造的requests集合中的每个FetchRequest发送相应的节点
  6. 该方法会遍历requests集合的每个元素,调用client.send方法将FetcherRequest构造一个ClientRequest对象,并将保存到cluent.unsent缓冲队列中等待发送
  7. 同时会绑定一个RequestFutureListener,用于对FetchRespons处理,RequestFutureLister,提供了onSuccess和onFailure方法,分别对处理成功和处理失败的结果进行处理,在onSuccess方法对FetchResponse进行处理,用每个分区返回的数据实例化一个CompletedFetch对象,并添加到competedFetches队列中
  8. completedFetches队列中的数据并不是最终返回给客户端的ConsumserRecord类型数据,Fetcher定义了一个FetchedRequest方法用于将completeFetches队列中保存的消息转为ConsumerRecord类型的消息,同时会更新每个分区对应用的TopicPartitionState的position值,position值是下一次拉取消息的其实位置

kafkaConsumer拉取消息

kafkaConusmer.poll核心逻辑是当没有拉取到消息是在timeout时间内循环调用pollOnce方法向服务端发送FetchRequest请求并进行相应处理,若pollOnce方法拉取消息,则poll方法会在消息返回给客户端之前调用Fetcher.sendFetces方法发送下一次拉取消息的请求,若干没有拉取消息同时等待时间没有超过timeout设置,则循环调用pollonce方法处理,若超时则构造一个空消息集合返回客户端。

pollonce方法的主要逻辑是,确保消费组在服务端对应的组协调器已完成分配并正常连接,消费者已加入到该组协调器的管理之中,同时以同步方式调用doAutoCommitOffsetsAsync方法获取消费初始位置,然后调用Fecher.fetchRecords方法,检测是否已获取消息,之所以首先调用Fetch.fetchedRecords进行处理,是因为kafkaConsumer.poll方法每次调用pollOnce方法获取消息之后,就会发送下一次FetcherRequest请求避免阻塞等待,若获取到消息立刻返回到poll方法执行题,然后发送下一次FetcherRequst,若没有获取到消息调用Fetcher.sendFetches方法发送FetchRequest请求,并调用ConusmerNetWorkClient.poll,执行网络层请求处理,阻塞等服务端响应之后构造返回结果,在构造返回结果之前,需要检测在长时间处理poll过程中,消费者是否需要重新加入消费组进行平衡操作,若需要重新加入消费组则返回一个空消息结合,否则代用Fetcher.fetchedRecord获取消息最后返回poll方法执行体。

kafka提供了两种方式获取消费起始位置和客户端调用相应的API确定消费其实位置

代码语言:javascript复制
seek()用于指定消费起始位置到一个特定位置
seekToBeginning()指定OffsetResetStrategy为EARLIEST,相当与auto.offset.reset=earliest
seekToEnd()指定offsetResetStrategy为LATEST,相当与通过配置向auto.offset.reset=latest

另一种方式就是设置auto.offset.reset设置消费起始位置,默认是LATEST策略自动重置消费起始位置.

消费偏移量提交

kafka提供了两种消费偏移量的方式,一种是自动提交,一种是手动提交使用API。

kafak提供同步提交commitSync和异步提交commitAsync提供客户端提交消费偏移量,这两种方式分别调用ConsumerCoordinator的CommitOffsetSync和commitOffsetsAsync,底层实现是通过客户端消费者协调器ConsumerCoordinator发送offsetCommitRequeq请求,服务端协调器GroupCoodinator进行处理,最后将消费偏移量提交到kafak内部主题中

分区数与消费者线程的关系

kafka分配线程与分区的分配策略

  1. round-robin分配策略

首先订阅的主题分区以及消费者线程进行排序,然后通过轮询方式分别将分区依次分给消费者线程

2.range分配策略

首先对同一个主题里面分区进行序号排序,并对消费者按字母排序进行排序,假设我们分区进行排完序为0,1,2,3,4,5,6,7,8,9,消费者排序完将会是C1,C2然后将分区的个数除于消费者的总数决定每个消费者消费几个分区,如果除不尽,那么前面几个消费者线程将会多消费一个分区,

我们10分分区,2个消费者,10/2=5,那个消费者C1消费者C2将会消费同样多的分区,所以最后分区分配的结果如下

代码语言:javascript复制
C1 ->0,1,2,3,4分区
C2 ->5,6,7,8,9分区

假设如果有11分区,那么最后分区分配结果如下

代码语言:javascript复制
C1->0,1,2,3,4,5分区
C2 ->6,7,8,9分区

假设我们有2个主题(T1,T2)分别有11个分区,最后的结果如下

代码语言:javascript复制
C1 ->T1 0,1,2,3,4,5 T2 0,1,2,3,4,5分区
C2 ->T1 6,7,8,9,10 T2 6,7,8,9,10 分区

很明显C1消费者比C2消费者多消费了2个分区,这就是一个弊端

如果我们把round-robin分配实例按照range分配策略进行分配,分配如下

持续关注,下一篇kafka全面总结,如果对您有一丝丝帮助,麻烦点个关注,也欢迎转发,谢谢

0 人点赞