DDIA:数仓和大数据的双向奔赴

2024-01-23 15:54:14 浏览数 (1)

在 MapReduce 流行这些年之后,针对大数据集的分布式批处理执行引擎已经逐渐成熟。到现在(2017年)已经有比较成熟的基础设施可以在上千台机器上处理 PB 量级的数据。因此,针对这个量级的基本数据处理问题可以认为已经被解决,大家的注意力开始转到其他问题上:

  1. 完善编程模型
  2. 提升处理性能
  3. 扩大处理领域

之前我们讨论过,由于 MapReduce 提供的编程接口实在太过难用,像 Hive, Pig,Cascading 和 Crunch 等处理 API 和框架逐渐流行。Apache Tez 更进一步,可以让原来的代码不做过多改动就可以迁移。Spark 和 Flink 也各自有其高层的数据流 API,基本借鉴自 FlumeJava。

这些数据流工具基本都是用关系型的算子来表达计算过程:

  1. 基于某些字段对数据集进行连接的 Join 算子
  2. 基于关键字对元组进行聚类的 Group 算子
  3. 基于条件对元组进行过滤的 Filter 算子
  4. 对元素进行聚合和统计的 Aggregate 算子

等等。这些算子内部实现时,会用到我们本章之前提到的各种 join 和 group 算法。

除了能够显著降低使用方的代码量外,这些高层的框架通常还支持交互式的使用。因此,你可以在 shell 中增量式的构建分析代码,且能够方便的多次跑以查看运行结果。当我们拿到一个新的数据集,需要做实验探索该如何对其进行分析时,这种交互式的方式非常方便。这其实也是我们之前讨论过的 Unix 编程哲学的一个体现。

这些高层的 API 不仅让用户可以更高效的使用体验,还能够提升任务在物理层面的执行效率。

向声明式方向靠拢

相比直接实现代码进行 Join,使用关系型的 Join 算子给了处理框架分析数据集特点、选择最高效 Join 算的优化空间。Hive,Spark 和 Flink 都有基于代价的优化器,可以对执行路径进行优化。甚至,可以交换 Join 的顺序,来最小化中间数据集的物化。

不同 Join 算法的选择对批处理任务的性能影响极大,但我们最好避免将选择的心智负担推给用户,而可以自动地根据情况进行优化。使用声明式风格的接口使这种自动优化称为可能:用户侧仅需要指定哪些数据集需要 Join,而查询优化器会根据数据特点动态的决定其最优 Join 方式。我们在数据查询语言一节中讨论过这种思想。

但从另一方面来说,MapReduce 和其后继的数据流框架和 SQL 这种完全的声明式语言又不一样。MapReduce 是基于回调函数来构建的:对于任意的一条或一批数据,用户可以自定义处理函数(Mapper 或者 Reducer),调用任何库代码、决定其输出格式。这种方式的优点是,你可以复用很多现成的库来减少开发工作量,比如 Parsing、自然语言分析、图像分析和一些数理统计算法方面的库。

在很长一段时间内,能够自由地跑任意的代码是批处理系统和 MPP 数据库的一个重要区分点。尽管数据库也支持 UDF(user defined function),但使用起来较为复杂,且不能很好的和编程语言的包管理工具(比如 Maven 之于 Java,npm 之于 JavaScript,Rubygems 之于 Ruby)相整合。

然而,在 Join 之外,更进一步地引入声明式功能也对数据流工具有诸多好处。例如,一个过滤函数只有很简单的过滤条件(过滤行)、或只是从原数据集中选择几列(过滤列),则针对每条数据都调用一遍回调函数会有很大的额外性能损耗。如果这些简单的过滤和投影能够用声明式的方式表达,则优化器可以充分利用面向列的存储格式(参见列存),只读取需要的列。Hive,Spark DataFrames 和 Impala 还使用了列式执行引擎(vectorized execution):

以一种 CPU 缓存友好的方式,紧凑地进行迭代(每次取一个 Cache Line,使用 SIMD 指令进行运算),以减少函数调用次数。

Spark 使用 JVM 字节码、Impala 使用 LLVM 来通过生成代码的方式优化这些 Join 内层循环。

通过在高层 API 中注入声明式的特性、在运行时使用优化器动态地优化,批处理框架长得越来越像 MPP 数据库(也获得了类似性能)。但同时,仍然保持原来允许运行任意库代码、读取任意格式数据的扩展性,让这些框架仍然可以保持原有的灵活性。

不同领域的特化

保留运行任意代码的自由度很有必要,但对于很多非常通用、反复出现的处理模式,我们有必要提供系统实现以方便用户复用。传统上,MPP 数据库通常充当商业智能(BI)分析和商业汇报领域的生态位,但这个方向只是批处理众多应用方向的一个。

另外一个越来越重要的方向是数值统计算法,其在推荐和分类的机器学习算法中常常用到。可复用的实现逐渐多了起来:例如 Mahout 在 MapReduce、Spark 和 Flink 之上实现了很多机器学习算法;MADlib 也在 MPP 数据库之上实现了类似的功能模块。

