1 Kafka架构
生产者、Broker、消费者、Zookeeper;
注意:Zookeeper中保存Broker id和消费者offsets等信息,但是没有生产者信息。
1.1 Producer发送数据流程
1.2 Kafka Broker总体工作流程
1.3 消费者组初始化流程
2 Kafka的机器数量
Kafka机器数量 = 2 *(峰值生产速度 * 副本数 / 100) 1
3 副本数设定
一般设置成2个或3个,很多企业设置为2个。
副本的优势:提高可靠性;副本劣势:增加了网络IO传输
4 Kafka压测
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
5 Kafka日志保存时间
默认保存7天;生产环境建议3天
6 Kafka中数据量计算
每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟 平均每秒钟:1150条 低谷每秒钟:50条 高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条 每条日志大小:0.5k - 2k(取1k) 每秒多少数据量:2.0M - 20MB
7 Kafka数据存储需要多少硬盘空间
每天的数据量(100g) * 副本数(2个副本) * 日志保存时长(3天) / 70%
8 Kafka监控
自己开发监控器;
开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle
9 Kakfa分区数计算
分区数一般设置为:3-10个
1)创建一个只有1个分区的topic
2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;则分区数 = 100 / 20 = 5个分区
10 多少个Topic
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
11 Kafka的ISR副本同步队列
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列
。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
12 Kafka分区分配策略
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区 C2-0 将消费 4, 5, 6 分区 C2-1 将消费 7, 8, 9 分区
RoundRobin方式第一步将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
13 Kafka挂掉
1)Flume记录
2)日志有记录
3)短期没事
14 Kafka数据丢失问题
14.1 producer角度
Ack = 0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack = 1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack = -1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ack在生产者指定,不同生产者可以不同。
ack设为-1,需要ISR里的所有follower应答,想要真正不丢数据,需要配合参数:
min.insync.replicas: n (ack为-1时生效,ISR里应答的最小follower数量)
默认为1(leader本身也算一个!),所以当ISR里除了leader本身,没有其他的follower,即使ack设为-1,相当于1的效果,不能保证不丢数据。
需要将min.insync.replicas设置大于等于2,才能保证有其他副本同步到数据。
retries = Integer.MAX_VALUE,无限重试。
如果上述两个条件不满足,写入一直失败,就会无限次重试,保证数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失
代码语言:javascript复制kafka-topics.sh --bootstrap-server hadoop1:9092 --create --topic testisr2
--replication-factor 3 --partitions 4 --config min.insync.replicas=2
完全不丢结论:ack=-1 min.insync.replicas>=2 无限重试
14.2 broker角度
副本数大于1
min.insync.replicas大于1
14.3 consumer角度
手动提交offset
flink结合checkpoint
15 Kafka数据重复
重复指的是发生重试造成的重复。
幂等性 ack-1 事务
15.1 Kafka数据重复
可以在下一级:SparkStreaming、redis、Flink或者Hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
了解:
Kafka幂等性原理(单分区单会话):producer重试引起的乱序和重复
15.2 重复问题的解决:
1)Kafka增加了pid和seq。Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic的partition也会维护pid-seq的映射,并且每Commit都会更新lastSeq。
2)recordBatch到来时,broker会先检查RecordBatch再保存数据:
如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则不保存。
15.3 乱序问题的解决
假设我们有5个请求,batch1、batch2、batch3、batch4、batch5;
如果只有batch2 ack failed,3、4、5都保存了,那2将会随下次batch重发而造成乱序。
可以设置max.in.flight.requests.per.connection=1(客户端在单个连接上能够发送的未响应请求的个数)来解决乱序,但降低了系统吞吐。
新版本kafka设置enable.idempotence=true后能够动态调整max-in-flight-request。
正常情况下max.in.flight.requests.per.connection大于1。当重试请求到来时,batch 会根据 seq重新添加到队列的合适位置,并把max.in.flight.requests.per.connection设为1,这样它前面的 batch序号都比它小,只有前面的都发完了,它才能发。
16 Kafka消息数据积压,Kafka消费能力不足怎么处理?
1 、如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)
2 、如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。