导读:目前国内公有云上的kafka产品都是基于开源kafka产品二次封装改造的,基本上开源kafka的配置参数都能应用在云上kafka产品里。本文以腾讯云的ckafka产品为例,分别介绍了几个应用场景,每个点都有详细的配置干货。通过这些设置和正确的使用姿势,我们来很好的保证关联业务的稳定性和可靠性。
一、如何提高生产消费
从生产者的角度来看,向不同的 partition 写入是完全并行的;从消费者的角度来看,并发数完全取决于 partition 的数量(如果 consumer 数量大于 partition 数量,则必有 consumer 闲置)。因此选取合适的分区数量对于发挥 CKafka 实例的性能十分重要。partition 的数量需要根据生产和消费的吞吐来判断。理想情况下,可以通过如下公式来判断分区的数目:
Num = max( T/PT , T/CT ) = T / min( PT ,CT )
其中,Num 代表 partition 数量,T 代表目标吞吐量,PT 代表生产者写入单个 partition 的最大吞吐,CT 代表消费者从单个 partition 消费的最大吞吐。则 partition 数量应该等于 T/PT 和 T/CT 中较大的那一个。
在实际情况中,生产者写入但 partition 的最大吞吐 PT 的影响因素和批处理的规模、压缩算法、确认机制、副本数等有关。消费者从单个 partition 消费的最大吞吐 CT 的影响因素和业务逻辑有关,需要在不同场景下实测得出。
通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。 如果消费者数量是 5,则 partition 的数目也应该是 ≥ 5 的。同时,过多的分区会导致生产吞吐的降低和选举耗时的增加,因此也不建议过多分区。提供如下信息供参考:
· 一个 partition 是可以实现消息的顺序写入的。
· 一个 partition 只能被同一个消费者组的一个消费者进程消费。
· 一个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。
· partition 越多则生产消息失败后, leader 选举的耗时越长。
· offset 的粒度最细是在 partition 级别的,partition 越多,查询 offset 就越耗时。
· partition 的数量是可以动态增加的,只能增加不能减少。但增加会出现消息 rebalance 的情况。
影响生产者写入但 partition 的最大吞吐 PT 的参数:
batch.size=16384
# 生产者会尝试将业务发送到相同的 Partition的消息合包发送到 Broker,batch.size设置合包的大小上限。默认为 16KB。batch.size 设太小会导致吞吐下降,设太大会导致内存使用过多。
acks=1
# Kafka producer 的 ack 有 3 种机制,分别说明如下:
# -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,可以配合 Topic 级别参数 min.insync.replicas 使用。
# 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)
# 1:生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)
# 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
timeout.ms=30000
# timeout.ms控制生产请求在 Broker 等待副本同步满足 acks 设置的条件所等待的最大时间
buffer.memory=33554432
# buffer.memory配置生产者用来缓存消息等待发送到 Broker 的内存。用户要根据生产者所在进程的内存总大小调节
max.block.ms=60000
# max.block.ms是当生产消息的速度比 Sender 线程发送到 Broker 速度快,导致buffer.memory 配置的内存用完时会阻塞生产者 send 操作,该参数设置最大的阻塞时间
linger.ms=1000
# linger.ms是设置消息延迟发送的时间,这样可以等待更多的消息组成 batch 发送。默认为0表示立即发送。当待发送的消息达到batch.size 设置的大小时,不管是否达到 linger.ms设置的时间,请求也会立即发送
max.request.size=1048576
# max.request.size是生产者能够发送的请求包大小上限,默认为1MB。在修改该值时注意不能超过 Broker 配置的包大小上限16MB
compression.type=[none, snappy, lz4]
# compression.type是压缩格式配置,目前 0.9(包含)以下版本不允许使用压缩,0.10(包含)以上不允许使用 GZip 压缩
request.timeout.ms=30000
# request.timeout.ms是客户端发送给 Broker 的请求的超时时间,不能小于 Broker 配置的 replica.lag.time.max.ms,目前该值为10000ms
max.in.flight.requests.per.connection=5
# max.in.flight.requests.per.connectio是客户端在每个连接上最多可发送的最大的未确认请求数,该参数大于1且 retries 大于0时可能导致数据乱序。 希望消息严格有序时,建议客户将该值设置1
retries=3
# retries是请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
retry.backoff.ms=100
# retry.backoff.ms是发送请求失败时到下一次重试请求之间的时间
二、Rebalance如何处理
在我们的Consumer group在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了,消费停止,不及时处理,会导致消息堆积。
Rebalance 发生原因
根据 Consumer Group 的状态机可知,当 Consumer Group 为 Empty、AwaitSync 或 Stable 状态时,Group 可能会进行 Rebalance。
以下情况可能会发生 Rebalance:
- 一个消费者订阅了 Topic。
- 消费者被关闭。
- 某个 Consumer 被 Group Coordinator(协调器)认为是 Dead 状态时。如果某个Consumer 在 session.timeout.ms时间内没有给 Group Coordinator 发心跳,则该Consumer 将被认为是 Dead 状态,并且发起 Rebalance。
- 分区数增加。
- 订阅了不存在的 Topic。 如果您订阅了一个还未创建的 Topic,那么当这个 Topic 创建后会发生 Rebalance;同理,如果一个已经被订阅的 Topic 被删除,也会发生 Rebalance。
- 应用崩溃。
Rebalance 过程分析
以0.10版本Kafka 的机制为例,Rebalance 过程分析如下:
- 任何一个 Consumer 想要加入到一个 Consumer Group 中时,会发送一个 JoinGroup 的请求给 Group Coordinator。第一个加入 Group 的 Consumer 会变成 Group Leader。
- Leader 会从 Group Coordinator 处收到这个 Group 中所有 Consumer 列表,并且负责给 Group 中的 Consumer 分配 partition。分区的分配可以通过 PartitionAssignor 接口来实现。
- 分配完成后,Leader 会把分配结果发给 Group Coordinator,Coordinator 会把结果发送给所有的 Consumer。因此每个 Consumer 只能查看到自己被分配的 partition,Leader 是唯一能够拿到Consumer Group 中的 Consumer 以及其分区情况的节点的 Consumer。
上述过程会在每次 Rebalance 发生时执行一次。
如何避免不必要的Rebalance
第一类:因为未能及时发送心跳,导致 Consumer 被踢出Group 而引发的。
因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。设置 session.timeout.ms = 6s。设置 heartbeat.interval.ms = 2s。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些已经挂掉的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。
第二类: Consumer 消费时间过长导致的。
像Consumer 消费数据时需要将消息处理之后写入到 MongoDB,这是一个很慢的消费逻辑。如果MongoDB 出现不稳定都会导致 Consumer 程序消费时长的增加。此时max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。
第三类:共用消费组而影响其它topic的消费
所有topic共用几个消费组,经常由于rebalance影响其它的topic消费,可以不同topic使用不同的消费组进行隔离避免相互影响。
总之,你要为你的业务处理逻辑留下充足的时间。这样Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 了。如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。
三、接入方式与ACL
如何对接使用ckafka,以及ckafka提供的用户管理和ACL权限管理,满足多种环境的访问方式和安全权限的需求。
Ckafka提供了以下接入方式:
1、内网接入方式:购买ckafka实例,即提供内网访问的:ip和port
2、VPC网络接入方式:不是一个VPC下资源访问ckafka,可开通路由接入方式,来创建路由类型为VPC网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
3、支撑网络 接入方式:腾讯云的支撑环境下的资源访问ckafka,可开通路由接入方式,来创建路由类型为支撑网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
4、基础网络接入方式:腾讯云的基础网络下的资源访问ckafka,可开通路由接入方式,来创建路由类型为基础网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port
5、公网网络接入方式:要通过外网来访问访问ckafka,可开通路由接入方式,来创建路由类型为公网域名接入、接入方式是SASL_PLAINTEXT 的路由:域名和port
注意说明:PLAINTEXT的ip和port为可直接使用的访问方式,SASL_PLAINTEXT则是Ckafak提供的安全认证机制,需要创建用户名和密码,进行用户认证,才能访问Ckafka。
ACL 访问控制列表(Access Control List):
帮助用户定义一组权限规则,允许/拒绝用户 user 通过 IP 读/写 Topic 资源 resource。
公网网络接入方式的使用说明:
生产者和消费的配置文件需要添加以下配置:
代码语言:javascript复制sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="instanceId#admin" password="admin";
- 其中,sasl.jaas.config 部分的 username 和 password 说明如下:username:包含实例 ID 和用户名,使用#拼接,实例 ID 为客户端需要连接的 CKafka 实例,用户名可通过控制台 ACL 策略管理进行设置。
- password:部分为用户名对应的密码。
注意说明:接入方式只影响接入时的验证方式,设置的 ACL 权限则是全局的。
如果您在开通公网访问路由的同时还使用了 PLAINTEXT 方式接入 Kafka,那么之前为 Topic 设置的 ACL 仍然会生效;如果希望 PLAINTEXT 方式的访问不受影响,请为 PLAINTEXT 需要访问的 Topic 添加全部用户的可读写的权限。
四、避免数据丢失
由于生产端的原因导致数据丢失
生产者将数据发送到消息队列 CKafka 时,数据可能因为网络抖动而丢失,此时消息队列 CKafka 未收到该数据。可能情况:
- 网络负载高或者磁盘繁忙时,生产者又没有重试机制。
- 磁盘超过购买规格的限制,例如实例磁盘规格为9000GB,在磁盘写满后未及时扩容,会导致数据无法写入到消息队列 CKafka。
- 突发或持续增长峰值流量超过购买规格的限制,例如实例峰值吞吐规格为100MB/s,在长时间峰值吞吐超过限制后未及时扩容,会导致数据写入消息队列 CKafka 变慢,生产者有排队超时机制时,导致数据无法写入到消息队列 CKafka。
解决方法
- 生产者对自己重要的数据,开启失败重试机制。
- 针对磁盘使用,在配置实例时设置好监控和 告警策略 ,可以做到事先预防。遇到磁盘写满时,可以在控制台及时升配(消息队列 CKafka 非独占实例间升配为平滑升配不停机且也可以单独升配磁盘)或者通过修改消息保留时间降低磁盘存储。
为了尽可能减少生产端消息丢失,您可以通过 buffer.memory 和 batch.size(以字节为单位)调优缓冲区的大小。缓冲区并非越大越好,如果由于某种原因生产者 done 掉了,那么缓冲区存在的数据越多,需要回收的垃圾越多,恢复就会越慢。应该时刻注意生产者的生产消息数情况、平均消息大小等(消息队列 CKafka 监控中有丰富的监控指标)。
- 配置生产端 ACK
- 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数以及min.isync.replicas 设置数据可靠性的级别。当 acks = 1时(默认值),生产者在 ISR 中的 leader 已成功收到数据可以继续发送下一条数据。如果 leader 宕机,由于数据可能还未来得及同步给其 follower,则会丢失数据。
- 当 acks = 0时,生产者不等待来自 broker 的确认就发送下一条消息。这种情况下数据传输效率最高,但数据可靠性确最低。
- 当 acks = -1或者 all 时,生产者需要等待 ISR 中的所有 follower 都确认接收到消息后才能发送下一条消息,可靠性最高。
即使按照上述配置 ACK,也不能保证数据不丢,例如,当 ISR 中只有 leader 时(ISR 中的成员由于某些情况会增加也会减少,最少时只剩一个 leader),此时会变成 acks = 1的情况。所以需要同时在配合 min.insync.replicas 参数(此参数可以在消息队列 CKafka 控制台 Topic 配置开启高级配置中进行配置),min.insync.replicas 表示在 ISR 中最小副本的个数,默认值是1,当且仅当 acks = -1或者 all 时生效。
建议配置的参数值
此参数值仅供参考,实际数值需要依业务实际情况而定。
- 重试机制:
message.send.max.retries=3;
retry.backoff.ms=10000;
- 高可靠的保证:
request.required.acks=-1;
min.insync.replicas=2;
- 高性能的保证:
request.required.acks=0;
- 可靠性 性能:
request.required.acks=1;
由于消费端的原因导致数据丢失
- 还未真正消费到数据就提交 commit 了 offset,若过程中消费者挂掉,但 offset 已经刷新,消费者错过了一条数据,需要消费分组重新设置 offset 才能找回数据。
- 消费速度和生产速度相差太久,而消息保存时间太短,导致消息还未及时消费就被过期删除。
解决方法
- 合理配置参数 auto.commit.enable,等于 true 时表示自动提交。建议使用定时提交,避免频繁 commit offset。
- 监控消费者的情况,正确调整数据的保留时间。监控当前消费 offset 以及未消费的消息条数,并配置告警,防止由于消费速度过慢导致消息过期删除。
五、其它注意事项
RecordTooLargeException:消息太大。
生产者的参数max.request.size:这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。如果要修改,这个参数与之对应的broker端的message.max.bytes参数也要修改,message.max.bytes的默认值也是1MB,broker端对应ckafka的topic维度,在控制台的调整是max.message.bytes:
如果将broker端的max.message.bytes参数配置为2MB,而max.request.size参数配置为3MB,那么当我们发送的一条大小为2.5MB的消息时,生产者客户端就会报出如下的异常:org.apache.kafka.common.errors.RecordTooLargeException:The request included a message larger than the max message size the server will accept.
六、总结
凡事预则立,不预则废。Ckafka作为解耦生产者与消费者的中间件,提供高吞吐性能、高可扩展性的消息队列服务。在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。但也是基于正确的使用姿势上,为了避免一些不当的使用姿势,本人分享的一线经验总结,希望对你有所帮助,如有疑问欢迎在评论进行讨论。