浅谈数据流水线

2021-09-15 15:41:38 浏览数 (1)

当下我们听过很多热门的技术名词,例如:机器学习模型、推荐系统、高管驾驶舱、BI等等,在这些技术背后一个关键的角色就是:数据。这些数据通常不是单一的,原始的数据,而是需要从多个数据源获取,并经过复杂的提取、清洗、处理、加工等过程才能最终提供真正的价值。我们常说“数据是未来的石油”,其实也就是在说,数据并不是“开采”出来就可以直接提供价值的,而是要经过若干流程的“加工”和“提纯”才可以产生价值。而对于数据的加工和处理流程,我们通常将其称为数据流水线,也就是 Data Pipeline。

什么是数据流水线

那么什么是数据流水线呢?数据流水线从广义来讲,会包含源数据的接入、数据的处理、数据交付、数据管理、数据治理以及数据的访问模式等,同时也包含狭义上的数据流水线,也即对数据做相应的ETL处理,本文我们将重点关注于狭义的数据流水线。

它通常指从若干数据源中迁移数据,将迁移的数据进行复杂的数据处理之后,并加载到目标数据系统中的一系列流程,且数据的价值正是在每一步的流转中逐步产生的。数据流水线通常也是实现机器学习模型、数据分析、业务报表等技术的基础。

数据流水线的复杂性取决于数据源的数据结构、数据质量以及我们要实现的业务需求。一条最简单的数据流水线可以只包含从一个数据源(例如:网络日志文件)复制数据,经过基础的数据清洗(例如:去除空值、无效值),再加载到目标数据仓库(例如:Hive)中。但通常情况下,一条数据流水线会包含多个步骤,包括多数据源的数据萃取、多步骤的数据处理、多环节的数据验证、有时还可能包含机器学习模型的训练和运行等任务。且这些不同的任务通常来自于不同的系统,并可以用不同的技术或语言来实现。

一条简单的数据流水线

为什么要构建数据流水线

我们都知道,冰山露出在水面之外的部分只占冰山整体的一小部分,而只有这些露出水面的部分才可以被人看到,但冰山的大部分都是隐藏在海水之下的。对于数据类相关的项目亦是如此,高管可能只关注放在他办公室的管理驾驶舱或仪表盘,业务部门可能只关注在某个推荐模型所带来产品点击率是否得到提升,产品部门可能主要关注在某个分货优化模型是否带来了产品分货效率的提升等等。但这些人们所能看到的也只是冰山的一角罢了。要生成每一个报表,每一个模型,都依赖于一个复杂的机制,而这些隐藏在水面之下的部分通常很难被人所理解。因为要产生这些价值,都需要不断对数据进行加工和处理,不仅仅包含从数据源中提取数据并加载进入目标的数据系统,更包含过程中对于数据的清洗、数据的结构化和规范化、指标逻辑的处理与计算、数据隐私与安全性的处理、分析型数据立方体的预计算等任务。而对数据进行加工处理的这些过程,通过不断的抽象,就可以将冗杂的代码,甚至是有一些重复的代码,通过一种更高效的模式表达,也就是数据流水线。数据流水线就是为了能够高效组织并运行这些不同阶段的任务。因此,在上面提到的例子中,在每一个报表或模型生成的背后,都至少有一条数据流水线在后台运行着,以支撑报表或模型最终为用户产生价值。通过数据流水线的方式处理数据,就可以更好的追踪数据的血缘关系,也通过开发一些通用的中间层数据,从而减少重复的开发工作。同时,数据流水线也是一种将复杂的问题逐步分解的过程,在每一条数据流水线中只处理单一的事情,最终以将复杂的问题简单化,也更有利于维护数据的准确性。

如何构建数据流水线

现在我们知道在一条数据流水线中既包括了从数据源中提取数据、加载处理好的数据到目标数据源中,还包括对数据的清洗、处理、加工、建模、分析等等一系列复杂的任务。可以说,每构建一条数据流水线对于数据工程师都是一次新的挑战,因为我们总会遇到不同的数据源,不同的基础架构,也为了不同的数据分析目标和模型而构建。但值得庆幸的是,有一些比较通用的模式已经比较成熟了,我们可以直接按照模式中对数据流水线的抽象来构建不同的数据流水线,其中最经典的模式莫过于 ETL (Extract、Transform、Load)了,从狭义上来讲,很多时候我们就可以直接把 ETL 看做是数据流水线,而不是数据流水线所遵循的模式了。ETL 作为最广为人知的数据流水线模式,主要是指数据从提取,经过转换,再最终加载到目标数据源的过程,以提高数据的管理效率,可以帮助数据开发人员更快速的迭代以不断满足业务发展的数据需求。下面我们就对其稍微详细的介绍一下。

