kafka基本知识:https://km.woa.com/group/51596/articles/show/480932?from=iSearch
这里单提出几个关键地方:
1 offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
2 kafka分区有主从的多副本容灾,但为啥没有mysql主写从读,1)并非写少读多场景,实际上product consume都可以认为是写;2)主从采用异步拉取,没有解决一致性问题;3)consumer消费数据本来就有路由操作,若分区相对均匀地分散到各个broker上,同样可以达到负载均衡的效果
3 消费模型,consumergroup实现了支持订阅 点对点2种模式,consumer在consumergroup中最多消费某些分区(consumer消费的分区不会重叠,consumerNum>partitionNum那么就会有闲置的consumer)。consumer只存在某个cg就是点对点消费,consumer加入了不同consumergroup就是订阅;
4 消费rebalance,Rebalance 的触发条件有 3 个。
- 组成员数发生变更。新 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例被“踢出”组,踢出的判定条件:1)未报心跳2)消费能力不足,sarama里就是2次poll的时间间隔(max.poll.interval.ms)
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
- 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
5 重复消费问题,Consumer需要向 Kafka 汇报自己的位移数据,consumer有2种配置,enable.auto.commit,false那么需要手动commitOffset,true那么下次poll的时候会把本次的offset commit,那么如果本次数据已经在消费,在下次poll之前consumer异常退出,那么就有重复消费了。kafka不保证exactly-once,所以下游最好考虑下操作幂等性。
kafka单机搭建:
broker.id=0
listeners=internal://0.0.0.0:9092,external://0.0.0.0:10001,vip://0.0.0.0:10000 # 9092是内网的,10001对外的broker是真正给consumer消费的端口,10000是对外vip(因为同一个clb不能绑定多次同实例端口)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,external:SASL_PLAINTEXT,internal:SASL_PLAINTEXT,vip:SASL_PLAINTEXT
zookeeper.connect=localhost:2181
advertised.listeners=external://expose-ip:10001,internal://172.30.0.15:9092,vip://expose-ip:10001 # 172.30.0.15:9092是内网ip,expose-ip:10001是broker绑定在clb上对应的地址,是真正给consumer消费的地址,vip是分组后用来做高可用拿brokers的公共vip
inter.broker.listener.name=internal # 非常重要,是告诉zookeeper,internal对应的9092才是内部brokers通信的,expose-ip:10001是对外的
broker.id=1
listeners=internal://0.0.0.0:9092,external://0.0.0.0:10002,vip://0.0.0.0:10000 # 9092是内网的,10002对外的broker是真正给consumer消费的端口,10000是对外vip(因为同一个clb不能绑定多次同实例端口)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,external:SASL_PLAINTEXT,internal:SASL_PLAINTEXT,vip:SASL_PLAINTEXT
zookeeper.connect=localhost:2181
advertised.listeners=external://expose-ip:10002,internal://172.30.0.15:9092,vip://expose-ip:10002 # 172.30.0.15:9092是内网ip,expose-ip:10002是broker绑定在clb上对应的地址,是真正给consumer消费的地址,vip是分组后用来做高可用拿brokers的公共vip
inter.broker.listener.name=internal # 非常重要,是告诉zookeeper,internal对应的9092才是内部brokers通信的,expose-ip:10002是对外的
注意点:
1 kafka-server&client的版本严格匹配,很多时候是不兼容的