打车巨头Uber是如何构建大数据平台?

2021-09-23 11:23:53 浏览数 (1)

大家好,我是一哥,最近滴滴出的技术少了,给大家分享一下Uber的大数据平台是如何建设的?

随着 Uber 业务的扩张,为公司业务提供支持的基础数据池也在飞速膨胀,其处理成本水涨船高。当大数据成为我们最大的运维支出项目之一后,我们启动了一项降低数据平台成本的计划。该计划将问题分解为三大分支:平台效率、供应和需求。在这篇文章中,我们将讨论 Uber 为提高数据平台效率和降低成本所做的一系列工作。

Apache Hudi

我们在大数据平台中遇到的最明显的成本效益提升机会之一是高效的增量处理。我们的许多事实数据集可能会延迟到达或被更改。例如,在许多情况下,乘客直到他或她准备要求下一次行程时才会对上次行程的司机打分。信用卡的退款有时可能需要一个月的时间来处理。

如果没有高效的增量处理框架,我们的大数据用户必须每天扫描过去许多天的旧数据,才能让他们的查询结果保持新鲜度。一种更有效的方法是每天只处理增量更改,这就是 Hudi 项目的意义所在。

我们在 2016 年启动了 Hudi 项目,并于 2019 年将其提交给了 Apache Incubator Project。Apache Hudi现在是一个顶级项目,且我们在 HDFS 上的大部分大数据都是 Hudi 格式。这大大降低了 Uber 的计算能力需求。

大数据文件格式优化

我们的大部分 Apache®Hadoop®文件系统(HDFS)空间都被 Apache Hive 表占用了。这些表以 Apache Parquet 文件格式或 Apache ORC 文件格式存储。尽管我们计划在未来的某个时候将它们统一整合到 Parquet,但由于许多特殊需求(包括特定条件下的兼容性和性能),我们尚未实现这一目标。

Parquet 和 ORC 文件格式都是基于块的列格式,这意味着文件包含许多块,每个块包含大量的行(比如 10,000 行),存储在列中。

我们花了很多时间来分析 HDFS 上的文件,并决定进行以下优化工作,主要针对 Parquet 格式:

  1. 压缩算法:默认情况下,我们使用 GZIP Level 6 作为 Parquet 内部的压缩算法。最近关于 Parquet 支持 Facebook 的 ZSTD 算法的社区进展引起了我们的注意。在我们的实验中,与基于 GZIP 的 Parquet 文件相比,ZSTD Level 9 和 Level 19 能够将我们的 Parquet 文件大小分别减少 8% 和 12%。此外,ZSTD Level 9 和 Level 19 的解压速度都比 GZIP Level 6 要快。我们决定采用 ZSTD Level 9 重新压缩我们过期 1 个月后的数据,并采用 ZSTD Level 19 压缩我们过期 3 个月后的数据。这是因为在我们的实验中,ZSTD Level 9 比 Level 19 快两倍。请注意,重新压缩作业是后台维护作业,可以使用无保证的计算资源运行。鉴于此类资源相当丰富,我们基本上可以将这些重压缩作业视为免费的。
  2. 列删除:我们的许多 Hive 表——尤其是从 Apache Kafka®日志中提取的表——都包含许多列,其中一些还是嵌套的。当我们查看这些列时,很明显,其中一些列没有必要长期保留。比如说为了调试每个 Kafka 消息的元数据,以及由于合规性原因需要在一段时间后删除的各种字段都可以删掉。这种列格式让我们在技术上可以做到删除文件内的一些列时无需解压和重新压缩其他列。这让列删除成为了一种非常节省 CPU 的操作。我们在 Uber 实现了这样一个特性,并将它大量用于我们的 Hive 表,还把 代码 贡献回了 Apache Parquet。
  3. 行重排序:行顺序可以显著影响压缩后 Parquet 文件的大小。这是由于 Parquet 格式中的运行长度(Run-Length)编码特性,以及压缩算法利用局部重复的能力造成的。我们检查了 Uber 最大的一些 Hive 表,并对排序做了手动调整,将表大小减少了 50% 以上。我们发现的一个常见模式是简单地按用户 ID 对行排序,然后是按日志表的时间戳排序。大多数日志表都有用户 ID 和时间戳列。这让我们能够非常高效地压缩与用户 ID 关联的许多非规范化列。
  4. Delta 编码:我们开始按时间戳对行排序后,很快就注意到了 Delta 编码可以帮助我们进一步减少数据大小。因为与时间戳值本身相比,相邻时间戳之间的差异非常小。在某些情况下,日志具有稳定的节奏,就像心跳一样,因此这种差异是恒定的。但是,在我们广泛使用 Apache Hive、Presto®和 Apache Spark 的环境中,如 StackOverflow问题 中所述,在 Parquet 中启用 Delta 编码并非易事。我们还在探索这个方向。

