本篇主要内容包括spark 计算引擎与调度管理的实现方式
- Spark 计算引擎原理
- Spark 调度管理原理
- Spark 存储管理原理
- Spark 监控管理
一 :Spark 计算引擎原理
- 通过RDD创建DAG
- 通过DAG生成逻辑计划
- 划分Stage,生成Task
- 调度并执行Task
- 分布式执行Task
通过上面图可以很清楚的看到从Job的action到中间调度在到最后的具体执行的过程,下面针对该图做一个实例,来更加清楚的理解。
首先,我们启动了spark-shell 来读取本地的文件,然后做wordcount操作,然后统计出一共多少行。
那么通过这么一个简单的job操作,来看一下spark ui 里面具体的DAGScheduler方式
从上图我们可以看出flatmap 和 map 为一个stage0,在reducebykey的时候,又划分了一个stage1 ,那么stage的划分是根据shuffle或者说根据依赖关系来的,后面会更加详细的说到。
接下来说一下shuffle,shuffle是什么呢?在第一节的时候,有提到shuffle整个概念,并且简单的说到了宽依赖和窄依赖,或者我们叫做完全依赖和部分依赖。
shuffle的目的或者我们说shuffle的作用就是数据分类和数据聚合。通俗而言,就是讲跨节点间的数据进行聚合和归并操作,•Shuffle是分布式计算框架的核心数据交换方式,其实现方式直接决定了计算框架的性能和扩展性,shuffle操作是会导致数据计算的效率有所降低,那么如何讲shuffle所带来的损失降到最低呢?下面来一起了解一下spark中对于shuffle处理逐步改进的方案。
spark shuffle分为两个阶段,一个是write阶段,一个是read阶段
spark shuffle write阶段
write阶段分为两种:Hash-based 和 Sort-based
Hash-based:这个是最初的spark版本时,使用的shuffle write 方式
Hash-based 实现结构图(摘自网络):
如上图所示,每一个Task在计算完之后,会将结果集存储到本地的一份文件中,那么在进行shuffle操作时,这种实现方式会有M*N条链接,如果我们的bucket数量比较多的话,那么这个是很耗费资源的。所以后来spark shuffle write 改为sort-based方式
sort-based 实现结构图(摘自网络)
如上图所示,每一个task在计算完之后,会生成一个文件,每次的结果集会追加到该文件中,同时,会有一个索引文件记录了该块数据的位置,那么在进行write时,连接数的数量就大大减少了。
spark shuffle read阶段
在进行shuffle操作的时候,spark内部隐式的创建了一个transformation操作,用于做shuffle操作
shuffle read结构图(摘自网络)
shuffle read阶段,spark内部有一个单独的类BlockStoreShuffleFetcher去获取数据,之后获取到mata信息,存入到Set中,如果数据是在本地那么直接通过BlockManager.getBlockData进行本地数据读取,如果数据实在远程Executor中,那么会通过NettyBlockTransferService.fetchBlocks去获取。
(关于spark shuffle的操作后续有单独的几个篇章来讲解,这里就不过都多阐述了,篇幅有点长)
二:Spark调度管理原理
Spark 调度管理系统是Spark程序得以运转的核心,其中作业调度是调度管理模块的枢纽,调度的前提是判断多个作业任务的依赖关系(Stage),作业任务之间存在因果的依赖关系,也就是说,有些任务必须要先执行,然后相关依赖的任务才能执行,任务之间不能出现循环依赖,所以本质上就是DAG。
作业调度相关类型,以DAGScheduler,也就是基于DAG图的调度类为核心
Spark 调度相关概念
Task(任务):单个分区数据集上的最小处理单元
TaskSet(任务集):有一组关联的,但互相直接没有Shuffle依赖关系的任务组成
Stage(调度阶段):一个任务集对应的调度阶段
Job (作业):由一个RDD Action 生成的一个或多个调度阶段所组成的一次计算作业
Application(应用程序):Spark 应用程序,有一个或者多个作业组成
Spark 调度相关概念逻辑关系图
Spark 作业调度顶层逻辑
每个RDD Action类型的算子,内部都是一次隐式的作业提交
DAGScheduler最重要的任务之一就是计算作业和任务的依赖关系,制定调度逻辑。
DAGScheduler在SparkContext初始化的过程中被实例化,一个SparkContext应创建一个DAGScheduler
DAGScheduler内部维护着各种“任务/调度阶段/作业”的状态互相之间的映射表,用于在任务状态,集群状态更新时,能够正确的维护作业的运行逻辑
Spark 作业调度流程图
Spark 作业调度交互流程
Spark 作业调度-调度阶段的拆分
当一个RDD操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链的末端RDD出发,遍历整个RDD依赖链,划分调度阶段,并决定各个调度阶段之间的依赖关系调度阶段的划分是以ShuffleDependency为依据,也就是说当某个RDD的运算需要将数据进行shuffle操作时,整个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的调度阶段Spark 作业调度-finalStage的提交在划分调度阶段的步骤中会得到一个或多个有依赖关系的调度阶段,其中直接触发RDD关联的调度阶段称为FinalStage,然后DAGScheduler进一步从这个FinalStage生成一个作业实例,这两者的关系进一步存储在映射表中,用于在该调度阶段全部完成做一些后续处理,比如:状态报告,清理作业相关数据等。
Spark 作业调度-状态监控&任务结果获取
DAGScheduler对外暴露了一系列的回调函数,对于TaskScheduler而言,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些任务的生命周期进一步维护作业呵调度阶段的状态信息
Spark 作业调度-任务结果获取
一个具体任务在Executor中执行完毕后,其结果需要以某种形式返回给DAGScheduler根据调度的方式不同,返回的方式也不同。对于FinalStage所对应的任务,返回给DAGScheduler的是运算结果本身,而对于中间调度阶段对应的任务ShuffleMapTask返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了ShuffleMapTask的运算输出结果在Blockmanager里的项目存储信息,而非结果本身。根据任务结果的大小不同,ResultTask返回的结果又非为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过特定尺寸则在Executor端会将
DirectTaskResult先序列化,再把序列化的结果作为一个数据快存放在BlockManager中,然后将BlockManager返回的BlockID放在IndirectTaskResult对象中,返回给TaskScheduler。TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。
Spark 作业调度总结
Spark的调度管理是Spark作业运行和资源分配的核心,调度的层次依次是底层计算资源,任务调度,作业调度,应用调度。了解这些层次之间的逻辑关系,可以更方便的对Spark的运行状态监控以及对于集群的配置优化。