Yelp 的 Spark 数据血缘建设实践!

2022-09-16 16:23:39 浏览数 (1)

在这篇博文中,我们介绍了 Spark-Lineage,这是一种内部产品,用于跟踪和可视化 Yelp 的数据是如何在我们的服务之间处理、存储和传输的。

Spark 和 Spark-ETL:在 Yelp,Spark被认为是一等公民,处理各个角落的批量工作,从处理评论到识别同一地区的相似餐厅,到执行有关优化本地业务搜索的报告分析。Spark-ETL 是我们围绕 Spark 的内部包装器,提供高级 API 来运行 Spark 批处理作业并抽象出 Spark 的复杂性。Spark-ETL 在 Yelp 被广泛使用,帮助节省了我们的工程师编写、调试和维护 Spark 作业所需的时间。

问题:我们的数据在数百个微服务之间进行处理和传输,并以不同的格式存储在包括 Redshift、S3、Kafka、Cassandra 等在内的多个数据存储中。目前我们每天有数千个批处理作业在运行,越来越难以理解它们之间的依赖关系。想象一下你自己是一名软件工程师,负责发布由几个关键 Yelp 服务使用的数据的微服务;您即将对批处理作业进行结构更改,并想知道您的服务的下游对象和内容将受到影响。或者想象自己扮演一个机器学习工程师的角色,他想在他们的模型中添加一个 ML 功能并问:“我可以自己运行检查以了解这个功能是如何生成的吗?”

Spark-Lineage: Spark-Lineage 就是为解决这些问题而构建的。它提供数据旅程的可视化表示,包括从起点到目的地的所有步骤,并提供有关数据去向、谁拥有数据以及在每个步骤中如何处理和存储数据的详细信息。Spark-Lineage 从每个 Spark-ETL 作业中提取所有必要的元数据,构建数据移动的图形表示,并让用户通过第三方数据治理平台以交互方式探索它们。

图 1. Spark-ETL 作业的 Spark-Lineage 视图示例

图 2. Spark-Lineage 概述

使用 Spark-ETL 运行 Spark 作业很简单;用户只需提供(1)通过 yaml 配置文件提供源和目标信息,以及(2)通过 python 代码从源到目标的数据转换逻辑。

图 3. Spark-ETL 作业的示例图

在后端,我们直接在 Spark-ETL 中实现 Spark-Lineage,以从每个批处理作业中提取所有具有依赖关系的源表和目标表对。更准确地说,我们使用NetworkX库来构建作业的工作流图,并在该作业的相应有向无环图 (DAG) 工作流中查找在它们之间具有路径的所有源表和目标表对。转换中的所有中间表都不会记录在 Lineage 中,因为它们是临时的。例如,(输入表 1,输出表 2)是图 3 中的一对,因为它们之间存在路径,而(输入表 2,输出表 2)则不是。对于每一对这样的对,我们向 Kafka 发送一条消息,包括源和目标的标识符,以及其他必要的元数据。然后这些消息从 Kafka 传输到 Redshift 中的专用表。

我们采用两步流程而不是直接将消息发送到一个地方的原因是 Redshift 有维护停机时间,而 Kafka 随时可以接收新发出的消息。另一方面,在 Redshift 中存储数据非常持久且易于查询以用于分析目的。在 Yelp,我们每天大约有数千个批次,平均每个作业发出大约 10 条消息。总的来说,Lineage 表每年增长几百万行,这可以由 Redshift 轻松处理。Spark-Lineage 然后使用 ETL 工具插件从 Redshift 表中读取并为用户提供服务。

构建 Spark-Lineages UI

首先,我们解析 Redshift 中上述步骤提供的元数据,并识别源和目标信息。此元数据首先被读入 Redshift 数据库中的临时表。我们暂存此数据的原因是为了识别在日常负载中引入的任何新作业或捕获对现有计划作业的任何更新。

然后,我们为每个 Spark-ETL 表创建一个链接(表、文件等的规范术语)以及从元数据中提取的附加信息。我们还使用它们各自的模式添加这些作业之间的关系。最后我们根据从 Spark-ETL 中提取的 DAG 建立源表和目标表之间的连接。

Spark-Lineages 的模拟 UI 如图 1 所示,用户可以在其中浏览或搜索所有 Spark 表和批处理作业,读取每个表和作业的详细信息,并跟踪它们之间的从源到结束的依赖关系.

了解机器学习功能

研究机器学习模型的数据科学家经常在构建新功能时寻找现有数据。在某些情况下,他们发现的数据可能基于关于应包含哪些数据的不同假设。例如,当模型不希望包括此类事件时,一个团队可能会将背景事件包括在给定用户已执行的所有最近事件的计数中。在这种情况下,Spark-Lineage 允许团队追踪哪些数据用于识别这些不同的决策,以及哪些数据可以缓解差异。

了解影响

识别和记录数据沿袭的主要优势之一是,它使 Yelpers 能够了解任何下游/上游依赖关系,以了解将合并到功能中的任何更改。它还提供了一种跨相关团队轻松协调的能力,以主动衡量变更的影响并做出相应的决策。

修复数据事件

在分布式环境中,有很多原因会导致批处理作业脱轨,从而导致数据不完整、重复和/或部分损坏。此类错误可能会静默一段时间,一旦被发现,就已经影响了下游作业。在这种情况下,响应包括冻结所有下游作业以防止损坏的数据进一步传播,跟踪所有上游作业以查找错误源,然后从那里回填所有下游不准确的数据。最后,我们在回填完成后恢复作业。所有这些步骤都需要尽快完成,Spark-Lineage 可能是快速识别腐败嫌疑人的理想场所。