其他有用的算法还有—— k 最近邻算法k-nearest neighbors)——一种在多维空间中搜索与给定数据条目相似度最高的数据算法,是一种近似性搜索算法。近似搜索对于基因组分析算法也很重要,因为在基因分析中,常需要找不同但类似的基因片段。近年来较火的向量数据库也是主要基于该算法。

批处理引擎被越来越多的用到不同领域算法的分布式执行上。随着批处理系统越来越多支持内置函数高层声明式算子、MPP 数据库变的越来越可编程灵活度高,他们开始长的越来越像——说到底,本质上他们都是用于存储和处理数据的系统。

小结

在本章,我们探讨了批处理的话题。我们从 Unix 的命令行工具 awk、grep 和 sort 开始,探讨其背后的思想被如何应用到 MapReduce 框架和更近的数据流框架中。这些核心设计原则包括:

  1. 输入数据不可变
  2. 一个组件的输出可以喂给另一个组件成为输入
  3. 通过组合“解决好一件事的小工具”来解决复杂问题

在 Unix 世界中,让所有命令行具有可组合性的统一抽象是——文件和管道,在 MapReduce 中,这个抽象是分布式文件系统。之后我们注意到,数据流工具通过增加各自的“类管道”的数据传输方式,避免了将中间结果物化到分布式文件系统中的额外损耗,但最外侧的输入和输出仍然是在 HDFS 上。

分布式处理框架最主要解决的两个问题是:

  • 分片 在 MapReduce 中,会根据输入数据的文件块(file chunk)的数量来调度 mappers。mappers 的输出会在二次分片、排序、合并(我们通常称之为 shuffle)到用户指定数量的 Reducer 中。该过程是为了将所有相关的数据(如具有相同 key)集结到一块。 后 MapReduce 时代的数据流工具会尽量避免不必要的排序(因为代价太高了),但他们仍然使用了和 MapReduce 类似的分区方式。
  • 容错 MapReduce 通过频繁的(每次 MapReduce 后)刷盘,从而可以避免重启整个任务,而只重新运行相关子任务就可以从其故障中快速恢复过来。但在错误频率很低的情况下,这种频繁刷盘做法代价很高。数据流工具通过尽可能的减少中间状态的刷盘(当然,shuffle 之后还是要刷的),并将其尽可能的保存在内存中,但这意味着一旦出现故障就要从头重算。算子的确定性可以减少重算的数据范围(确定性能保证只需要算失败分区,并且结果和其他分区仍然一致)。

接下来我们讨论了几种基于 MapReduce 的 Join 算法,这些算法也常被用在各种数据流工具和 MPP 数据库里。他们很好的说明了基于数据分区的算法的工作原理:

  • Sort-merge joins 分桶排序。将多个待 join 的输入数据使用一个 MapReduce 处理,在 Mapper 中提取待 join key ,然后通过再分区、排序和合并,会将具有相同 join key 的 records 送到同一个 Reducer 中进行 join。然后 Reducer 函数会将 join 结果进行输出。
  • Broadcast hash joins 小表广播。如果 join 中的一个表数据量很小,可以完全加载进内存的哈希表里,则不用对其进行分片。我们可以将大表进行分片,分发给各个 mapper,每个 Mapper 将小表加载到内存里,然后逐个遍历大表每个 record,提取相应 join key,再与小表中的记录值进行 Join。
  • Partitioned hash joins 分桶哈希。如果两个待 join 输入使用相同的方式进行分片(相同的 key、相同的哈希函数和分区数),则广播哈希算法可以在每个分区内单独应用。

分布式批处理引擎使用了受限的编程模型:回调函数需要是无状态的,且除了输出之外没有其他的副作用。在此设定下,框架可以向应用层屏蔽很多分布式系统的实现细节:当遇到宕机或者网络问题时,子任务可以安全的进行重试;失败任务的输出可以自由抛弃;如果有多个冗余计算过程都成功了,则只有其中一个可以作为输出对后面可见。

由于框架的存在,用户侧的批处理代码无需关心容错机制的实现细节:即使在物理上有大量错误重试的情况下,框架可以保证在逻辑上最终的输出和没有任何故障发生是一致的。这种可靠性语义保证(reliable semantics)通常远强于我们在在线服务中常见到的、将用户的请求写到数据库中的容错性。

批处理任务的基本特点是——读取输入,进行处理,产生输出的过程中,不会修改原数据。换句话说,输出是输入的衍生数据。其中一个重要特点是,输入数据是有界的(bounded):输入的大小是固定的、事先确定的(比如输入是包含一组日志的数据或者一个快照点的数据)。唯其有界,处理任务才能知道什么时候输入读取结束了、什么时候计算完成了。

但在下一章中,我们将会转到流处理(stream processing)上,其中,输入是无界的(unbounded)——你的任务面对的是不知道何时结束的无限数据流。在这种情况下,任何时刻都有可能有新的数据流入,任务会永不结束。我们之后可以看到,虽然批处理和流处理在某些方面有相似之处,但对于输入的无界假设,会在构建系统时对我们的设计产生诸多影响。

参考资料

[1]

DDIA 读书分享会: https://ddia.qtmuniao.com/

DDIA 学习会

0 人点赞