HDFS 纠删码

纠删码(Erasure Coding)可以显著减少 HDFS 文件的复制因子。由于这种技术会增加 IOPS 负载,所以在 Uber,我们主要研究 3 2 和 6 3 模式,对应的复制因子分别为 1.67 倍和 1.5 倍。鉴于默认的 HDFS 复制因子是 3 倍,也就是说我们可以将 HDD 空间需求减少近一半!

不过,纠删码还有多种选择:

  1. Apache Hadoop3.0 HDFS 纠删码:这是在 Apache Hadoop 3.0 中实现的官方纠删码。这个实现的好处是它同时适用于大文件和小文件。其缺点是 IO 效率不高,因为纠删码的块非常碎片化。
  2. 客户端纠删码:这种编码首先由 Facebook 在 HDFS-RAID 项目中实现。这种方法的好处是它的 IO 效率非常高。当所有块都可用时,读取 IO 效率与块进行 3 路复制的基线相当。缺点是它不适用于小文件,因为每个块都是纠删码计算的一个单位。

在咨询了行业专家后,我们决定采用 Apache Hadoop 3.0 HDFS 纠删码,因为这是社区的方向。我们仍处于 Apache Hadoop 3.0 HDFS 纠删码的评估阶段,但我们相信这种技术将显著降低我们的 HDFS 成本。

YARN 调度策略改进

在 Uber,我们使用 Apache YARN 来运行大部分的大数据计算负载(Presto 除外,它直接运行在专用服务器上)。就像其他很多公司一样,我们一开始用的是 YARN 中的标准容量调度器(Capacity Scheduler)。容量调度使我们可以为每个队列配置具有 MIN 和 MAX 设置的分层队列结构。我们创建了一个以组织为第一级的 2 级队列结构,允许用户根据子团队、优先级或作业类型创建第二级队列。

虽然容量调度器为我们管理 YARN 队列容量的工作提供了一个良好的开端,但我们很快就遇到了管理 YARN 集群容量的困境:

  1. 高利用率:我们希望 YARN 集群的平均利用率(以分配的 CPU 和 MemGB/ 集群的总 CPU 和 MemGB 容量衡量)尽可能高;
  2. 满足用户期望:我们希望给用户提供明确的预期,告诉他们可以从集群中获得多少资源

我们的许多用户对 YARN 集群有尖锐但可预测的资源需求。例如,一个队列可能有一组日常作业,每个作业在一天中的特定时间开始,并在相似的时间段内消耗相似数量的 CPU/MemGB。

如果我们将队列的 MIN 设置为白天的峰值使用量,那么集群利用率将非常低,因为队列的平均资源需求远低于 MIN。

如果我们将队列的 MAX 设置为白天的高峰用量,那么随着时间的推移,队列可能会被滥用,让资源持续接近 MAX,进而可能影响其他队列中其他人的正常作业.

我们如何捕捉用户的资源需求并正确设定他们的预期呢?我们提出了以下想法,称为动态峰值(Dynamic MAX)。

