100PB级数据分钟级延迟:Uber大数据平台(下)

2022-03-30 16:31:03 浏览数 (1)

文章来源:Uber官方博客 作者:Reza Shiftehfar 翻译:欧高炎

导读:本文是介绍Uber大数据平台原创翻译文章的第2篇,系统介绍了Uber的大数据团队从2014年开始如何根据业务需求逐步改进其大数据平台架构,具有很高的参考和借鉴价值。阅读本文前请先阅读上一篇文章:100 PB数据分钟级延迟:Uber大数据平台介绍(上)

第3代:为长期计划重建我们的大数据平台

到2017年初,我们的大数据平台被整个公司的工程和运营团队使用,使他们能够在同一个地方访问新数据和历史数据。用户可以通过同一个UI门户轻松访问不同大数据平台的数据。我们的计算集群中有超过100PB的数据和100000个vcores。每天支持100,000个Presto查询, 10,000个Spark作业,以及 20,000个Hive查询。我们的Hadoop分析架构遇到了可扩展性限制,许多服务受到高数据延迟的影响。

幸运的是,我们的底层基础架构可以水平扩展以满足当前的业务需求。因此我们有足够的时间研究数据内容,数据访问模式和用户特定需求,以便在构建下一代之前确定最紧迫的问题。我们的研究揭示了四个主要的痛点:1)突破HDFS水平扩展限制;2)加快Hadoop数据处理速度;3)在Hadoop和Parquet中支持数据更新和删除;4)实现更快的ETL和建模。

介绍Hudi

为了满足上述要求(突破HDFS水平扩展限制、加快Hadoop数据处理速度、在Hadoop和Parquet中支持数据更新和删除、实现更快的ETL和建模),我们构建Hudi(HadoopUpserts anD Incremental)。Hudi是一个开源Spark库,在HDFS和Parquet之上提供一个抽象层来支持更新和删除操作。Hudi可以在任何Spark作业中使用,可以水平扩展,并且其运行只依赖于HDFS。因此,Hudi可以对任意大数据平台进行扩展,以支持对历史数据的更新和删除操作。

Hudi使我们能够在Hadoop中更新、插入和删除现有的Parquet数据。此外,Hudi允许数据用户增量地提取更新的数据,显著提升了查询性能,同时支持对派生建模表的增量更新。

我们的Hadoop生态系统中的原始数据是根据时间划分的,任何旧分区都可能在以后接收更新请求。因此,对于依赖于这些原始源数据表的数据用户或ETL作业,了解哪个日期分区包含更新数据的唯一方法是扫描整个源表并根据已有知识来过滤数据。更加麻烦的是,这些计算代价昂贵的查询操作的运行频率还非常高。

有了Hudi,用户可以简单地传递最近检查点时间戳,并检索该时间戳之后更新的数据,而无需运行扫描整个源表的昂贵查询。更新的数据包括添加到最近日期分区的新记录和对旧数据的更新(例如,今天发生的新行程和对6个月前某个行程数据的更改)。

使用Hudi库,我们的数据提取模式从基于源数据快照的模式转换到增量的提取的模式,数据延迟从24小时减少到不到1小时。图5描述了集成了Hudi的大数据平台:

图5:第三代大数据平台采取了更快的增量数据提取模式(使用开源Marmaray框架)和更高效的存储和数据服务(使用开源Hudi库)。

通用数据提取

Hudi并不是我们第三代大数据平台的唯一补充。我们还通过ApacheKafka处理存储和大数据团队之间对上游数据库的更改。上游数据库事件(以及不同应用和服务的传统日志消息)使用统一的Avro编码(包括标准的全局源数据头信息,例如时间戳、行键、版本、数据中心信息和发起主机)流入Kafka。Streaming团队和大数据团队都使用这些存储更改日志事件作为其源输入数据以进行进一步处理。

我们的数据提取平台Marmaray以小批量运行并从Kafka提取上游存储更改日志,使用Hudi库在Hadoop的现有数据上执行更改。前面已经提到,Hudi支持upsert操作,允许用户添加新记录并更新或删除历史数据。Spark上的提取作业每10-15分钟运行一次,Hadoop中原始数据延迟约为30分钟(考虑到1-2个提取作业失败或者重启)。为避免因多次将相同的源数据提取到Hadoop而导致效率低下,我们禁止在提取期间对数据进行任何转换。我们的原始数据提取框架实际上成了EL平台,而不是传统的ETL平台。在此模型下,我们鼓励用户在上游数据以其原始嵌套格式到达后,在Hadoop中以批处理的模式进行转换操作。

