Hello Spark! | Spark,从入门到精通

2019-04-19 16:03:11 浏览数 (1)

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

/ 什么是 Spark? /

Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架,是专为大规模数据处理而设计的快速通用的大数据处理引擎及轻量级的大数据处理统一平台。

当我们在谈 Spark 的时候可能是指一个 Spark 应用程序,替代 MapReduce 运行在 Yarn上,存储在 HDFS 上的一个大数据批处理程序;也可能是指使用包含 Spark sql、Spark streaming 等子项目;甚至 Tachyon、Mesos 等大数据处理的统一平台,或者称为 Spark 生态。

图 1

发展至今,Spark 已不仅仅是 MapReduce 的替换方案,它已经发出成了一个包含众多子项目的 Spark 生态。如图 1 所示,Spark 生态可分为四层:

  • 数据存储层,以 HDFS 、Tachyon 为代表的一些分布式文件存储系统或各种数据库;
  • 资源管理层,Yarn、Mesos 等资源管理器;
  • 数据处理引擎;
  • 应用层,以 Spark 为基础产生的众多项目;

Spark SQL 提供 HiveQL(通过 Apache Hive 的 SQL 变体 Hive 查询语言)与Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。Spark Streaming 对实时数据流进行处理和控制,它允许程序能够像普通 RDD 一样处理实时数据。

接下来的系列文章将会详细介绍 Spark 生态中的其他模块与各个子项目,接下来将通过与 MapReduce 的对比来介绍数据处理引擎Spark的特点及其原理。

/ Spark 的特点 /

根据谷歌和百度的搜索结果显示,Spark 的搜索趋势已与 Hadoop 持平甚至赶超,标志着 Spark 已经成为计算部分的事实标准,也就是说大数据技术绕不开 Spark 了。

在大数据的存储、计算、资源调度中,Spark 主要解决计算问题,即主要替代 Mapreduce 的功能,底层存储和资源调度很多公司仍然选择使用 HDFS、Yarn 来承载。为什么众多企业在 Hadoop 生态框架里都选择用 Spark 作为处理引擎?让我们仔细看看它有什么特点。

1.速度快。Spark 基于内存进行计算( 也有部分计算基于磁盘) ;

2.容易上手开发。 Spark 基于 RDD 的计算模型, 比 Hadoop 基于 Map-Reduce 的计算模型要更易于理解、易于上手开发实现各种复杂功能,如二次排序、 topN 等复杂操作时更加便捷。;

3.超强的通用性。 Spark 提供了 Spark RDD、 Spark SQL、 Spark Streaming、 Spark MLlib、 Spark GraphX 等技术组件, 可以一站式地完成大数据领域的离线批处理、 交互式查询、 流式计算、 机器学习、图计算等常见的任务;

4.集成 Hadoop。 Spark 可以完美集成 Hadoop。 Hadoop 的 HDFS、 Hive、HBase 负责存储, Yarn 负责资源调度, Spark 负责大数据计算是比较流行的大数据解决方案。

4.极高的活跃度。 Spark 目前是 Apache 基金会的顶级项目, 全世界有大量的优秀工程师是 Spark 的 committer, 并且世界上很多顶级的 IT 公司都在大规模地使用Spark。

看看同样是负责计算问题的 MapReduce,如图 2 所示是 MapReduce 计算 WordCount。

图 2

MapReduce 解决了大数据处理中多种场景问题,但是它的局限性也很明显:

  • MapReduce 只提供 Map 和 Reduce 两个操作,欠缺表达力,复杂的计算需要大量的 Job 才能完成。
  • 中间结果也放在 HDFS 文件系统中,迭代计算的话效率很低。
  • 适用 Batch 数据处理,对于交互式数据处理而言实时数据处理的支持不够。
  • 需要写很多底层代码,难上手。如上所示的 WordCount 程序至少需要三个 java 类:Map 类、Reduce 类、Job 类,这里不详细列出。

许多项目针对它的局限性进行了改进(如 Tez 等),接着看图 3 中 Spark 的具体操作流程:

图 3

首先我们可以看到 Spark 提供了丰富的算子(textFile、FlatMap、Map、ReduceByKey 等),在计算的中间结果也没有存储到 HDFS 的操作。然后,对于上图的 WordCount 程序,Spark 只需要如下一行代码:

代码语言:javascript复制
sc.textFile(s"${path}").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_   _).saveAsTextFile("hdfs://xxx")

图 4 列举了 Spark 和 MapReduce 作为数据处理引擎的一些对比。值得一提的是关于数据处理的规模,Spark 在诞生后,社区里有很多质疑 Spark 处理数据规模的声音,随后官方给出了对于一 PB 数据排序的实验,并且处理时间打破了当时的记录。但我们也不能忽视,在实际生产过程中,我们面对的不是一个程序或者一个任务,在同一个集群,如果有很多的 Spark 程序没有得到很好的优化,会浪费大量的内存,从而让一些程序需要排队等待,在这种情况下,Spark 处理的数据规模可能会小于 MapReduce 处理的数据规模。(之后的系列文章也会介绍关于 Spark 内存调优的相关内容)

图 4