动态峰值算法使用以下设置:

  1. 将队列的 MIN 设置为队列的平均使用率
  2. 队列的 MAX 设置公式如下:
代码语言:javascript复制
Dynamic_MAX = max(MIN, MIN * 24 – Average_Usage_In_last_23_hours * 23)

Dynamic_MAX 在每小时开始时计算,并应用于该小时的队列 MAX。这里的动态峰值算法背后的想法是:

  1. 如果队列在过去 23 小时内根本没有使用,我们允许队列峰值最多达到其 MIN 的 24 倍。这通常足以处理我们绝大多数的尖峰负载。
  2. 如果队列在过去 23 小时内平均使用量在 MIN 水平,那么我们只允许队列在下一个小时的使用量不高于 MIN。有了这个规则,队列在 24 小时内的平均使用量不会超过 MIN,从而避免了上面提到的滥用情况。

上述动态峰值算法很容易向用户解释:基本上,他们的使用量最多可以飙升到他们队列 MIN 的 24 倍,但为了公平起见,他们在 24 小时内的累积使用量不能超过 MIN 级别的集群平均使用量。

实际上,我们将 MIN 设置为队列平均使用量的 125%,以应对最高 25% 的每日使用差异。这反过来意味着我们 YARN 集群的平均利用率(以 CPU/MemGB 分配衡量)将在 80% 左右,这对于成本效率指标来说是一个相当不错的利用率水平。

避开高峰时间段

YARN 资源利用率的另一个问题是整个集群级别仍然存在一种日常模式。许多团队决定在 00:00-01:00 UTC 之间运行他们的 ETL 管道,因为据说那是最后一天的日志准备就绪的时候。这些管道可能会运行 1-2 个小时。这让 YARN 集群在那些高峰时段非常忙碌。

我们计划实现一套基于时间的费率算法,而不是向 YARN 集群添加更多机器,因为后者会降低平均利用率并损害成本效率。基本上,当我们计算过去 23 小时的平均使用量时,我们会应用一个根据一天中时点而变化的比例因子。例如,0-4 UTC 高峰时段的比例因子为 2 倍,其余时间为 0.8 倍。

联邦集群

随着我们的 YARN 和 HDFS 集群不断膨胀,我们开始注意到了一个性能瓶颈。由于集群大小不断增加,HDFS NameNode 和 YARN ResourceManager 都开始变慢。虽然这主要是一个可扩展性挑战,但它也极大影响了我们的成本效率目标。

为了解决这个问题,摆在我们面前有两个策略选项:

  1. 继续提升单节点性能:比如我们可以使用配备了更多 CPU 虚拟核心和内存的机器。我们还可以运行栈跟踪和火焰图来找出性能瓶颈并一一优化。
  2. 集群的集群(联邦):我们可以创建一个由许多集群组成的虚拟集群。每个底层集群都会有一个可以发挥 HDFS 和 YARN 最优性能的大小设置。上面的虚拟集群将处理所有负载路由逻辑。

出于以下原因,我们选择了第二个选项:

  1. 世界上大多数 HDFS 和 YARN 集群都比我们在 Uber 需求的规模要小。如果我们运行超大集群,很可能会遇到很多在小集群中不会出现的未知错误。
  2. 为了让 HDFS 和 YARN 能够扩展到 Uber 的集群规模,我们可能需要更改源代码以在性能和复杂特性之间做出各种权衡。例如,我们发现容量调度器有一些复杂的逻辑会减慢任务分配的速度。但是,为摆脱这些逻辑而做的代码更改将无法合并到 Apache Hadoop 主干中,因为其他公司可能需要这些复杂的特性。