Extract(提取)

“提取”步骤通常是数据流水线的起点,主要是指从各种数据源中收集数据,为数据的转换和加载做准备。由于数据源中的数据还未经过任何处理和加工,通常其数据质量都无法保障,且现在大多数的组织中,为了支持各自业务的发展,通常组织内部都拥有多个数据源,且数据源的类型也不仅仅是数据库类型,通常还会包含文件、API或事件消息类型。因此,当我们决定要接入一个或多个数据源时,建议能够从以下几个方面加以考虑。数据源的所有权为了构建一个数据产品,数据团队通常需要从多个源系统中提取数据,这些系统可能由不同的团队和组织管理,甚至还需要接入一些第三方平台,以获取外部数据提供输入。例如:一个电商平台,它的订单和商品数据可能存在关系型数据库 Postgres 中,但它们可能还需要同时接入第三方网站分析平台来追踪其网站上用户对网站的使用情况,这种情况下就至少需要接入两个不同的数据源以了解客户行为。了解源系统的所有权是很重要的。对于第三方系统来说,它们可能会对开放出来的数据有所限制,并不是所有的数据信息都是开放可供访问的。对于第三方系统数据的访问方式来说,当前大多数服务提供商都会提供 Rest API 供客户访问,但很少有服务提供商会直接提供数据库供客户连接。在这种情况下,我们就要在数据流水线中提供能够接入 Rest API 类型数据的基础设施,以支持此类数据的获取。同样,对于大多数内部的系统也是如此。通常情况下,组织内部大多数业务系统的数据库在设计之初很少考虑到之后被数据团队大规模提取的场景,比如:数据库中未设置“最新更新时间”类似字段,导致数据库难以做到增量更新等。同样还有数据库访问方式的问题,能否直接连接到它们的数据库读取数据还是要再同步数据库到备库中才能供数据团队提取数据,这些问题都是需要和对方系统的团队成员确认清楚的,有时还会需要对方系统做出一些更改,幸运的话,他们可能会很愿意合作,但通常情况下合作都会遇到一些阻碍。数据接入形式和数据结构无论是内部数据源或是外部第三方数据源,当我们在接入一个新的数据源时,都要先搞明白对方的数据接入形式以及对应数据的数据结构。常见的数据接入形式如下:

  • 业务系统背后连接的数据库,例如:MySQL、Postgres、MongoDB 等
  • Rest API
  • 消息流,例如:Apache Kafka
  • 文件类型,例如:CSV 文件、Excel 文件、网络访问日志等,这些文件通常会存储在网络存储服务(例如:FTP)或云存储服务(例如:Amazon S3)中。
  • 数据仓库或数据湖,例如:Apache Hive
  • HDFS 或 HBase数据库

除了要理清楚如何能够接入到数据源中,还要明白数据源的数据结构是什么样的。常见的数据结构类型如下:

  • 结构化数据
    • MySQL 或 Postgres 等关系型数据库
  • 半结构化数据
    • Rest API 返回的 JSON数据
    • MySQL 或 Postgres 等数据库中存储的 JSON 类型的数据
    • JSON 类型的文件
    • XML文件非结构化数据
  • 非结构化数据
    • CSV 或其他类型的文件
    • 图片、视频等媒体文件

