DDIA:MapReduce 进化之数据流引擎

2024-01-02 16:25:16 浏览数 (2)

尽管 MapReduce 在本世纪10年代最后几年中被炒的非常热,但它其实只是众多分布式系统编程模型中的一种。在面对不同的数据量、数据结构和数据处理类型时,很多其他计算模型可能更为合适。

但作为分布式系统之上的一种抽象, MapReduce 非常干净、简洁,很适合作为入门的学习对象,因此我们本章花了很多篇幅来讨论它。但需要指出,这里的简洁指的是易于理解,而非易于使用。恰恰相反,使用裸的 MapReduce 接口来完成复杂的处理任务时,实现会变得非常复杂。比如,你需要从头实现数据处理中常见的各种 join 算法。

为了降低直接使用裸 MapReduce 接口的复杂度,人们在其上封装了很多高层编程模型(Pig,Hive,Cascading 和 Crunch)。如果你知道 MapReduce 是怎么工作的,这些框架也很容易理解。且。有了这些高层框架,很多基本的批处理任务变得非常容易实现。

然而,MapReduce 执行模型存在很多问题,即使在其上多封装几层也并不能解决,反而会使得性能变得更差。一方面,MapReduce 容错性非常好,你可以使用该模型,在工作线程很不稳定的多租户系统上处理几乎任意尺度的数据(尽管非常慢)。另一方面,针对某些类型的数据处理,有些其他合适工具的速度可能要快上几个数量级。

在本章余下部分,我们会考察一些可替代 MapReduce 的批处理模型。从某种意义上来说,我们下一章要讨论的流处理模型,也可以认为是对针对批处理的一种优化。

中间状态的物化

之前讨论过,每个 MapReduce 任务都是互相独立的。每个任务和外界的唯一交互点就是分布式系统上的文件夹。如果想让某个任务的输出成为另一个任务的输入,你只需将第二个任务的输入文件夹配置为第一个任务的输出文件夹。跨 MapReduce 的调度框架必须保证:只有前序任务完全结束后,后序任务才能开始执行

如果我们需要将前序任务的输出数据进行大范围发布,那么 MapReduce 的这种结果物化机制(持久化到分布式系统中)是合理的。在这种情况下,其他任务无须关心是哪个任务生产了这些数据,而只需通过名字定位到输出数据所在文件夹即可,这符合我们工程中常用的解耦和复用的理念。

然而,在大多数情况下,我们事先就明确地知道某个任务的输出只会为同一团队的另一个任务所使用。在这种情况下,保存到分布式文件系统上的两个任务间的数据其实只是一种中间状态(intermediate state):只是一种将数据从前序任务传递到后继任务的方式。在诸如推荐系统等复杂的数据流中,通常会包含 50~100 个 MapReduce 任务,其中绝大部分任务间的数据都属于数据流中间状态。

将中间状态写入文件的过程称为物化(materialization)。我们在之前聚合:数据立方和物化视图一节中也提到过相关概念——物化视图(materialized view)。在当时上下文中,物化视图意味着将某些操作的结果写到外存中,而非每次都按需计算。

与 MapReduce 相对,在之前日志分析的例子中,我们使用 Unix 管道而非文件将不同的命令的输入输出进行耦合。Unix 管道并不会将中间结果物化,而是使用一个基于内存的小块缓存(buffer)将一个命令的输出导向另一个命令输入。

相比 Unix 管道,MapReduce 将工作流中间结果进行物化的方式有很多缺点:

  • 无谓等待。一个 MapReduce 任务只能在所有前置依赖任务完成后才能启动。然而由 Unix 管道缀连起来的命令却能够并行运行,只要一个任务开始产生输出,下一个任务就可以开始消费处理。由于机器配置和负载的不同,总会在某些机器上出现一些执行时间过长拖后腿的任务(struggler)。而 MapReduce 的这种等待机制,会让单个任务拖垮整个工作流。
  • Mapper 冗余。Mapper 职责非常简单,仅是读出前置 Reducer 产生的数据,并为之后 Reducer 的分片和排序做准备。在很多情况下,mapper 的职责其实可以并到前序任务的 Reducer 中:如果可以将 Reducer 的输出按照后继 Reducer 的要求准备好,则可将 Reducer 直接串起来,从而省去中间夹杂的 Mapper 阶段。
  • 数据冗余。在分布式文件系统中存储中间结果,意味着将数据在不同机器上冗余了几份。对于并不需要共享的中间结果来说,这种方式太过奢侈。

数据流引擎

为了解决 MapReduce 的这些问题,针对分布式系统中的批处理负载,人们开发了很多新的执行引擎。其中最知名的是 Spark、Tez 和 Flink。这几个处理引擎的设计有诸多不同之处,但有一点是相同的:他们将整个数据流看做一个任务,而非将其拆分成几个相对独立的子任务

由于这些引擎会显式地考虑跨越多个阶段的全局数据流,因此也常被称为数据流引擎(dataflow engines)。和 MapReduce 一样,这些引擎也会对每个数据记录在单个线程中,重复调用用户的定制函数(包裹用户逻辑)。并且会将输入数据集进行切片(partition),并行地执行(数据并行),然后将一个函数的输出通过网络传递给下一个函数作为输入。

和 MapReduce 不同的是,这些函数可以进行更灵活地组织,而不需要严格遵循 map 或者 reduce 格式。我们成这些函数为算子(operators),且 dataflow 引擎会提供多种选择,以将一个算子的数据输出导入到下一个算子(类似数据流接线方式):

  • repartition sort(sort merge join):一种方法是进行 repartition 并按 key 对 record 进行排序,就像 MapReduce 的 shuffle 阶段一样。该功能能够提供像 MapReduce 一样的 sort-merge join 和分区方式。
  • only repartition(partition hash join):另一种可能是接受多个输入,并且用同样的方式进行分区(partitioning),但是会跳过排序阶段。这对于分区哈希 join 很有用,因为该算子只关心记录的分区,但其顺序并不重要,因为总会过哈希表重新组织。
  • broadcast(broadcast hash join):对于广播哈希 join,一个算子的输出会被发送到多个待 join 分区算子。

