Presto on Apache Kafka 在 Uber的应用

2022-04-17 11:34:04 浏览数 (1)

本文翻译自Uber技术文章《Presto® on Apache Kafka® At Uber Scale》

Uber的目标是通过让世界运转来激发机遇,而大数据是其中非常重要的一部分。 Presto® 和 Apache Kafka® 在 Uber 的大数据堆栈中发挥着关键作用。 Presto 是查询联合的事实标准,已用于交互式查询、近实时数据分析和大规模数据分析。 Kafka 是支持许多用例的数据流的骨干,例如发布/订阅、流处理等。在接下来的文章中,我们将讨论我们如何将这两个重要的服务连接在一起,以通过Uber大规模Presto集群直接在 Kafka 上的实现轻量级、交互式 SQL 查询。

Presto在Uber的应用

Uber 使用开源 Presto 来查询几乎所有的数据源,包括动态的和静态的。 Presto 的多功能性使我们能够做出明智的、数据驱动的业务决策。 我们运营着大约 15 个 Presto 集群,跨越 5,000 多个节点。 我们每周有大约 7,000 名活跃用户,每天运行大约 500,000 个查询,从 HDFS 读取大约 50 PB。 如今,Presto 用于通过其可扩展的数据源连接器查询各种数据源,例如 Apache Hive™、Apache Pinot™、AresDb、MySQL、Elasticsearch 和 Apache Kafka。 您还可以在我们之前的一些博客中找到有关 Presto 的更多信息:

Engineering Data Analytics with Presto and Apache Parquet at Uber

Building a Better Big Data Architecture: Meet Uber’s Presto Team

Kafka在Uber的应用

Uber 拥有最大的 Apache Kafka 部署之一,每天处理数万亿条消息和数 PB。 如图 2 所示,今天我们将 Apache Kafka 定位为我们技术堆栈的基石。 它支持大量不同的工作流程,包括用于从 Rider 和 Driver 应用程序传递事件数据的发布-订阅消息总线、流式分析(例如 Apache Flink®)、将数据库更改日志流式传输到下游订阅者以及摄取各种数据进入 Uber 的 Apache Hadoop® 数据湖。 为了确保它的高性能、可靠和用户友好,我们进行了大量有趣的工作。 如:

Disaster Recovery for Multi-Region Kafka at Uber

Enabling Seamless Kafka Async Queuing with Consumer Proxy

Real-Time Exactly-Once Ad Event Processing with Apache Flink, Kafka, and Pinot

问题陈述

多年来,Uber 的数据团队已经看到对 Kafka 流分析的需求不断增加,因为对实时数据的及时、即席数据分析为数据科学家、工程师和运营团队提供了有价值的信息。

以下是 Kafka 团队的一个典型请求示例:运营团队正在调查为什么有几条消息没有被关键服务处理,这将对最终客户产生直接影响。 然后运维团队收集了报告问题的几个 UUID,并要求检查它们是否存在于服务的输入/输出 Kafka 流中。 如图 3 所示,该请求可以表述为查询:“UUID X 的订单是否在 Kafka 主题 T 中缺失。”

考虑的替代方案

这样的问题通常通过大数据中的实时分析来解决。 在该领域可用的各种技术中,我们专注于 2 类开源解决方案,即:流处理和实时 OLAP 数据存储。

Apache Flink、Apache Storm™ 或 ksql 等流处理引擎连续处理流并输出处理后的流或增量维护可更新视图。 这种流处理不适合上述问题,因为用户希望对过去的事件执行点查找或运行分析查询。

另一方面,Apache Pinot、Apache Druid™ 和 Clickhouse® 等实时 OLAP 数据存储更适合。 这些 OLAP 存储配备了先进的索引技术,因此它们能够索引 Kafka 流以提供低延迟查询。 事实上,Uber 几年前就采用了 Apache Pinot,如今 Pinot 是 Uber 数据平台中的一项关键技术,可为多个任务关键型实时分析应用程序提供支持。 您可以阅读我们之前关于在 Uber 使用 Pinot 的博客。

但是,实时 OLAP 需要一个重要的载入过程来创建一个从 Kafka 流中提取的表并调整该表以获得最佳性能。 此外,实时 OLAP 存储还需要存储和计算资源来提供服务,因此建议将此解决方案用于重复查询表并要求较低延迟的用例(例如面向用户的应用程序),但不适合临时故障排除或探索。

因此,这个问题促使 Kafka 和 Presto 团队共同探索一种轻量级的解决方案,考虑到以下几点:

  • 它重用了现有的 Presto 部署,这是一项已经在 Uber 进行了多年实战测试的成熟技术
  • 它不需要任何管理——可以随时发现 Kafka 主题,并且可以在创建后立即进行查询
  • Presto 以其跨多个数据源的强大查询联合功能而闻名,因此它允许 Kafka 与 Hive/MySQL/Redis 等其他数据源之间的关联,从而获得跨数据平台的洞察力

然而,这种 Presto 方法也有其局限性。 例如,它的性能不如实时 OLAP 存储,因为 Kafka 连接器没有构建索引,因此必须在一系列偏移量中扫描 Kafka 流。 此外,为了满足 Uber 的可扩展性要求,连接器还有其他挑战需要解决,我们将在下一节中详细说明。

在Uber的挑战

