一种并行,背压的Kafka Consumer

2022-05-23 12:11:13 浏览数 (1)

介绍

几乎所有 Kafka Consumer 教程都是下面的代码:

代码语言:javascript复制
KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)
// Subscribe to Kafka topics
consumer.subscribe(topics);

while (true) {
    // Poll Kafka for new messages
    ConsumerRecords<String, String> records = consumer.poll(100);
    // Processing logic
    for (ConsumerRecord<String, String> record : records) {
        doSomething(record);
    }
  }
}

基本上就是创建一个Kafka consumer,然后订阅对应的topics,然后就可以无限消费数据了,消费到数据后对每一条消息进行处理,这个过程我们叫做‘拉取然后循环处理’(poll-then-process loop)。

这相当简单,易于实施,人们可能一直在生产中使用它而没有任何问题。但是,此模型存在各种问题,我们将在下一节中详细介绍。

 问题

可能没有按照预期的那样获取数据

看上面的代码,我们开发者可能会认为 poll 是一种向 Kafka 发出需求信号的方式。我们的消费者仅在完成对先前消息的处理后才进行轮询以获取更多消息。如果它的处理速度很慢,Kafka 将充当‘减震器’,确保即使在生产速度高得多的情况下我们也不会丢失任何消息。另一方面,当处理速度较慢时,连续获取数据之间的间隔也会增加,这是有问题的,因为 max.poll.interval.ms 配置有一个默认的(5 分钟)上限:

max.poll.interval.ms 使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将进行rebalance,以便将分区重新分配给另一个成员。

换句话说,如果我们的消费者没有在每个 max.poll.interval.ms 中至少调用一次 poll ,那它就像死了一样。发生这种情况时,Kafka 会执行一个rebalance过程,将已死消费者的当前工作分配给其消费者组的其他成员。这在已经很慢的处理速率中引入了更多的开销和延迟。

更糟糕的是,如果处理导致一个消费者的速度变慢,很可能会导致其他消费者接管其工作时出现同样的问题。此外,假定的死亡消费者在下一次轮询时尝试重新加入组时也可能导致重新平衡(请记住,这是一个无限循环!)。这两者都使得rebalance一次又一次地发生,进一步减缓了消费。

现在,还有另一种配置可以帮助解决这种情况:

max.poll.records 单次调用 poll() 返回的最大记录数。请注意, max.poll.records 不会影响底层的获取行为。消费者将缓存来自每个获取请求的记录,并从每次轮询中返回它们。

将此设置为较低的值,我们的消费者将在每次轮询时处理更少的消息。因此轮询间隔将减少。或者,我们也可以将 max.poll.interval.ms 增加到更大的值。如果我们不能摆脱 poll-then-process 循环,这应该可以暂时解决问题。然而,它并不理想。

首先,这些配置是在我们启动消费者时设置的,但它们是否工作取决于消息或应用程序。我们可能会为每个应用程序专门设置它们,但最终,我们正在玩猜谜游戏并祈祷我们很幸运。

其次,在最坏的情况下,rebalance过程开始可能需要两倍于 max.poll.interval.ms 的持续时间:

  • Kafka 必须等待 max.poll.interval.ms 来检测我们的消费者不再轮询
  • 当 Kafka 决定rebalance时,其他消费者只会在下一次poll时知道这个决定

我们从不希望rebalance花费更多时间,因此设置更高的 max.poll.interval.ms 并不是很好。

最后,这些配置意味着我们的消费者被“期望”频繁地轮询,至少每 max.poll.interval.ms 一次,无论它在做什么类型的处理。如果不包含这种期望,poll-then-process 循环不仅会误导开发人员,而且注定会失败。

消息处理是异步的

Kafka 只保证一个分区内消息的顺序。来自不同分区的消息是不相关的,可以并行处理。这就是为什么在 Kafka 中,一个主题中的分区数是并行度的单位。

理论上,我们可以通过运行与主题上的分区数量一样多的消费者来轻松实现最大并行度。然而,实际上,这开销太大,更不用说它对增加rebalance机会的影响,因为有很多的消费者可能是来来去去(come and go)。

如果我们再次查看我们的消费者代码,它可以订阅多个主题并可能接收来自多个分区的消息。然而,在处理这些消息时,它会一一处理。这不是最优的。

现在,假设我们的处理逻辑非常简单,我们可以只使用线程池来并行化它吗?例如,通过向线程池提交一个处理任务,对于每条消息?

