Spark任务调度 | Spark,从入门到精通

2019-04-19 15:58:47 浏览数 (1)

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

往期直通车:Hello Spark!

Spark on Yarn

RDD原理与基础操作

图 1

如图 1 所示是 Spark 的执行过程,那么具体 Drvier 是如何把 Task 提交给 Executor 的呢?本文将通过 DAGScheduler 、TaskScheduler、调度池和 Executor 四部分介绍 Spark 的任务调度原理及过程。

/ DAGScheduler /

Spark 任务调度中各个 RDD 之间存在着依赖关系,这些依赖关系就形成有向无环图 DAG,DAGScheduler 负责对这些依赖关系形成的 DAG 并进行 Stage 划分,而 DAGScheduler 分为创建、Job 提交、Stage 划分、Task 生成四个部分。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

代码语言:javascript复制
private[spark]
class DAGScheduler(
  private[scheduler] val sc: SparkContext,
  private[scheduler] val taskScheduler: TaskScheduler,
  listenerBus: LiveListenerBus,
  mapOutputTracker: MapOutputTrackerMaster,
  blockManagerMaster: BlockManagerMaster,
  env: SparkEnv,
  clock: Clock = new SystemClock())
extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
  this(
    sc,
    taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env)
}
.....
}

DAGScheduler 在 SparkContext 中创建,并且需要提供 TaskScheduler 的实例。在构造函数中的 MapOutputTrackerMaster 是运行在 Driver 端用来管理 ShuffleMapTask 的输出,下游的 Task 可以通过 MapOutputTrackerMaster 来获取 Shuffle 输出的位置信息。

代码语言:javascript复制
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  .....
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  .....
}

DAGScheduler 是基于 Akka Actor 的消息传递机制来构建事件循环处理逻辑,如上段代码所示,在 DAGScheduler 初始化时创建了 eventProcessLoop 以处理各种 DAGSchedulerEvent,这些事件包括作业的提交、任务状态的变化、监控等。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

图 2

如图 2 所示是 RDD 的 count 执行调用过程。其中,在 DAGScheduelr 的 submitJob 方法中会生成 JobId,并创建一个 JobWaiter 监听 Job 是否执行成功。一个 Job 内包含多个 Task,只有所有 Task 都执行成功该 Job 才会被 JobWaiter 标记为 Succes。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

用户提交的计算任务是由多个 RDD 构成的 DAG, 当 RDD 在转换时需要进行 Shuffle,Shuffle 的过程中就将这个 DAG 划分成了多个 Stage。

由于后面的 Stage 需要前面的 Stage 提供 Shuffle 的结果,因此不同的 Stage 不能并行计算。那么 RDD 在哪些操作时需要进行 Shuffle 呢?这里涉及到 RDD 的两种依赖关系:宽依赖与窄依赖。

图 3

如图 3 左侧所示为窄依赖,由于 RDD 每个 partition 依赖固定数量的 parent RDD 的 partition,所以可以通过 Task 来处理这些 partition。而且这些 partition 相互独立,所以 Task 可以并行计算。宽依赖反之。

图 4

让我们举例说明 Stage 的划分过程,如图 4 所示从触发 Action 的 RDD G 开始划分,G 依赖 B 和 F,处理 B 和 F 的顺序是随机的,假设先处理 B。由于 G 和 B 是窄依赖关系,可以划分到同一个 Stage 。接着处理 F,此时 F 和 G 是宽依赖关系,所以将 F 划分到一个新的 Stage,以此类推划分其它 Stage。

接着以 Stage 1 为例看它的计算方式,如图 4 所示 RDD A 有三个 Partition,因此会生成三个 ShuffleMapTask,这三个 Task 会把结果输出到三个 Partition 中。

DAGScheduler 创建—Job 提交—Stage 划分—Task 生成

任务生成首先要获取需要计算的 Partition,如果是最后的 Stage 所对应的 Task 是 ResultTask,那么先判断 ResultTask 是否结束,若结束则无需计算;对于其它 Stage 对应的都是 ShuffleMapTask,因此只需要判断 Stage 中是否有缓存结果。判断出哪些 Partition 需要计算后生成对应的 Task,然后封装到相应的 TaskSet 中,并提交给 TaskScheduler。TaskSet 中包含了一组处理逻辑完全相同的 Task,但它们的处理数据不同,这里的每个 Task 负责一个 partition。

/ TaskScheduler /

TaskScheduler 是在 SparkContext 中通过 createTaskScheduler 把引用传给 DAGScheduler 的构造函数。每个 TaskScheduler 都会对应一个 SchedulerBackend,TaskScheduler 负责 Application 中不同 job 之间的调度,在 Task 执行失败时启动重试机制,并且为执行速度慢的 Task 启动备份的任务;而 SchdulerBackend 负责与 Cluster Manager 交互,获取该 Application 分配到的资源,然后传给 TaskScheduler。

TaskScheduler 执行流程主要分成两个部分:Driver 端执行和 Executor 执行,他们的执行步骤分别如下:

Driver 端执行 TaskSchedulerImpl#submitTasks 将Task加入到TaskSetManager当中 ScheduleBuilder#addTaskSetManager 根据调度优先级确定调度顺序 CoarseGrainedSchdulerBackend#reviveOffers DriverActor#makeOffers TaskSchedulerImpl#resourceOffers 响应资源调度请求,为每个Task分配资源 DriverActor#launchTasks 将tasks发送到Executor Executor上执行 ReceiveWithLogging#launchTasks Executor#launchTask

/ 调度池 /

调度池顾名思义就是存放了一堆待执行的任务,它决定 TaskSetManager 的调度顺序,然后由 TaskSetManager 根据就近原则来确定 Task 运行在哪个 Executor。

那么它是如何决定 TaskSetManager 的调度顺序的呢? 调度池主要有两个决策策略:FIFO 和 FAIR。

图 5

首先以整体看 FIFO 和 FAIR 的执行对比图图 5,可以看出在左侧 FIFO 只有一个调度池,即 rootPool,里面包含了待调度的 TaskSetManager;而右侧 FAIR 在 rootPool 调度池中包含了多个子调度池,比如图中的 production 和 test 调度池。

在 FIFO 算法中需要保证 JobId 比较小的优先执行,如果是同一个 Job 则 StageId 比较小的先被调度。FAIR 算法则提供参数配置,如图 6 所示是一份配置文件:

图 6

接着看看我们的 Spark 集群是如何配置的。

代码语言:javascript复制
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

首先获取 S1 和 S2 两个调度池中的运行状态 Task 个数,若 S1 的运行状态 Task 数小于该调度池的最小资源数,而 S2 相反,那么此时优先调度 S1 中的 Task;如果 S1 和 S2 中的运行状态 Task 数都小于该调度池的最小资源数,那么就依据资源占用率决定调度优先级;如果 S1、S2 的运行状态 Task 数都大于所属调度池的最小资源数,那么就对比它们的已运行 task 个数与分配权重的比例,得出来比例较小的优先调度。

/ Executor /

图 7

图 8

如图 8 所示,Executor 是在 worker 收到 master 的 LaunchExecutorde 消息后创建的。在 TaskScheduler 阶段提交 Task 之后 Driver 会序列化封装 Task 的依赖文件和自身信息,然后在 Executor 上反序列化得到 Task。在准备好了 Task 的执行环境之后就通过 TaskRunner 去执行计算,得到执行状态。值得注意的是,在得到计算结果发回 Driver 的过程中,如果文件太大会被直接丢弃(可以通过 spark.driver.maxResultSize 来设定大小)。

0 人点赞