为了能在不分叉的情况下利用开源 Hadoop 生态系统,我们决定构建集群的集群这种设置。具体来说,我们使用了基于路由的 HDFS 联邦和 YARN 联邦。它们都来自开源 Apache Hadoop。截至目前,我们已经建立了数十个 HDFS 集群和少数 YARN 集群。基于 HDFS 路由的联邦一直是我们大数据可扩展性工作的基石,它也提高了成本效率。

通用负载均衡

前文介绍了 P99 和平均利用率挑战。第 3 部分中关于廉价和大硬盘的解决方案则会涉及 IOPS P99 的重要性。

在本节中,我们将通过以下方式讨论适用于 HDFS 和 YARN 的通用负载均衡方案:

  1. HDFS DataNode 磁盘空间利用率均衡:每个 DataNode 可能都有不同的磁盘空间利用率比率。在每个 DataNode 中,每个 HDD 都可能有不同的磁盘空间利用率。所有这些都需要做均衡,以实现较高的磁盘空间平均利用率。
  2. YARN NodeManager 利用率均衡:在任何时间点,YARN 中的每台机器都可以有不同级别的 CPU 和 MemGB 分配和利用率。同样,我们需要均衡分配和利用率,以实现较高的平均利用率。

上述解决方案之间有很多相似性,启发我们提出了通用负载均衡思想,它适用于我们大数据平台内外的更多用例,例如微服务负载均衡和主存储负载均衡。所有这些用例之间的共同联系是,它们的目标都是缩小 P99 与平均值之间的差距。

查询引擎

我们在 Uber 的大数据生态系统中使用了几个查询引擎:Hive-on-Spark、Spark 和 Presto。这些查询引擎与文件格式(Parquet 和 ORC)相结合,为我们的成本效率工作创建了一个有趣的权衡矩阵。我们使用的其他选项还包括 SparkSQL 和 Hive-on-Tez 等,它们让我们的权衡决策变得更加复杂了。

以下是我们在提高查询引擎成本效率方面所做的主要工作:

  1. 专注于 Parquet 文件格式:Parquet 和 ORC 文件格式共享一些共同的设计原则,如行组、列存储、块级和文件级统计。但它们的实现是完全独立的,并且与我们在 Uber 使用的其他专有系统具有不同的兼容性级别。随着时间的推移,我们在 Spark 中看到了更好的 Parquet 支持,在 Presto 中看到了更好的 ORC 支持。鉴于对文件格式新特性的需求不断增长,我们必须决定专注在一种主要的文件格式上,于是我们最后选择了 Parquet。单一的主要文件格式使我们能够将精力集中在一个单一的代码库中,并随着时间的推移积累相应的专业知识。
  2. 嵌套列修剪(Nested Column Pruning):Uber 的大数据表具有嵌套程度非常高的数据。这部分是因为我们的许多上游数据集都以 JSON 格式存储(请参阅 设计无Schema),并且我们对它们强制实施了 Avro schema。于是,对嵌套列修剪的支持成为了 Uber 查询引擎的一个关键特性,否则深度嵌套的数据将需要从 Parquet 文件中完全读出才行——即使我们只需要嵌套结构中的单个字段. 我们为 Spark 和 Presto 添加了嵌套列修剪支持。这些改进显著提高了我们的整体查询性能,我们还将它们回馈给了开源社区。
  3. 常见查询模式优化:在我们的负载中看到接近一千行的 SQL 查询的情况并不少见。虽然我们使用的查询引擎都有一个查询优化器,但它们并没有针对 Uber 常见的模式有专门的优化。其中一个例子是一些 SQL 构造,如“RANK() OVER PARTITION”和“WHERE rank = 1”,其目的是提取另一列值最大的行中一列的值,也就是数学术语中的“ARGMAX”。当查询被重写为使用内置函数“MAX_BY”时,像 Presto 这样的引擎可以运行得更快。

根据我们的经验,很难预测哪个引擎最适合哪种 SQL 查询。Hive-on-Spark 通常对于大量随机数据有很高的可扩展性。反过来,对于涉及少量数据的查询,Presto 往往非常快。我们正在积极关注开源大数据查询引擎领域的改进,并将继续利用它们优化我们的负载以提升成本效率。