关于最后一点容错性,MapReduce 中每一步操作的结果都会被存入磁盘,在计算出现错误时可以很好的从磁盘进行恢复;Spark 则需要根据 RDD 中的信息进行数据的重新计算,会耗费一定的资源。Spark 提供两种方式进行故障恢复:通过数据的血缘关系再执行一遍前面的处理;Checkpoint 将数据集存储到持久存储中。理论上如果选择在每个完成的小步骤上加 CheckPoint,Spark 的容错性能可以和 MR 达到一样的稳健。当然,很少有人会这么做。

我们通过 Spark 与 MapReduce 对比。看到了 Spark 对 MapReduce 局限性的改进,还有它快速、通用的特点。接下来将通过 Spark 的设计思想和执行过程来说明它为什么可以做到这些特点。

/ Spark 的基本原理 /

图 5

如图 5 所示,在 Spark 集群中由一个节点作为 driver 端创建 SparkContext。Spark 应用程序的入口负责调度各个运算资源,协调各个 Worker Node上 的 Executor。根据用户输入的参数会产生若干个 workr,workr 节点运行若干个 executor,一个 executor 是一个进程,运行各自的 task,每个 task 执行相同的代码段处理不同的数据。

图 6

如图 6 所示是 Spark 的具体执行过程,client 提交作业,通过反射 invoke 执行用户代码 main 函数,之后开始启动 CoarseGrainedExecutorBackend 和初始化 SparkContext。

*SparkContext 初始化包括初始化监控页面 SparkUI、执行环境 SparkEnv、安全管理器 SecurityManager、stage 划分及调度器 DAGScheduler、task 作业调度器 TaskSchedulerImpl 、与 Executor 通信的调度端 CoarseGrainedSchedulerBackend。

DAG Scheduler 将作业划分后,依次提交 stage 对应的 taskSet 给 TaskSchedulerImpl,TaskSchedulerImpl 会 submit taskset 给 driver 端的 CoarseGrainedSchedulerBackend 后端,接着 CoarseGrainedSchedulerBackend 会一个一个的 LaunchTask。在远端的 CoarseGrainedExecutorBackend 接收到 task 提交 event 后,会调用 Executor 执行 task,最终 task 是在 TaskRunner 的 run 方法内运行。

那么在过程 4 中 DAG Scheduler 如何划分作业?如果产生 stage、task 等给 Executor 执行呢?接着我们看作业划分执行的示例。

图 7

图 7 描述了一个 Spark 程序,从 HDFS 上读取数据产生 RDD-A 然后 flatmap 操作到 RDD-B,读取另一部分数据的到RDD-C,然后 map 操作的到 RDD-D,RDD-D 聚合操作 RDD-E,RDD-B 和 RDD-E 加入后得到 RDD-F,然后再将结果存储到 HDFS 上。

Spark 根据 RDD 之间的不同点依赖关系切分成不同的阶段(Stage),途中有四个阶段,其中 Stage0 和 Stage2 由于没有依赖关系是可以并行执行的。但Stage2需要等待Stage1执行完毕。RDD-D 到 RDD- F 的聚合操作以及 Stage0 和 Stage2 得到的 RDD- B 和 RDD-E join在一起的到 RDD-F,这个过程会产生 shaffle。没有依赖关系的Stage是可以并行执行的,但是对于job,Spark是串行执行的,如果想要并行执行Job,可以在Spark程序中进行多线程编程。

在这个 DAG 图中,Spark 能够充分了解数据之间的血缘关系,这样某些任务失败后可以根据血缘关系重新执行计算获取失败了的 RDD。

*宽依赖和窄依赖

窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区;

宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区。这个概念在下面的例子中会涉及。

Spark 提供了丰富的算子,操作也更加通用。那么这种划分作业、执行并行计算的方案如何使 Spark 产生基于内存计算的快速效果呢?都说 Spark 擅长迭代计算,那么我们通过一个经典的迭代问题 PageRank 算法来与 MapReduce 比较一下。

图 8,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos

图 8 是 MapReduce 进行 pagerank 算法的一次迭代过程,需要注意的是灰色的部分都是需要存储到磁盘的数据。

图 9 ,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos

图 9 所示是 Spark 执行 pageRank 算法的一次迭代过程,相较于 MapReduce 做了很多改进。首先在内存足够的情况下 Spark 允许用户将常用的数据缓存到内存中,加快了系统的运行速度;其次 Spark 对数据之间的依赖关系有了明确的划分,根据宽依赖与窄依赖关系进行任务的调度,可以实现管道化操作,使系统灵活性得以提高。

图 10:MapReduce 进行 pagerank 算法的二次迭代,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos

图 11:Spark 进行 pagerank 算法的二次迭代,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos

如图所示 Spark 可以将具有窄依赖关系的 RDD 分区分配到一个任务中,进行管道化操作,任务内部数据无需通过网络传输且任务之间互不干扰,因此 Spark 两次迭代只有三次 shuffle。

在一次迭代过程中,MapReduce 与 Spark 在性能上可能并没有很大的差别,但是随着迭代次数的增加,两者的差距逐渐显现出来。Spark 根据依赖关系采用的任务调度策略使得 shuffle 次数相较于 MapReduce 有了显著降低,因此 Spark 的设计十分适合进行迭代运算。

回顾本篇文章,我们依次从概念、特点及原理三个角度初步介绍了 Spark,下一篇我们将具体介绍 Spark on Yarn 的运作流程与机制,敬请期待。

附:Spark 相关术语表

0 人点赞