自从对我们的大数据平台实施这些更改以来,由于避免了不必要和低效的提取操作,我们节省了大量的计算资源。因为我们现在可以避免在提取过程中易于出错的转换,原始数据的可靠性也得到了显著提高。现在,用户可以在原始源数据的上层通过任何大数据引擎进行数据转换。此外,如果出现任何问题,用户可以重新运行其转换。同时可以通过使用更多计算资源和更高的程度并行性来更快地完成批转换作业,以满足用户服务协议。

增量数据建模

考虑到需要从上游数据存储中提取大量数据进Hadoop(截至2017年超过3,000个原始Hadoop表),我们还构建了一个通用提取平台。在这个平台中,我们以统一和可配置的方式将原始数据提取到Hadoop中。我们的大数据平台增量地更新Hadoop表,能够快速地访问源数据(数据延迟为10-15分钟)。但是,为了确保建模表也具有低延迟,我们必须避免建模的ETL作业中的低效操作(例如完全派生表复制或完整扫描原始数据数据表)。实际上,Hudi允许ETL作业仅从原始表中提取已更改的数据。建模作业仅仅需要在每一步迭代运行过程中给Hudi传入一个检查点时间戳,就可以从原始表中获取新的或更新的数据流(不用管日期分区数据实际存储在哪里)。

在ETL作业中使用Hudi写入器(Hudi Writer),我们可以直接在派生建模表直接对旧分区和表进行更新,而无需重新创建整个分区或表。因此,我们的建模ETL作业使用Hudi读取器增量地从源表中提取已更改的数据,并使用Hudi写入器增量地更新派生的输出表。现在,ETL作业可以在30分钟内完成,Hadoop中的所有派生表都仅有1小时以内的端到端延迟。

为了向Hadoop表的数据用户提供访问所有数据/新数据/更新数据的多种选项,使用Hudi存储格式的Hadoop原始表提供了两种不同的读取模式:

1. 最新模式视图。提供特定时间点Hadoop表的整体视图。此视图包括所有记录的最新合并值以及表中的所有现有记录。

2. 增量模式视图。从特定Hadoop表中提取给定时间戳以后的新记录和更新记录。此视图仅返回自最近检查点以来最近插入或已更新的行。此外,如果特定行自上一个检查点以来被多次更新,则此模式将返回所有这些中间更改的值(而不是仅返回最新的合并行)

图6描述了所有以Hudi文件格式存储的Hadoop表的这两个读取视图:

图6:通过Hudi写入器更新的原始表有两种不同的读取模式:最新模式视图返回所有记录的最新值;增量模式视图仅返回自上次读取后更新的记录。

用户通常根据需要在这两种表视图之间进行切换。使用专用查询基于最新状态分析数据时,他们会采用最新模式视图(例如提取美国每个城市的每周总旅行次数)。另一方面,当用户有一个迭代作业或查询仅仅需要获取自上次执行后的更新数据或新数据时,他们会使用增量模式视图。对于所有的Hadoop表,上面两种视图都是随时可用的,用户可以根据需要在两种模式之间进行切换。

标准化数据模型

除了提供同一个表的不同视图外,我们还对数据模型进行了标准化。对所有原始Hadoop数据,我们提供以下两种类型的表:

1. 更改日志历史记录表。包含为特定上游表收到的所有更改日志的历史记录。此表使用户能够扫描给定表的更改历史记录,并且可以按键合并以提供每行的最新值。

2. 合并快照表。包含上游表的最新合并视图。此表包含每一个键接受的所有历史更改日志的压缩合并视图。

图7描述了如何使用给定更改日志流为特定上游源数据生成不同的Hive原始表:

图7:对Hive数据模型的标准化大大改善了整个大数据生态系统的数据质量。此模型包含一个合并的快照表,其中包含每个row_key的最新值和每个row_key的历史变更记录。

然而,更新日志流可能不包含给定键的整个行(所有列)。虽然合并的快照表始终提供特定键的所有列,更新日志历史表则可能是稀疏的,因此我们可以通过避免发送整行来提高效率。如果用户希望从更新日志历史记录表中提取更改的值并将其与合并的快照表连接以创建完整的数据行,我们还会在更新日志历史记录表中的合并快照表中包含相同键的日期分区。

