Spark内核原理
Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其作用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。
以词频统计WordCount程序为例,Job执行是DAG图:
RDD 依赖
RDD 的容错机制是通过将 RDD 间转移操作构建成有向无环图来实现的。从抽象的角度看,RDD 间存在着血统继承关系,其本质上是 RDD之间的依赖(Dependency)关系。
从图的角度看,RDD 为节点,在一次转换操作中,创建得到的新 RDD 称为子 RDD,同时会产生新的边,即依赖关系,子 RDD 依赖向上依赖的 RDD 便是父 RDD,可能会存在多个父 RDD。可以将这种依赖关系进一步分为两类,分别是窄依赖(NarrowDependency)和 Shuffle 依赖(Shuffle Dependency 在部分文献中也被称为 Wide Dependency,即宽依赖)。
窄依赖(Narrow Dependency)
窄依赖中:即父 RDD 与子 RDD 间的分区是一对一的。换句话说父RDD中,一个
分区内的数据是不能被分割的,只能由子RDD中的一个分区整个利用。
上图中 P代表 RDD中的每个分区(Partition),我们看到,RDD 中每个分区内的数据在上面的几种转移操作之后被一个分区所使用,即其依赖的父分区只有一个。比如图中的 map、union 和 join 操作,都是窄依赖的。注意,join 操作比较特殊,可能同时存在宽、窄依赖。
Shuffle 依赖(宽依赖 Wide Dependency)
Shuffle 有“洗牌、搅乱”的意思,这里所谓的 Shuffle 依赖也会打乱原 RDD 结构的操作。具体来说,父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割并发送给子 RDD 的所有分区,因此 Shuffle 依赖也意味着父 RDD与子 RDD 之间存在着 Shuffle 过程。
上图中 P 代表 RDD 中的多个分区,我们会发现对于 Shuffle 类操作而言,结果 RDD 中的每个分区可能会依赖多个父 RDD 中的分区。需要说明的是,依赖关系是 RDD 到 RDD 之间的一种映射关系,是两个 RDD 之间的依赖,如果在一次操作中涉及多个父 RDD,也有可能同时包含窄依赖和 Shuffle 依赖。
如何区分宽窄依赖
区分RDD之间的依赖为宽依赖还是窄依赖,主要在于父RDD分区数据与子RDD分区数据关系:
- 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖;
- 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖,涉及Shuffle;
为什么要设计宽窄依赖??
1)、对于窄依赖来说
Spark可以并行计算
如果有一个分区数据丢失,只需要从父RDD的对应个分区重新计算即可,不需要重新计算整个任务,提高容错。
2)、对应宽依赖来说
划分Stage的依据,产生Shuffle
DAG和Stage
在图论中,如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。而在Spark中,由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系)。
Spark中DAG生成过程的重点是对Stage的划分,其划分的依据是RDD的依赖关系,对于不同的依赖关系,高层调度器会进行不同的处理。
- 对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完成,所以窄依赖在Spark中被划分为同一个Stage;
- 对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计算,所以会在此处进行Stage的切分。
在Spark中,DAG生成的流程关键在于回溯,在程序提交后,高层调度器将所有的RDD看成是一个Stage,然后对此Stage进行从后往前的回溯,遇到Shuffle就断开,遇到窄依赖,则归并到同一个Stage。等到所有的步骤回溯完成,便生成一个DAG图。
把DAG划分成互相依赖的多个Stage,划分依据是RDD之间的宽依赖,Stage是由一组并行的Task组成。Stage切割规则:从后往前,遇到宽依赖就切割Stage。
Stage计算模式:pipeline管道计算模式,pipeline只是一种计算思想、模式,来一条数据然后计算一条数据,把所有的逻辑走完,然后落地。准确的说:一个task处理一串分区的数据,整个计算逻辑全部走完。
词汇表
http://spark.apache.org/docs/2.4.5/cluster-overview.html
The following table summarizes terms you’ll see used to refer to cluster concepts:
Term | Meaning |
---|---|
Application | User program built on Spark. Consists of a driver program and executors on the cluster. |
Application jar | A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime. |
Driver program | The process running the main() function of the application and creating the SparkContext |
Cluster manager | An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN) |
Deploy mode | Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster. |
Worker node | Any node that can run application code in the cluster |
Executor | A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors. |
Task | A unit of work that will be sent to one executor |
Job | A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs. |
Stage | Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs. |
Spark内存迭代
我们说Spark的性能对比MR是划时代的。主要原因是基于内存的迭代,具体是如何迭代的呢?
我们先明白一个概念:DAG
前面说过,DAG是一个有向无环图,而有向无环图中的每一个节点,就是一个个的算子。
首先,MR的计算模型中,只有2个算子,一个Map 一个Reduce。
仅有的两个算子,就导致了许多复杂的任务很难用这两个算子计算出来。
很多复杂任务需要进行MR任务的迭代计算,也就是一个MR结束后下一个MR紧接着启动。
如果将这一整个复杂任务描述为DAG的话,类似于:
反之看一下算子丰富的Spark任务,如果这个复杂任务用Spark开发,其DAG可能是类似这样:
所以,我们说Spark比MR效率高主要就是2个原因:
- MR计算模型中,Map算子和Reduce算子进行数据传输需要通过硬盘进行
- MR计算模型的算子匮乏,只有Map和Reduce两个算子,导致复杂任务需要串接多个MR任务,中间的传输都经过HDFS硬盘
也就是M和R之间走硬盘,多个MR之间也走硬盘,同时涉及到多次的MapReduce任务的启动和释放,对效率很影响。
反观Spark(Flink),由于算子丰富,任务基本上都能一个Spark任务搞定,这就避免了多个Spark任务串联。同时,在Spark内部,多个算子之间的数据沟通是通过内存或者网络进行直接传输的,避免了低效的硬盘传输。
为什么可以内存传输或者网络直传呢?
Spark的最小执行单位是Task也就是单个线程。Task运行在Executor内。一个节点可以有多个Executor,一个集群可以有多个节点。
一个算子可以被并行执行,每个并行就是一个线程(一个task)
如果算子A的所有Task在Executor1、3中执行,算子B的所有Task运行在Executor2、4中执行。
算子AB的关系是 先计算A然后基于A的结果计算B
那么执行可能为:
如果Executor1和3在同一个节点之上,那么内存传输即可
如果Executor3和5在不同节点上,那么数据走网络传输即可
Spark会尽量安排DAG中的数据流转在内存中流转。尽量避免网络。
实在不行走网络,也比MR的硬盘快了太多了。
Spark基本概念
官方文档:http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary
Spark Application运行时,涵盖很多概念,主要如下表格:
1.Application:应用,就是程序员编写的Spark代码,如WordCount代码
2.Driver:驱动,就是用来执行main方法的JVM进程,里面会执行一些Drive端的代码,如创建SparkContext,设置应用名,设置日志级别...
3.SparkContext:Spark运行时的上下文环境,用来和ClusterManager进行通信的,并进行资源的申请、任务的分配和监控等
4.ClusterManager:集群管理器,对于Standalone模式,就是Master,对于Yarn模式就是ResourceManager/ApplicationMaster,在集群上做统一的资源管理的进程
5.Worker:工作节点,是拥有CPU/内存的机器,是真正干活的节点
6.Executor:运行在Worker中的JVM进程!
7.RDD:弹性分布式数据集
8.DAG:有向无环图,就是根据Action形成的RDD的执行流程图---静态的图
9.Job:作业,按照DAG进行执行就形成了Job---按照图动态的执行
10.Stage:DAG中,根据shuffle依赖划分出来的一个个的执行阶段!
11.Task:一个分区上的一系列操作(pipline上的一系列操作)就是一个Task,同一个Stage中的多个Task可以并行执行!(每一个Task由线程执行),所以也可以这样说:Task(线程)是运行在Executor(进程)中的最小单位!
12.TaskSet:任务集,就是同一个Stage中的各个Task组成的集合!
Job调度流程
Spark运行基本流程
1.当一个Spark应用被提交时,首先需要为这个Spark Application构建基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext(还会构建DAGScheduler和TaskScheduler)
2.SparkContext向资源管理器注册并申请运行Executor资源;
3.资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着心跳发送到资源管理器上;
4.SparkContext根据RDD的依赖关系构建成DAG图,并提交给DAGScheduler进行解析划分成Stage,并把该Stage中的Task组成的Taskset发送给TaskScheduler。
5.TaskScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor。
6.Executor将Task丢入到线程池中执行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
Spark Application应用的用户代码都是基于RDD的一系列计算操作,实际运行时,这些计算操作是Lazy执行的,并不是所有的RDD操作都会触发Spark往Cluster上提交实际作业,基本上只有一些需要返回数据或者向外部输出的操作才会触发实际计算工作(Action算子),其它的变换操作基本上只是生成对应的RDD记录依赖关系(Transformation算子)。
当RDD调用Action函数(比如count、saveTextFile或foreachPartition)时,触发一个Job执行,调度中流程如下图所示:
Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。
Spark的任务调度总体来说分两路进行:Stage级的调度和Task级的调度
DAGScheduler负责Stage级的调度,主要是将DAG依据RDD宽依赖切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。
一个Spark应用程序包括Job、Stage及Task:
Job/DAG是以Action方法为界,遇到一个Action方法则触发一个Job;
Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。