https://blog.csdn.net/itcodexy/article/details/109574747
https://www.jianshu.com/p/8a7fa04a5a49
https://segmentfault.com/a/1190000015282836
https://segmentfault.com/u/snailiu
https://www.cnblogs.com/sujing/p/10960832.html
详解:
代码语言:java复制消息在网络中传输的方式只能通过二级制的方式,所以首先需要将消息序列化为二进制形式缓存在客户端,
kafka 使用了双端队列的方式将消息缓存起来,
然后使用发送线程(Sender)读取队列中的消息交给 Selector 进行网络传输发送给服务端(Broker)
1. 客户端组件
2. 客户端缓存存储模型
3. 确定消息的 partition 位置
4. 发送线程的工作原理
1、通过使用以下四大客户端组件来完成客户端消息的发送工作:
1、KafkaProducer:是一个生产者客户端的进程,通过该对象启动生产者来发送消息。
2、RecordAccumulator:是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。
3、Sender:是一个发送线程,负责读取记录收集器中缓存的批量消息,经过一些中间转换操作,
将要发送的数据准备好,然后交由 Selector 进行网络传输。
4、Selector:是一个选择器,用于处理网络连接和读写处理,使用网络连接处理客户端上的网络请求
2、客户端缓存模型:一条消息首先需要确定要被存储到那个 partition 对应的双端队列上;
其次,存储消息的双端队列是以批的维度存储的,即 N 条消息组成一批,一批消息最多存储 N 条,
超过后则新建一个组来存储新消息;
其次,新来的消息总是从左侧写入,即越靠左侧的消息产生的时间越晚;
最后,只有当一批消息凑够 N 条后才会发送给 Broker,否则不会发送到 Broker 上。
3、确定消息的 partition 位置:2 种方式:对Partition哈希求余、轮询
A:对于指定了 key 的消息,partition 位置的计算方式为:Utils.murmur2(key) % numPartitions
即先对 key 进行哈希计算,然后在于 partition 个数求余,
从而得到该条消息应该被存储在哪个 partition 上。
B:对于没有指定 key 的消息,partition 位置的计算方式为:采用 round-robin 方式确定 partition 位置
即采用轮询的方式,平均的将消息分布到不同的 partition 上,
从而避免某些 partition 数据量过大影响 Broker 和消费端性能。
4、发送线程的工作原理
Sender 线程的主要工作是收集满足条件的批数据
第一步:扫描记录收集器中满足条件的批数据,然后将 partition -> 批数据映射转换成 BrokerId -> N 批数据的映射。
第二步:Sender 线程会为每个 BrokerId 创建一个客户端请求,然后将请求交给 NetWorkClient,
由 NetWrokClient 去真正发送网络请求到 Broker。
NetWorkClient 的工作内容
Sender:该线程准备好要发送的数据后,交由 NetWorkClient 来进行网络相关操作。
主要包括客户端与服务端的建连、发送客户端请求、接受服务端响应。
完成如上一系列的工作主要由如下方法完成。
reday()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点。
send()方法。为每个节点创建一个客户端请求,然后将请求暂时存到节点对应的 Channel(通道)中。
poll()方法。该方法会真正轮询网络请求,发送请求给服务端节点和接受服务端的响应。
流程详解:
代码语言:java复制消息发送的过程中,涉及到两个线程协同工作:
1、主线程首先将业务数据封装成ProducerRecord对象,
2、之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,
3、Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulator中取出消息并批量发送出去,
需要注意的是,KafkaProducer是线程安全的,多个线程间可以共享使用同一个KafkaProducer对象
1、在我们通过代码send()消息之后,这条消息就会发往拦截器Interceptor
2、Interceptor会对数据做处理
~ 加解密/脱敏
~ 过滤不满足条件的数据(ip白名单、错误编码、脏数据或者残缺数据)
~ 统计消息投递成功率或结合第三方工具计算消息在Kafka存储的时间
~ 在消息的header里放一个唯一标识,方便下游做去重
~ 针对旧版本,新版本Kafka引入了幂等性来保证Once Exactly(刚好一次)
3、对数据进行序列化
无论是否存在key,都必须给key和value指定序列化方式(消息在网络中传输的方式只能通过二级制的方式),
可通过实现Serializer自定义序列化规则
4、确定数据分区(确定消息的 partition 位置)
分区策略很重要,好的分区策略可以解决数据倾斜的问题
可通过实现Partitioner接口来自定义分区规则,否则规则如下
~ 1、如果发送send的时候指定了分区,则使用指定分区
~ 2、如未指定,则根据key进行hash,然后对分区数取模(Utils.murmur2(key) % numPartitions)
~ 3、如未指定且没key,则轮询发送给分区(低版本采用随机)
5、临时缓存(存储)
RecordAccumulator采用了双端队列(Deque)数据结构来临时存储
目的:提高发送数据的吞吐量
确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque,找不到对应的Deque则新建
从对应的Deque的尾巴中取出最后一个RecordBatch(记录大小)进行判断:
~ 1、如果该Batch加上当前消息的大小,小于batch.size,则追加进去;
~ 2、否则创建新的Deque,将当前消息放进去并将Batch放到Deque队列
注:RecordBatch是写Kafka的最小单位
6、Sender 拉取数据
当满足linger.ms和buffer.memory任一个条件时,会进行数据的拉取
7、排队发送
每一个Deque的数据都有一个对应的ClientRequest,负责携带RecordBatch,排队等待前一个RecordBatch的响应
(Sender 线程会为每个 BrokerId 创建一个客户端请求)
8、包装
将ClientRequest扔到KafkaChannel中,等待Selector的发送
9、写入Kafka
这一步骤是真正的往Kafka的Broker中写数据,回应的规则是
~ ack=0:发送出去就立马执行第10步,不等待响应
典型的 fire and forget , 性能最好,但也最容易丢数据
~ ack=1:发送出去,等到那批数据被写到主副本上时,就成功响应,执行10步骤
由于只是写到主副本的页缓存,因此存在丢数据的可能
~ ack=-1:发送出去,直到ISR队列中包括主副本在内的min.insync.replicas个副本被写成功,才成功响应,执行10步骤
ack=-1搭配min.insync.replicas的结果
让kafka的副本复制策略游离在同步复制和异步复制之间
既避免了同步复制拖慢性能,又提高了异步复制的可靠性
10、回复 NetworkClient,开始下一个RecordBatch的发送
11、NetworkClient回复 RecordAccumulator,开始下一个RecordBatch的发送。