嗯,它仅在我们不关心处理排序和保证(例如最多一次、至少一次等)时才有效。因此在实践中它不是很有用。

一个更好的模型

概述

poll-then-process 循环的许多挫折来自不同的关注点——轮询、处理、偏移提交——混合在一起的情况。结果,当我们将它们分成独立的组件时,我们最终得到了一个改进的模型,它可以适当地支持并行处理和背压。下面更详细地描述了每个组件。

Work Queues

Work Queues 是 Poller 和 Executor 之间的通信通道:

  • 分配的 TopicPartition 与work queue(工作队列)之间存在一对一的映射。每次轮询后,Poller 将来自每个分区的新消息推送到其对应的work queue中,保留原始顺序。每个work queue也有一个可配置的大小。满时,它会向 Poller 施加背压,以便它可以跟进适当的操作。
  • work queue(工作队列)是异步的,它将轮询和消息处理分离,允许它们独立发生。这与 poll-then-process 循环形成对比,后者是循环中的两个连续步骤。

Poller

简而言之,Poller 封装了 Kafka 中与 poll 相关的一切:

  • 它监视rebalance事件——例如通过注册 ConsumerRebalanceListener——并协调其他单元来处理它们。
    • 对于每个新分配的 TopicPartition,它都会建立一个新的工作队列。
    • 对于每一个被撤销(或丢失)的TopicPartition,它都会命令Executor和Offset Manager对相关工作进行包装,并拆除相应的工作队列。
  • 它使用短的(例如 50 毫秒)可配置的时间间隔定期轮询 Kafka。由于这比默认的 max.poll.interval.ms 低很多倍,同时也不受消息处理的影响,我们避免了困扰 poll-then-process 循环的“rebalance风暴”。Kafka 不会因为没有足够频繁地轮询而将我们的消费者误认为已死。此外,我们会更早知道是否会发生另一次rebalance。
  • 当我们更频繁地轮询时,我们还可以使用较低的 max.poll.interval.ms 来加快rebalance过程。
  • 对于每个 Executor 无法跟上消息传入速率的 TopicPartition,其对应的工作队列将变满,并对 Poller 进行背压。轮询器需要有选择地暂停此 TopicPartition,以便后续轮询不会从中提取更多消息。当队列再次被释放时,它将恢复相同的 TopicPartition 以从下一次轮询开始获取新消息。这就是为什么我们可以继续拉取数据。这也是我们使用较短间隔的原因,以便我们可以更快地“恢复”。

pause(Collection<TopicPartition> partitions) 暂停从请求的分区中提取。未来对 poll(Duration) 的调用将不会从这些分区返回任何记录,直到使用 resume(Collection) 恢复它们。

Executor

Executor 就像一个线程池,它在其中维护多个worker来处理消息:

  • Executor 和 worker 的数量是可调的,以针对不同的工作负载进行优化,例如 CPU 限制、I/O 限制等。
  • 每个队列由一个worker处理。
  • 一个worker可以负责多个队列。
  • 对于每个队列,worker 会一一处理其消息。

通过这种设置,一个分区内的消息按顺序处理,而来自不同分区的消息并行处理。

Offset Manager

Kafka 中的每条消息都与一个偏移量(offset)相关联——一个整数,表示它在当前分区中的位置。通过存储这个数字,我们实质上为我们的消费者提供了一个检查点。如果它失败并返回,它知道从哪里继续。因此,在 Kafka 中实现各种处理保证至关重要:

  • 如果我们在 Kafka 中存储偏移量,它负责手动提交偏移量。
  • 如果我们决定使用外部存储管理偏移量,它负责从该存储中检索和保存。
  • 它允许 Poller 和 Executor 以同步或异步方式保存偏移量 - 以“一劳永逸”的方式。
  • 可以配置偏移管理器的存储行为:批量、使用计时器重复等等...

Kafka 的自动提交呢?Confluent声称:

使用自动提交可以让您“至少一次”(at least once)交付:Kafka 保证不会丢失任何消息,但重复消息是可能的。

这适用于交付,但是,它不为处理提供任何保证:

  • 它不是最多一次(at-most-once):如果一些消息被成功处理,并且我们的消费者在下一个自动提交事件之前崩溃,这些消息将被重新处理。
  • 这不是至少一次(at-least-once):如果自动提交启动,并且我们的消费者随后崩溃,一些消息会丢失。

