CKafka系列学习文章 - 你是否踩过Ckafka的这些坑(十六)

2020-06-15 20:00:06 浏览数 (1)

导读:目前国内公有云上的kafka产品都是基于开源kafka产品二次封装改造的,基本上开源kafka的配置参数都能应用在云上kafka产品里。本文以腾讯云的ckafka产品为例,分别介绍了几个应用场景,每个点都有详细的配置干货。通过这些设置和正确的使用姿势,我们来很好的保证关联业务的稳定性和可靠性。

一、如何提高生产消费

从生产者的角度来看,向不同的 partition 写入是完全并行的;从消费者的角度来看,并发数完全取决于 partition 的数量(如果 consumer 数量大于 partition 数量,则必有 consumer 闲置)。因此选取合适的分区数量对于发挥 CKafka 实例的性能十分重要。partition 的数量需要根据生产和消费的吞吐来判断。理想情况下,可以通过如下公式来判断分区的数目:

Num = max( T/PT , T/CT ) = T / min( PT ,CT )

其中,Num 代表 partition 数量,T 代表目标吞吐量,PT 代表生产者写入单个 partition 的最大吞吐,CT 代表消费者从单个 partition 消费的最大吞吐。则 partition 数量应该等于 T/PT 和 T/CT 中较大的那一个。

在实际情况中,生产者写入但 partition 的最大吞吐 PT 的影响因素和批处理的规模、压缩算法、确认机制、副本数等有关。消费者从单个 partition 消费的最大吞吐 CT 的影响因素和业务逻辑有关,需要在不同场景下实测得出。

通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。 如果消费者数量是 5,则 partition 的数目也应该是 ≥ 5 的。同时,过多的分区会导致生产吞吐的降低和选举耗时的增加,因此也不建议过多分区。提供如下信息供参考:

· 一个 partition 是可以实现消息的顺序写入的。

· 一个 partition 只能被同一个消费者组的一个消费者进程消费。

· 一个消费者进程可同时消费多个 partition,即 partition 限制了消费端的并发能力。

· partition 越多则生产消息失败后, leader 选举的耗时越长。

· offset 的粒度最细是在 partition 级别的,partition 越多,查询 offset 就越耗时。

· partition 的数量是可以动态增加的,只能增加不能减少。但增加会出现消息 rebalance 的情况。

影响生产者写入但 partition 的最大吞吐 PT 的参数:

batch.size=16384

# 生产者会尝试将业务发送到相同的 Partition的消息合包发送到 Broker,batch.size设置合包的大小上限。默认为 16KB。batch.size 设太小会导致吞吐下降,设太大会导致内存使用过多。

acks=1

# Kafka producer 的 ack 有 3 种机制,分别说明如下:

# -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,可以配合 Topic 级别参数 min.insync.replicas 使用。

# 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)

# 1:生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)

# 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置

timeout.ms=30000

# timeout.ms控制生产请求在 Broker 等待副本同步满足 acks 设置的条件所等待的最大时间

buffer.memory=33554432

# buffer.memory配置生产者用来缓存消息等待发送到 Broker 的内存。用户要根据生产者所在进程的内存总大小调节

max.block.ms=60000

# max.block.ms是当生产消息的速度比 Sender 线程发送到 Broker 速度快,导致buffer.memory 配置的内存用完时会阻塞生产者 send 操作,该参数设置最大的阻塞时间

linger.ms=1000

# linger.ms是设置消息延迟发送的时间,这样可以等待更多的消息组成 batch 发送。默认为0表示立即发送。当待发送的消息达到batch.size 设置的大小时,不管是否达到 linger.ms设置的时间,请求也会立即发送

max.request.size=1048576

# max.request.size是生产者能够发送的请求包大小上限,默认为1MB。在修改该值时注意不能超过 Broker 配置的包大小上限16MB

compression.type=[none, snappy, lz4]

# compression.type是压缩格式配置,目前 0.9(包含)以下版本不允许使用压缩,0.10(包含)以上不允许使用 GZip 压缩

request.timeout.ms=30000

# request.timeout.ms是客户端发送给 Broker 的请求的超时时间,不能小于 Broker 配置的 replica.lag.time.max.ms,目前该值为10000ms

max.in.flight.requests.per.connection=5

# max.in.flight.requests.per.connectio是客户端在每个连接上最多可发送的最大的未确认请求数,该参数大于1且 retries 大于0时可能导致数据乱序。 希望消息严格有序时,建议客户将该值设置1

retries=3

# retries是请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失

retry.backoff.ms=100

# retry.backoff.ms是发送请求失败时到下一次重试请求之间的时间

二、Rebalance如何处理

在我们的Consumer group在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了,消费停止,不及时处理,会导致消息堆积。

Rebalance 发生原因

根据 Consumer Group 的状态机可知,当 Consumer Group 为 Empty、AwaitSync 或 Stable 状态时,Group 可能会进行 Rebalance。