每种不同结构的数据都有其各自的优势及挑战。对于结构化数据来说,由于它结构良好,非常易于提取和处理,但同时这种结构一般是为了业务系统的实现而结构化设计的,在提取到数据之后,通常还需要一些工作来对数据进行清理,并重新建模以适应数据项目中之后的数据分析需求。半结构化数据(例如:JSON)当前已经得到较多的使用了,但是和结构化数据不同的是,我们不能保证每个数据集中的每个数据都拥有相同的数据结构。因此,这就为在数据流水线中如何保证数据不会丢失,以及数据的完整性提出了要求。而非结构化数据其实在组织中也是非常常见的,许多企业在进行数字化转型的过程中有大量的历史数据文件保留了下来,而为了应对后续数据分析的需求,这些文件的数据输入又是十分重要的,在这种情况下,能够高效的将文件这种非结构化数据接入到数据项目中就非常重要了。例如:产品销量的预测需要大量的历史数据作为支撑,NLP 模型需要依赖于大量的文本数据来训练等。因此,当我们识别出来在一个数据项目中需要对接文件类型的数据时,我们就要考虑如何能够高效的、增量的接入文件数据,并对文件数据进行解析和处理。数据量在接入数据源时,还要考虑到数据源的数据量的大小,是 TB 级别的数据,还是 PB 级别的数据,以及数据源的更新频率是什么样的。首先,数据源中数据量的大小并不绝对意味着数据中所蕴含价值的高低,因此对于大数据集或小数据集,我们在接入数据时都应该同等对待。很多情况下我们可能需要同时接入大数据集与小数据集,然后将它们 join 起来再一同进入数据处理阶段,这时还需要谨慎地对数据进行建模,以避免大小量级的数据源之间的互相影响,因此需要针对不同量级的数据源设计不同的技术与接入方法。对于大数据集,我们通常会在首次基于 Apache Sqoop 进行全量数据提取,之后通过 SparkSQL 按照数据的最近更新时间进行增量的更新;而对于小数据集的提取可以灵活一些,由于其数据量不大,进行数据提取时所消耗的资源不多,这样我们既可以通过全量 增量的方式进行数据提取,也可以每次都进行全量的数据提取。数据质量数据源不仅数据结构与接入方式多种多样,就连数据源的质量也都千差万别。在接入数据源时,要能够有效的对数据源的质量进行识别,了解数据源在设计时的一些限制或者缺陷,并尝试在数据流水线的某个环节对其进行修复。常见的脏数据通常具备以下特征:

  • 重复记录
  • 让人模棱两可的记录
  • 被孤立的记录,即某个记录的外键值引用了不存在的主键
  • 记录不完整或丢失字段
  • 记录编码错误
  • 记录之间的格式不一致,例如:日期有些存储为 2020-12-06,有些记录却存储为 2020.12.06

除了上述这些特征,脏数据通常还可能以其他各种形式存在。当前应该还没有任何银弹能够保证数据源的数据是绝对干净、完整和一致的,因此在数据流水线中对数据进行处理时只能假设我们将会遇到最脏的数据,在数据流水线的不同阶段对数据源进行不断的清洗和验证,以不断得到最干净整洁的数据。数据源的带宽和延迟问题了解了数据源的数据结构和数据质量等问题之后,还有一点最好也要加以考虑,就是数据源的带宽和延迟问题。由于我们通常接入的是大规模的数据,在这种情况下,当我们接入的是源系统的数据库,那么可能会有延迟的问题,一旦我们大规模的读取数据,是否会对源系统造成影响,是否需要同步数据源到备库之后再供我们来提取数据呢。当接入的是 Rest API 或消息类型的数据时,那么数据的推送速率是多少呢。总之,当我们对数据源了解的越多,就越能够帮助我们设计出高效的数据处理流水线。Transform(转换)一般情况下我们会将数据仓库分为 ODS(Operation Data Store,又称操作数据层)和 DW(Data Warehouse,又称数据仓库层)两层。当我们从数据源中提取到数据之后,我们会先将数据存储在 ODS 层对数据进行清洗,将脏数据或不完整的数据进行处理或者过滤,之后再从 ODS 层向 DW 层进行转换,这时我们通常会按照不同的业务域对不同的数据进行主题域的划分,再对不同的主题域进行数据建模,之后再根据不同的业务指标或规则对数据进行计算、分析和聚合。数据清洗主要是希望解决数据源的数据质量问题,如之前我们提到的,数据源中可能会存在重复、模棱两可、被孤立的数据记录等问题存在,我们希望在数据清洗过程中将数据质量进行提高。但是,数据清洗工作不是一蹴而就的,而是一个反复不断的过程,只能不断的发现问题,再解决问题。由于我们不知道源数据可能会出现什么形式的质量问题,并且也不可能阻止暂时有质量问题的数据进入 ODS 层,但我们一定知道哪些数据是有用的,因此我们要注意在进行数据清洗的过程中一定不要将有用的数据过滤掉,且对每一个数据清洗规则或过滤规则进行验证和确认。数据转换过程主要包含数据的统一、数据粒度的确认以及指标和规则的计算工作。在数据转换过程中,需要对不同系统提取来的数据进行整合,将不同数据源中的相同类型的数据进行统一,例如:在 A 系统中的一个客户编码是 AA001,但同一个客户在 B 系统中的客户编码却是BB001,这样在转换过程中就需要和客户确认,明确其规则,尽可能对其统一从而转换为一个编码。此外,在业务系统中一般都存储的是非常明细的数据,但数据仓库中的数据是为了分析的,不需要特别明细的数据,这时在转换过程中,可以将业务系统中的数据按照数据仓库的粒度进行聚合。最后,就是在对数据进行分析的过程中,我们通常会按照设计好的指标体系或不同的业务规则,在数据转换的过程中对各个指标和规则进行计算,并将计算好的数据存储起来,这样就可以在后续的数据分析过程中使用。Load(加载)了解了数据的提取和转换,数据加载就比较容易理解了。当数据从数据源中提取出来,经过 ODS 层和 DW 层的数据清洗和转换,再将计算好的数据持久化存储到目标数据源中的过程就是数据加载。通常在实际的工作中,数据加载还需要结合所采用的数据库系统(Oracle、MySQL、Impala 等)来确定最佳的数据加载方案,从而最大化的节约 CPU、硬盘 IO 以及网络资源。