这种风格的处理引擎思想来自于 Dryad 和 Nephele 等系统,相比 MapReduce 模型,有如下优点:

  • 按需 shuffle:对于排序等高代价负载,只有在需要的时候才会执行,而不是总强制发生在 map 和 reduce 之间。
  • 省掉无用 Mapper:由于 map 本身并没有进行 repartition,因此可以将其合并到前一个算子中的 reduceer 阶段。
  • 数据传输优化:由于所有 join 和依赖等数据拓扑是显式声明的,调度器可以事先知道哪些数据在哪里被需要。因此可以尽可能地做局部性优化(locality optimization)。例如,可以尽量将消费某分区数据的任务放到生产该数据的机器上执行,从而通过共享内存而非网络来共享数据。
  • 中间结果只存一份:通常来说,只需要将算子的中间结果,在内存中或者本地硬盘中放一份就够了,而不用写到分布式文件系统中。在 MapReduce 中 Mapper 的输出其实也是用了此优化,只不过 dataflow 引擎将该思想扩展到了所有中间状态的存储中。
  • 算子执行流水化:大部分算子只要有输入了就可以执行,而不用等到前置任务都完成了才能够执行。
  • 进程复用:同一个工作流中,前面算子所使用的 JVM 进程池可以为之后算子所复用,而不用像 MapReduce 一样每个任务都要开一个新的 JVM 进程。

你可以使用数据流引擎实现和 MapReduce 数据流一样的计算逻辑,并且由于上面的优化,执行速度通常更快。由于算子是 map 和 reduce 的泛化,同样处理逻辑的代码,仅简单调整下配置,便可以无缝的跑在两种数据流引擎上:

  1. 基于 MapReduce 的数据流引擎(如 Pig,Hive 或者 Cascading)
  2. 新型的的数据流引擎(如 Tez 或者 Spark)

Tez 是一个依赖 YARN 的 shuffle 服务在节点间进行数据拷贝的轻量级库;而 Spark 和 Flink 是各有其自身一套完整的网络通信层、调度模块和用户 API 的重量级框架。我们稍后将会讨论这些高层接口(high-level API)。

容错

将所有中间状态持久化到分布式文件系统中的一个好处是——持久性(durable),这会使得 MapReduce 的容错方式变得非常简单:如果某个任务挂了,仅需要在其他机器上重新启动,并从文件系统中读取相同的输入即可。

Spark、Flink 和 Tez 都会避免将中间状态写到 HDFS 中,因此他们采用了完全不同的容错方式:如果某个机器上的中间结果丢了,就回溯工作流的算子依赖(DAG 依赖),找到最近可用的数据按照工作流重新计算(最差的情况会一直找到输入数据,而输入数据通常存在于 HDFS 上)。

为了能够通过重新计算来容错,框架必须跟踪每一部分数据的计算轨迹(DGA 依赖,或者说数据谱系,data lineage)——涉及哪些输入分片、应用了哪些算子。Spark 使用弹性分区数据集(RDD)抽象来追踪数据的祖先;Flink 使用了快照来记录所有算子状态,以从最近的检查点(checkpoint)重启运行出错的算子。

当通过重算来容错时,最重要的是要明确计算过程(即算子)是否为确定性的(deterministic):即,给定同样的输入数据,多次运行同一算子总会产生同样的输出吗?当算子的数据已经发到下游后出错时,该问题变的非常重要。如果算子重新运行时产生的数据和之前不一致,则下游算子很难在新老数据间进行冲突处理。对于非确定性算子的容错方案,通常是将下游算子也都清空状态一并重启。

为了规避这种级联重启,用户需要确保每个算子逻辑是确定的。但需要注意的是,计算过程中有很多情况会引入不确定性:

  1. 很多编程语言不保证哈希表遍历顺序的稳定
  2. 很多概率和统计算法会显式地依赖随机数
  3. 所有使用系统时钟或者外部数据源的算子也是非确定的

这些导致不确定性的原因需要从算子逻辑中移除,以保证能够通过重算进行容错。比如,可以通过使用固定种子的伪随机算法来消除随机数的不确定性。

当然,通过重算数据进行出错恢复并非总是正确选择。在中间数据相比原始输出要小很多、计算过程非常耗 CPU 等情况下,相比重算,将中间数据物化到文件中代价可能会更低。

物化的一些讨论

回到 Unix 哲学上,MapReduce 可类比为将每个命令的输出都写入临时文件中,而现代数据流引擎则更像 Unix 管道。Flink 更是直接基于流水线的思想构建的:即,将上游算子的输出增量地送给下游算子,下游算子一经收到数据,便可开始着手处理

但有些算子,比如排序,不可避免的需要等待所有输出,才可以开始处理并产生输出。这是因为后来的数据,可能会具有较小的 key,因此需要被放到输出流前面。所有需要排序的算子都需要等待输入数据到齐,但其他大部分算子都是可以流水化执行的。

当工作流任务完成后,其输出通常要进行持久化,以让用户能够引用并使用——最常见的,就是写回分布式文件系统。因此,当使用数据流引擎时,数据流的输入和最终输出通常都会物化在 HDFS 上。和 MapReduce 一样,数据流任务的输入也是不可变的,输出不会在原地更新,而会写入其他地方。相比 MapReduce,这些数据流引擎的提升就是避免将所有子任务的中间状态也写入分布式文件系统中。

0 人点赞