Presto 已经有一个支持通过 Presto 查询 Kafka 的 Kafka 连接器。 但是,该解决方案并不完全适合我们在 Uber 拥有的大规模 Kafka 架构。 有几个挑战:

  • Kafka 主题和集群发现:在我们提供 Kafka 即服务的 Uber,用户可以随时通过自助服务门户将新主题加入 Kafka。 因此,我们需要 Kafka 主题发现是动态的。 但是,当前 Presto Kafka 连接器中的 Kafka 主题和集群发现是静态的,每次我们加入新主题时都需要重新启动连接器。
  • 数据模式发现:与 Kafka 主题和集群发现类似,我们将模式注册表作为服务提供,并支持用户自助登录。 因此,我们需要 Presto-Kafka 连接器能够按需检索最新的模式。
  • 查询限制:限制每个查询可以从 Kafka 消费的数据数量对我们来说很重要。 Uber 有许多大型 Kafka 主题,其字节速率可以高达 500 M/s。 众所周知,Presto-Kafka 查询与其他替代方案相比相对较慢,从 Kafka 拉取大量数据的查询将需要很长时间才能完成。 这不利于用户体验,也不利于 Kafka 集群的健康。
  • 配额控制:作为分布式查询引擎,Presto 可以以非常高的吞吐量同时消费来自 Kafka 的消息,这可能导致 Kafka 集群的潜在集群降级。 限制最大 Presto 消耗吞吐量对于 Kafka 集群的稳定性至关重要。

架构

Uber 的数据生态系统为用户提供了一种编写 SQL 查询并将其提交到 Presto 集群执行的方式。 每个 Presto 集群都有一个 coordinator 节点,负责解析 SQL 语句、规划查询、调度任务供 worker 节点执行。 Presto 中的 Kafka 连接器允许将 Kafka 主题用作表,其中主题中的每条消息在 Presto 中表示为一行。 在接收到查询时,协调器确定查询是否具有适当的过滤器。 验证完成后,Kafka 连接器从 Kafka 集群管理服务获取集群和主题信息。 然后它从模式服务中获取模式。 然后 Presto 工作人员与 Kafka 集群并行对话以获取所需的 Kafka 消息。 我们还在 Kafka 集群上为 Presto 用户设置了代理配额,以防止集群降级。

详细的改进

以下部分深入探讨了我们为克服现有 Presto Kafka 连接器的限制并使其适用于大规模用例而进行的改进。

Kafka 集群/主题和数据模式发现

我们进行了更改以启用按需集群/主题和模式发现。首先,Kafka 主题元数据和数据模式在运行时通过 KafkaMetadata 获取,我们提取 TableDescriptionSupplier 接口来提供这些元数据,然后我们扩展接口并实现一个新策略,在运行时从内部 Kafka 集群管理服务和模式注册表中读取 Kafka 主题元数据。 同样,我们重构了 KafkaClusterMetadataSupplier 并实现了一种在运行时读取集群元数据的新策略。 由于集群元数据是按需获取的,因此我们也能够在单个 Kafka 连接器中支持多个 Kafka 集群。 添加了所有这些元数据的缓存层,以减少访问 Kafka 集群管理模式服务的请求数量。

查询过滤

为了提高 Kafka 和 Presto 集群的可靠性,我们希望避免大型查询读取过多的数据。 为了实现这一点,我们添加了列过滤器强制,检查 _timestamp 或 _partition_offset 在 Presto Kafka 查询的过滤器约束中是否存在。 没有这些过滤器的查询将被拒绝。

Kafka 集群的配额控制

Kafka 是 Uber 的重要基础设施,有许多实时用例,Kafka 集群的退化可能会产生巨大的影响,因此我们希望不惜一切代价避免它。 作为一个分布式查询引擎,Presto 可以启动数百个消费者线程来同时从 Kafka 获取消息。 这种消费模式可能会耗尽网络资源并导致潜在的 Kafka Cluster 退化,这是我们想要防止的。

我们可以做的一件事是从 Presto Cluster 级别限制消耗率,但从 Presto 方面实现并不容易。 作为替代方案,我们决定利用 Kafka 的代理配额来实现我们的目标。 我们进行了更改,允许我们从连接器配置中指定 Kafka 消费者客户端 ID。 通过此更改,我们可以为 Presto 中的所有工作人员使用静态 Kafka 客户端 ID,并且他们将受制于相同的配额池。

当然,这种方法是有代价的:同时进行多个 presto 查询将需要更长的时间才能完成。 这是我们必须做出的牺牲。 实际上,由于我们有查询过滤器,大多数查询都能够在合理的时间内完成。

结论

在推出该功能后,我们已经看到进行临时探索的生产力大大提高。 在此之前,工程师需要花费数十分钟甚至更长的时间来查找我们上面提到的示例的数据,但现在我们可以编写一个简单的 SQL 查询 SELECT * FROM kafka.cluster.order WHERE uuid= ‘0e43a4-5213-11ec’ 并且可以在几秒钟内返回结果。

在撰写这篇博文时,越来越多的用户正在采用 Presto on Kafka 进行临时探索。 每天有 6,000 次查询,我们还从 Presto 用户那里得到了很好的反馈,他们说 Presto on Kafka 让他们的数据分析变得更加容易。

展望未来,我们计划将我们所做的改进贡献回开源社区。 您还可以查看我们的 PrestoCon 演讲,了解有关我们所做工作的更多详细信息。

在撰写这篇博文时,越来越多的用户正在采用 Presto on Kafka 进行临时探索。 每天有 6,000 次查询,我们还从 Presto 用户那里得到了很好的反馈,他们说 Presto on Kafka 让他们的数据分析变得更加容易。

展望未来,我们计划将我们所做的改进贡献回开源社区。 您还可以查看我们的 PrestoCon 演讲,了解有关我们所做工作的更多详细信息。

本文转载自Uber Engineering,原文链接:https://eng.uber.com/presto-on-apache-kafka-at-uber-scale/。

0 人点赞