以下情况可能会发生 Rebalance:

  • 一个消费者订阅了 Topic。
  • 消费者被关闭。
  • 某个 Consumer 被 Group Coordinator(协调器)认为是 Dead 状态时。如果某个Consumer 在 session.timeout.ms时间内没有给 Group Coordinator 发心跳,则该Consumer 将被认为是 Dead 状态,并且发起 Rebalance。
  • 分区数增加。
  • 订阅了不存在的 Topic。 如果您订阅了一个还未创建的 Topic,那么当这个 Topic 创建后会发生 Rebalance;同理,如果一个已经被订阅的 Topic 被删除,也会发生 Rebalance。
  • 应用崩溃。

Rebalance 过程分析

以0.10版本Kafka 的机制为例,Rebalance 过程分析如下:

  1. 任何一个 Consumer 想要加入到一个 Consumer Group 中时,会发送一个 JoinGroup 的请求给 Group Coordinator。第一个加入 Group 的 Consumer 会变成 Group Leader。
  2. Leader 会从 Group Coordinator 处收到这个 Group 中所有 Consumer 列表,并且负责给 Group 中的 Consumer 分配 partition。分区的分配可以通过 PartitionAssignor 接口来实现。
  3. 分配完成后,Leader 会把分配结果发给 Group Coordinator,Coordinator 会把结果发送给所有的 Consumer。因此每个 Consumer 只能查看到自己被分配的 partition,Leader 是唯一能够拿到Consumer Group 中的 Consumer 以及其分区情况的节点的 Consumer。

上述过程会在每次 Rebalance 发生时执行一次。

如何避免不必要的Rebalance

第一类:因为未能及时发送心跳,导致 Consumer 被踢出Group 而引发的。

因此,你需要仔细地设置 session.timeout.ms 和 heartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。设置 session.timeout.ms = 6s。设置 heartbeat.interval.ms = 2s。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer。毕竟,我们还是希望能尽快揪出那些已经挂掉的 Consumer,早日把它们踢出 Group。希望这份配置能够较好地帮助你规避第一类“不必要”的 Rebalance。

第二类: Consumer 消费时间过长导致的。

像Consumer 消费数据时需要将消息处理之后写入到 MongoDB,这是一个很慢的消费逻辑。如果MongoDB 出现不稳定都会导致 Consumer 程序消费时长的增加。此时max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。就拿 MongoDB 这个例子来说,如果写 MongoDB 的最长时间是 7 分钟,那么你可以将该参数设置为 8 分钟左右。

第三类:共用消费组而影响其它topic的消费

所有topic共用几个消费组,经常由于rebalance影响其它的topic消费,可以不同topic使用不同的消费组进行隔离避免相互影响。

总之,你要为你的业务处理逻辑留下充足的时间。这样Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 了。如果你按照上面的推荐数值恰当地设置了这几个参数,却发现还是出现了 Rebalance,那么我建议你去排查一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,从而引发了 Rebalance。

三、接入方式与ACL

如何对接使用ckafka,以及ckafka提供的用户管理和ACL权限管理,满足多种环境的访问方式和安全权限的需求。

Ckafka提供了以下接入方式:

1、内网接入方式:购买ckafka实例,即提供内网访问的:ip和port

2、VPC网络接入方式:不是一个VPC下资源访问ckafka,可开通路由接入方式,来创建路由类型为VPC网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port

3、支撑网络 接入方式:腾讯云的支撑环境下的资源访问ckafka,可开通路由接入方式,来创建路由类型为支撑网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port

4、基础网络接入方式:腾讯云的基础网络下的资源访问ckafka,可开通路由接入方式,来创建路由类型为基础网络、接入方式是PLAINTEXT和SASL_PLAINTEXT 的路由:ip和port

5、公网网络接入方式:要通过外网来访问访问ckafka,可开通路由接入方式,来创建路由类型为公网域名接入、接入方式是SASL_PLAINTEXT 的路由:域名和port

注意说明:PLAINTEXT的ip和port为可直接使用的访问方式,SASL_PLAINTEXT则是Ckafak提供的安全认证机制,需要创建用户名和密码,进行用户认证,才能访问Ckafka。

ACL 访问控制列表(Access Control List):

帮助用户定义一组权限规则,允许/拒绝用户 user 通过 IP 读/写 Topic 资源 resource。

公网网络接入方式的使用说明:

生产者和消费的配置文件需要添加以下配置:

代码语言:javascript复制
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="instanceId#admin" password="admin";
  • 其中,sasl.jaas.config 部分的 username 和 password 说明如下:username:包含实例 ID 和用户名,使用#拼接,实例 ID 为客户端需要连接的 CKafka 实例,用户名可通过控制台 ACL 策略管理进行设置。
  • password:部分为用户名对应的密码。

注意说明:接入方式只影响接入时的验证方式,设置的 ACL 权限则是全局的。