图8显示了我们的大数据平台的不同组件之间的关系:

图8:构建更具可扩展性的数据传输平台使我们能够在一种服务下以标准方式轻松聚合所有数据流水线,并支持数据源和数据接收器之间的多对多连接。

第4代:下一步是什么?

自2017年推出第三代大数据平台以来,整个公司的用户可以快速可靠地访问Hadoop中的数据。但是依然还有进一步提升的空间。下面介绍一下在改进Uber的大数据平台方面我们正在做的努力,包括提高数据质量、降低数据延迟、提高效率、增强可扩展性和提高可靠性。

数据质量

为了提高数据质量,我们确定了两个关键的改进方向。首先,尽量避免非模式确认的数据。例如如果某些上游数据仓库在存储之前没有强制执行或检查数据模式时(例如存储值为JSON块的键值对),导致不良数据进入Hadoop生态系统,从而影响所有依赖此数据的下游用户。为了防止不良数据的涌入,我们正在将对上游数据内容执行强制性模式检查,并在数据存在任何问题(例如未经过模式确认)时拒绝数据记录。

第二个方向是改善数据内容本身的质量。使用模式检查能确保数据包含正确的数据类型,但它们不检查实际数据值(例如整数而不是[0,150]之间的正数)。为了提高数据质量,我们正在扩展我们的架构服务以支持语义检查。这些语义检查(Uber特定的数据类型)允许我们在基本结构类型检查之外对数据内容添加额外约束。

数据延迟

我们的目标是将Hadoop中的原始数据延迟减少到五分钟以内,将建模表的数据延迟减少到十分钟以内。这将允许更多用例从流处理转向使用Hudi的增量数据拉取进行更高效的小批量处理。

我们还在扩展Hudi项目,以支持其他视图模式,包括现有的读取优化视图,以及新的实时视图(分钟级别的数据延迟)。这个实时视图依赖于我们的的开源解决方案(Hudi的一部分),我们称之为Merge-On-Read或Hudi 2.0。

数据效率

为了提高数据效率,我们正在努力避免我们的服务依赖于专用硬件,且将服务尽量docker化。此外,我们统一了Hadoop生态系统内部和外部的资源调度,以尽量桥接公司的Hadoop和非数据服务之间的鸿沟。这允许所有作业和服务以统一的方式进行调度,而不用管它们具体在什么媒介上运行。随着Uber的发展,数据本地化将成为Hadoop应用的一大问题,成功的统一资源管理器可以将所有现有的调度程序集中在一起(例如Yarn、Mesos和Myriad)。

可扩展性和可靠性

作为我们改善平台可扩展性和可靠性的努力的一部分,我们确定了几个极端的情况下的问题。虽然数据提取平台是以通用可插拔的模式开发的,实际上游数据提取仍然包括许多依赖于源的流水线配置。这使得提取流水线变得脆弱且提高了运营成千上万这类流水线的维护成本。

为了确保对任意数据源的统一提取, Uber大数据团队和数据存储团队合作启动了一个项目,以统一所有上游数据源更新日志的内容、格式和元数据,而不管其具体技术架构。该项目将确保与这些特定上游技术相关的信息只是作为额外的元数据被添加到实际更新日志值中(而不用针对不同的数据源设计完全不同的更新日志内容)。无论上游源是什么,都可以统一进行数据提取。

我们的Hudi的新版本将允许数分钟内为所有数据源生成更大的Parquet文件(从当前的128MB提高到1GB)。它还将消除当前版本对更新与插入比率的敏感性。Hudi 1.0依赖于一种名为copy-on-write的技术,只要有更新的记录,它就会重写整个源Parquet文件。这显著增加了写入放大,特别是当更新与插入的比率增加时。并且妨碍了在HDFS中创建大的Parquet文件。Hudi的新版本正在克服上述限制。具体方法是将更新的记录存储在单独的增量文件中,然后通过某种协议异步合并到Parquet文件中(当有足够数量的更新数据时再重写大的Parquet文件,以此来分摊写入开销)。将Hadoop数据存储在较大的Parquet文件中以及更可靠的源独立数据提取平台将使我们的分析数据平台在未来几年随着业务的蓬勃发展而继续改进。

未来展望

Uber的数据组织依赖于跨部门职能协作,包括的数据平台团队、数据基础团队、数据流和实时平台团队以及大数据团队。我们旨在构建支持Uber分析数据基础架构的所需库和分布式服务。

0 人点赞