如何管理数据流水线

随着组织内数据流水线的数量以及复杂性的增加,对数据流水线中的任务的管理也变得越来越复杂,这时就需要引入任务调度平台了,通过任务调度平台对数据流水线中的多个任务进行管理和调度。假如我们有一条数据流水线,其中包含了一个用 Python 编写的数据提取的任务,以及用 Spark SQL编写的数据转换任务等,这些任务必须在全天按照特定的时间和顺序来执行。虽然在 Linux 系统中,也可以通过传统的 cron job 启动定时任务达到类似的效果,但随着任务数量的增多,以及任务之间依赖的复杂度提升,要想管理这些任务及其之间的依赖关系就是一个非常大的挑战了,但好在当下已经有很多用来进行任务的调度和编排的工具可供选择了,例如:Apache Airflow、Apache Oozie、Azkaban。数据流水线中的任务的执行通常都是有方向的,也就是说它们通常以一个或多个任务开始,并以另一个或几个任务作为结束,且后面的任务在其所依赖的任务未完成之前是不会运行的,这样就可以保证任务从开始到结束是有一个明确的有方向的执行路径的。此外,数据流水线中的任务也是非循环的,也就是说一个任务只能依赖于其前面任务的执行,无法再指向先前已经完成的任务,即无法循环执行一个已经完成的任务,否则的话,数据流水线就可能会无穷尽的运行下去了。考虑到数据流水线中“有方向”和“非循环”这两个约束,基本所有的任务调度工具都会将其工作流编排为有向无环图(DAG)的形式。如下图所示是一条简单的数据流水线。在该示例中,必须先完成任务 A,才能启动任务 B 和任务 C,且直到任务 B 和任务 C 全部完成之后,任务 D 才可以开始运行,当任务 D 完成之后,这条数据流水线才算完成。

采用 DAG 形式表示的 ETL 任务

此外,虽然我们用 DAG 来表示一组任务的执行方式,但这只是一种表示方式,并不是定义任务逻辑的实际位置,在任务调度平台中可以定义各种类型的任务。还以上图为例,其中 query_and_extract 任务可能是一个 python 脚本用来从数据源中提取数据,clean_and_transform_A 任务可能是一段 SQL 脚本用来对数据进行清洗和转换,而load_data_to_dw 可能仅仅是一段 shell 脚本来将处理过的数据加载到数据仓库中。任务编排工具只是负责任务执行顺序的编排和运行,这些任务仍然会以不同的形式在各自不同的节点上运行。

总结

本文对数据流水线及其主要的构建模式 ETL 进行了简要的介绍。源数据会在数据流水线的各个阶段中移动,从数据的采集,到数据的清洗和转换,再到数据的加载,数据的价值也在一步步的转移过程中逐步体现出来。一条数据流水线可能会帮助构建供财务使用的 OLAP 多维分析数据集,也可能会帮助数据科学家来处理数据并训练算法模型等等。总之,数据流水线就像一条数据供应链一般,它可以帮助处理、优化和丰富数据,供各种业务和应用程序使用,从而不断的为企业和组织带来价值。

0 人点赞