如果您在开通公网访问路由的同时还使用了 PLAINTEXT 方式接入 Kafka,那么之前为 Topic 设置的 ACL 仍然会生效;如果希望 PLAINTEXT 方式的访问不受影响,请为 PLAINTEXT 需要访问的 Topic 添加全部用户的可读写的权限。

四、避免数据丢失

由于生产端的原因导致数据丢失

生产者将数据发送到消息队列 CKafka 时,数据可能因为网络抖动而丢失,此时消息队列 CKafka 未收到该数据。可能情况:

  • 网络负载高或者磁盘繁忙时,生产者又没有重试机制。
  • 磁盘超过购买规格的限制,例如实例磁盘规格为9000GB,在磁盘写满后未及时扩容,会导致数据无法写入到消息队列 CKafka。
  • 突发或持续增长峰值流量超过购买规格的限制,例如实例峰值吞吐规格为100MB/s,在长时间峰值吞吐超过限制后未及时扩容,会导致数据写入消息队列 CKafka 变慢,生产者有排队超时机制时,导致数据无法写入到消息队列 CKafka。

解决方法

  • 生产者对自己重要的数据,开启失败重试机制。
  • 针对磁盘使用,在配置实例时设置好监控和 告警策略 ,可以做到事先预防。遇到磁盘写满时,可以在控制台及时升配(消息队列 CKafka 非独占实例间升配为平滑升配不停机且也可以单独升配磁盘)或者通过修改消息保留时间降低磁盘存储。

为了尽可能减少生产端消息丢失,您可以通过 buffer.memory 和 batch.size(以字节为单位)调优缓冲区的大小。缓冲区并非越大越好,如果由于某种原因生产者 done 掉了,那么缓冲区存在的数据越多,需要回收的垃圾越多,恢复就会越慢。应该时刻注意生产者的生产消息数情况、平均消息大小等(消息队列 CKafka 监控中有丰富的监控指标)。

  • 配置生产端 ACK
    • 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数以及min.isync.replicas 设置数据可靠性的级别。当 acks = 1时(默认值),生产者在 ISR 中的 leader 已成功收到数据可以继续发送下一条数据。如果 leader 宕机,由于数据可能还未来得及同步给其 follower,则会丢失数据。
    • 当 acks = 0时,生产者不等待来自 broker 的确认就发送下一条消息。这种情况下数据传输效率最高,但数据可靠性确最低。
    • 当 acks = -1或者 all 时,生产者需要等待 ISR 中的所有 follower 都确认接收到消息后才能发送下一条消息,可靠性最高。

即使按照上述配置 ACK,也不能保证数据不丢,例如,当 ISR 中只有 leader 时(ISR 中的成员由于某些情况会增加也会减少,最少时只剩一个 leader),此时会变成 acks = 1的情况。所以需要同时在配合 min.insync.replicas 参数(此参数可以在消息队列 CKafka 控制台 Topic 配置开启高级配置中进行配置),min.insync.replicas 表示在 ISR 中最小副本的个数,默认值是1,当且仅当 acks = -1或者 all 时生效。

建议配置的参数值

此参数值仅供参考,实际数值需要依业务实际情况而定。

  • 重试机制:

message.send.max.retries=3;

retry.backoff.ms=10000;

  • 高可靠的保证:

request.required.acks=-1;

min.insync.replicas=2;

  • 高性能的保证:

request.required.acks=0;

  • 可靠性 性能:

request.required.acks=1;

由于消费端的原因导致数据丢失

  • 还未真正消费到数据就提交 commit 了 offset,若过程中消费者挂掉,但 offset 已经刷新,消费者错过了一条数据,需要消费分组重新设置 offset 才能找回数据。
  • 消费速度和生产速度相差太久,而消息保存时间太短,导致消息还未及时消费就被过期删除。

解决方法

  • 合理配置参数 auto.commit.enable,等于 true 时表示自动提交。建议使用定时提交,避免频繁 commit offset。
  • 监控消费者的情况,正确调整数据的保留时间。监控当前消费 offset 以及未消费的消息条数,并配置告警,防止由于消费速度过慢导致消息过期删除。

五、其它注意事项

RecordTooLargeException:消息太大。

生产者的参数max.request.size:这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。如果要修改,这个参数与之对应的broker端的message.max.bytes参数也要修改,message.max.bytes的默认值也是1MB,broker端对应ckafka的topic维度,在控制台的调整是max.message.bytes:

如果将broker端的max.message.bytes参数配置为2MB,而max.request.size参数配置为3MB,那么当我们发送的一条大小为2.5MB的消息时,生产者客户端就会报出如下的异常:org.apache.kafka.common.errors.RecordTooLargeException:The request included a message larger than the max message size the server will accept.

六、总结

凡事预则立,不预则废。Ckafka作为解耦生产者与消费者的中间件,提供高吞吐性能、高可扩展性的消息队列服务。在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。但也是基于正确的使用姿势上,为了避免一些不当的使用姿势,本人分享的一线经验总结,希望对你有所帮助,如有疑问欢迎在评论进行讨论。

0 人点赞