下一步计划和待解决挑战

大数据与在线服务同主机托管

虽然我们决定让大数据负载在线上服务不需要自己的主机时借用后者的主机,但让两个负载在同一主机上运行会带来许多额外的挑战。

在托管对性能的影响方面有许多研究论文。我们方法的主要不同点在于,我们计划为大数据负载提供非常低的优先级,以尽量减少其对在线服务的影响。

融合在线和分析存储

我们的很多数据集都存储在线上存储系统(无 schema 存储在闪存上的 MySQL 数据库中)和分析存储系统(存储在硬盘驱动器上的 HDFS 中的 Hive 表)中。此外,为了提供即时查询速度,我们还投资了 Pinot 等存储引擎。所有这些带来了相同逻辑数据的许多副本,虽说副本是以不同的格式存储的。

是否有可能实现一个可以同时处理在线和分析查询的统一存储系统呢?这将显著降低存储成本。

水电项目:利用维护作业来“存储”额外的计算能力

集群中的计算能力与电力供应很像。它通常在供应侧是固定的,并且在需求激增或不一致的情况下会受到影响。

抽水蓄能水力发电 可以将多余的电力以水的重力势能的形式储存起来,然后在需求高峰时将其转换回电能。

我们可以将这种思想应用在计算能力上吗?的确可以!这里要介绍的一项关键思想是维护作业,它们是可以在第二天甚至一周内随时发生的后台任务。典型的维护作业包括 LSM 压缩、压缩、二级索引构建、数据清理、纠删码修复和快照维护等。几乎所有没有严格 SLA 的低优先级作业都可以视为维护作业。

在我们的大多数系统中并没有明确拆分维护和前台工作。例如,我们的大数据摄取系统写入 ZSTD 压缩的 Parquet 文件,这会占用大量 CPU 资源并生成非常紧凑的文件。换一种方式,我们还可以让摄取系统编写轻度压缩的 Parquet 文件,这些文件占用更多磁盘空间但 CPU 用量更少。然后我们有一个维护作业,它会稍后运行来重新压缩文件。通过这种方式,我们可以显著减少前台 CPU 的需求。

维护作业只需非保证的计算能力就能运行。正如我们之前所描述的,我们有足够的资源用于此目的。

大数据用量的定价机制

鉴于我们用的是多租户大数据平台,我们经常会遇到难以满足所有客户资源需求的情况。我们如何优化有限硬件预算的总效用?带有高峰时间乘数的 Dynamic_MAX 是最佳选项吗?

我们相信实际上还有更好的解决方案。但是,这将需要提出更精细的定价机制。我们想探讨的例子包括:每个团队可以在我们的集群上花费一种代币,或者用户可以用某种积分来提高他们的工作优先级,等等。

总 结

在这篇博文中,我们分享了 Uber 在提高大数据平台成本效率方面的工作成果和理念,包括文件格式改进、HDFS 纠删码、YARN 调度策略改进、负载均衡、查询引擎和 Apache Hudi 等。这些改进显著降低了平台成本。此外,我们还探索了一些开放性挑战,例如分析和在线托管以及定价机制等。然而,正如我们之前文章中概述的框架所展示的那样,仅靠平台效率的提升并不能确保较高的运维效率。控制数据的供应和需求也是同样重要的,我们将在下一篇文章中讨论这个主题。

作者介绍:

Zheng Shao 是 Uber 的高级工程师。他领导着整个基础设施成本效率技术项目,重点关注大数据成本效率。他还是 Apache Hadoop PMC 成员和名誉 Apache Hive PMC 成员。

Mohammad Islam 是 Uber 的高级工程师。他共同领导数据成本效率项目,还领导数据安全和合规项目。他还是 Apache Oozie 和 Tez PMC 的成员。

0 人点赞