此外,在 Spark-Lineage 中提及负责团队建立了工作职责,因此维护团队或现场团队可以在正确的时间与正确的团队联系。这避免了与多个团队进行多次对话以确定工作的所有者,并减少了可能对业务报告产生不利影响的任何延迟。

Feature Store

Yelp 的 ML Feature Store 收集和存储特征并将其提供给消费者以构建机器学习模型或运行 Spark 作业,并为数据分析师提供决策见解。Feature Store 提供了许多好处,其中包括:

  1. 避免重复工作,例如来自不同团队尝试构建相同功能;
  2. 确保训练和服务模型之间的一致性; 和
  3. 帮助工程师轻松发现有用的功能。

Data Lineage 可以通过各种方式帮助改进 Feature Store。我们使用 Lineage 来跟踪功能的使用情况,例如功能的使用频率以及由哪些团队使用,以确定功能的受欢迎程度,或者功能可以带来多少性能提升。由此,我们可以执行数据分析来推广或推荐好的特性,或者指导我们生成我们认为对我们的机器学习工程师有益的类似特性。

合规性和可审计性

Lineage 中收集的元数据可供法律和工程团队使用,以确保按照法规和政策处理和存储所有数据。它还有助于在数据处理管道中进行更改以符合新法规,以防将来引入更改。

这篇文章介绍了 Yelp Spark-Lineage,并展示了它如何帮助跟踪和可视化我们服务之间的数据生命周期,以及 Spark-Lineage 在 Yelp 不同领域的应用。对于对 Spark-Lineage 的具体实现感兴趣的读者,我们在下面提供了服务器端和客户端的细分(附录)。

服务端实现

数据标识符

Spark-Lineage 需要跟踪的最基本的元数据是数据的标识符。我们提供了 2 种方法来识别输入/输出表:schema_id和数据的位置

  • Schema_id: Yelp 的所有现代数据都被模式化并分配了一个 schema_id,无论它们是存储在 Redshift、S3、Data Lake 还是 Kafka 中。
  • 位置:另一方面,数据存储之间的表位置不是标准化的,但通常它是 (collection_name, table_name, schema_version) 的三元组,尽管它们通常为每个数据存储称为不同的东西,以符合术语该数据存储的。

无论哪种方式,如果我们得到一个标识符,我们就可以得到另一个。查找模式信息可以通过 CLI 或 PipelineStudio——一个简单的 UI 以交互方式探索模式,或者直接在 Spark-Lineage UI 上完成,与 PipelineStudio 相比具有更高级的功能。通过提供两个标识符之一,我们可以看到表中每一列的描述以及表的模式如何随着时间的推移而演变等。

这两个标识符中的每一个都有自己的优点和缺点,并且相互补充。例如:

  • schema_id 提供了一种更规范的方式来访问数据信息,但该位置更容易记住并且对用户更友好。
  • 在模式更新的情况下,schema_id 将不再是最新的,而使用对 (collection_name, table_name) 查找时将始终返回最新的模式。 使用schema_id,我们也可以发现最新的schema,但需要多一步。

跟踪其他信息

Spark-Lineage 还提供以下信息:

  • 运行日期:我们收集每次运行作业的日期。由此我们可以推断出它的运行频率,这比根据yaml文件中的描述更可靠,因为未来可以改变频率。如果我们一个月没有收到任何运行,我们仍然保持作业的输出表可用,但将它们标记为已弃用,以便用户知道这一点。
  • 结果:我们还跟踪每次作业运行的结果(成功/失败)。 如果出现故障,我们不会通知作业的所有者,因为在 Yelp,我们有专门的监控和警报工具。 我们将这些数据用于与上述相同的目的; 如果服务多次失败,我们将标记输出表,让用户知道这一点。
  • 作业名称和 yaml 配置文件:这有助于用户快速找到必要的信息以了解作业的逻辑,以及作业的所有者,以防用户想联系以获取后续问题。
  • Spark-ETL 版本、服务版本和 Docker 标签:每次运行时也会跟踪此信息,并用于更多技术目的,例如调试。 一个用例是,如果 ML 工程师最近发现了某个特性的统计变化,他可以查找并比较今天运行的特定代码与上个月的运行代码。

客户端实现

Spark ETL 作业的表示:作为表示 Spark ETL 作业的第一步,创建一个名为“Spark ETL”的新域。这样可以轻松进行目录搜索,并在专用区域中存储 Redshift 临时表中的 Spark-ETL 作业的详细信息。一旦域可用,就会在数据治理平台中创建唯一链接(用于 spark ETL 作业),作业名称作为标识符。

添加元数据信息: Spark ETL 作业的详细信息(例如,存储库、源 yaml 等)附加到上面创建的相应链接。每个元数据信息都被赋予一个与相关作业相关的唯一 ID 和值。为 Spark ETL 作业实现的当前机制可以扩展以表示将来的附加信息。

分配责任:当所有者的信息从 Kafka 提取到 Redshift 时,数据治理平台中作业链接的责任部分可以修改为包括“技术管家”——负责 Spark ETL 作业的工程团队,包括生产和维护实际的源数据,并负责数据的技术文档和数据问题的故障排除。

建立沿袭:一旦 Spark-ETL 作业和所需的元数据信息在数据治理平台中可用,我们建立 2 向关系来描述源到 Spark ETL 作业和 Spark ETL 作业到目标关系。这些关系是使用 REST POST API 调用建立的。创建关系后,将自动创建沿袭并可供使用。有多个视图可用于描述关系,但“沿袭视图”一直捕获依赖关系,直到 Tableau 仪表板(参见图 1)。

本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://cloud.tencent.com/developer/article/2109408

0 人点赞