因此,我们总是将 enable.auto.commit 设置为 false 并让 Offset Manager 手动管理偏移量。

实现处理保证

让我们通过几个示例用例来了解组件如何协同工作以满足不同的处理保证。

最多一次(At-most-once)

对于最多一次,我们只需要在处理消息之前提交偏移量。我们可以在处理每条消息之前立即执行此操作。但是,在引入更多成本的同时,并没有给我们更强的保证。因此,Poller 对此负责。每次轮询后,它将告诉偏移管理器保存这些偏移量并等待来自 Kafka 的成功确认,然后再将消息排队以进行处理。

在rebalance事件之前,它只需要向 Executor 发送一个即发即弃的信号以停止处理。然后它取消工作队列并返回等待rebalance。丢失的消息是那些仍在队列中或正在处理中的消息。如果我们想在不影响rebalance持续时间的情况下优化更少的丢失,我们可以使用更小的队列大小。

至少一次(At-least-once)

对于至少一次,我们只需要确保仅在成功处理消息后才保存偏移量。因此,如果我们要处理 10 条消息,我们不需要为所有消息保存偏移量,而只需要保存最后一条消息。

在此设置中,Executor 将在每次完成对消息的处理时向 Offset Manager 发出信号。偏移量管理器跟踪每个分区的最新偏移量 - 并决定何时将它们提交给 Kafka。例如,我们可以将 Offset Manager 设置为每 5 秒提交一次。无论新消息是否出现,都会发生这种情况。

在rebalance事件之前,Poller 设置了一个硬性截止日期,并通知 Executor 结束其正在进行的处理,并通知 Offset Manager 以跟进最后一次提交。如果截止日期已经过去,或者 Poller 收到了其他人的响应,它会取消工作队列并返回等待rebalance。

为了优化减少重复处理,我们可以:

  • 使用较宽松的截止日期,留出更多时间“结束”。但是,它也增加了重新平衡的时间。
  • 将偏移管理器设置为更频繁地提交。

确切一次(Exactly-once),外部管理的偏移量

在这种情况下,需要在一个事务中进行偏移保存和消息处理。这意味着 Executor 和 Offset Manager 使用同步调用紧密合作以实现它。

在rebalance事件之后,轮询器向偏移管理器询问当前分配的已保存偏移量。然后它会在恢复轮询之前尝试恢复保存的位置。

public void seek(TopicPartition partition, long offset) 覆盖消费者将在下一次轮询(超时)时使用的获取偏移量。

在rebalance事件之前,Poller 会通知 Executor 并等待其响应。Executor 回滚其正在进行的事务并返回到 Poller。Poller 然后取消工作队列并返回等待rebalance。

总结

我们分析了 loop-then-process 循环的各种问题,并提出了一个更合适的模型来理解和实现 Kafka Consumer。缺点是它要复杂得多,对于初学者来说可能并不容易。我们将这种复杂性归咎于 Kafka 及其低级 API。

在实践中,我们可能不会自己做,而是使用一个现成的库,它可能基于也可能不基于类似模型:Alpakka Kafka、Spring for Kafka、zio-kafka 等......即便如此,所提出的模型对于评估这些解决方案或实施新的解决方案也很有用。

来源:

https://www.toutiao.com/article/7095235111150879262/?log_from=f5c5aad449665_1652927718906

“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com

来都来了,走啥走,留个言呗~

 IT大咖说  |  关于版权

由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!

感谢您对IT大咖说的热心支持!

  • 相关推荐
  • 推荐文章
  • DBA的福音|分享免费oracle性能监控调优工具
  • 一个golang实现的全文检索引擎,支持亿级数据,毫秒级查询
  • 跨系统数据一致性问题经验实战
  • 还在用Alpine做Docker镜像?看看大牛怎么说
  • 掌握mysql的这些操作,让你事半功倍
  • SpringBoot 监控 SQL 运行情况?
  • 仅数MB,准确率99.9%的离线IP地址定位库,0.0x毫秒级查询
  • 跟xshell说再见,推荐免费的终端连接器WindTerm
  • ICLR 2022 | 走向深度图神经网络:基于GNTK的优化视角
  • 不会还有人不懂Stream源码吧?10年架构师带你一次性搞懂
  • 2 万字详解,彻底讲透 Elasticsearch

0 人点赞