一文搞懂Spark的Task调度器(TaskScheduler)[通俗易懂]

2022-11-07 15:42:33 浏览数 (1)

TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。

  1. 为TaskSet创建和维护一个TaskSetManager, 并追踪任务的本地性及错误信息。
  2. 遇到Straggle任务会放到其他结点进行重试。
  3. 向DAGScheduler汇报执行情况, 包括在Shuffle输出丢失时报告fetch failed错误等信息。

TaskScheduler底层调度器

  • 1. TaskScheduler原理剖析
  • 2. TaskScheduler源代码解析
    • 2.1 TaskScheduler 实例化源代码
    • 2.2 TashScheduler初始化源代码
    • 2.3 TaskScheduler启动源代码
    • 2.4 TaskScheduler提交任务源代码

1. TaskScheduler原理剖析

通过之前 DAGScheduler的介绍可以 知道, DAGScheduler 将划分的一系列 Stage (每个Stage封装一个TaskSet) , 按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。 下面来分析TaskScheduler接收到DAGScheduler的Stage任务 后, 是如何管理Stage (TaskSet) 的生命周期的。

TaskSchedulerlmpl在createTaskScheduler方法中实例化后, 就立即调用自己的initialize 方法把StandaloneSchedulerBackend的实例对象传进来 , 从而赋值给TaskSchedulerlmpl的backend。 在TaskSchedulerlmpl的血tialize方法中, 根据调度模式的配置创建 实现了 Schedul­erBuilder接口的相应实例对象, 并且创建的对象会立即调用buildPools创建 相应数量的Pool 存放和管理TaskSetManager的实例对象。 实现SchedulerBuilder接口的具体类都是Scheduler­Builder的内部类。

(1)FIFOSchedulableBuilder: 调度模式是SchedulingMode.FIFO , 使用先进先出策略调度。这是默认模式,在该模式下,只有一个TaskSetManager池。 (2)FairSchedulableBuilder: 调度模式是SchedulingMode.FAIR, 使用公平策略调度。

在createTaskSched uler方法返回后,TaskSchedulerlmpl通过DAGScheduler的实例化过程设置 DAGScheduler的实例对象, 然后调用自己的start方法。 在 TaskSchedulerlmpl 调用start 方法时, 会调用StandaloneSchedulerBackend的start方法 , 在StandaloneSchedulerBackend的start方法中会最终注册应用程序AppClient。 TaskSchedulerlmpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。 TaskSchedulerlmpl启动后, 就可以接收 DAGScheduler的submi tMissingTasks方法提交过来的TaskSet 进行进一步处理。 TaskSchedulerlmpl在submitTasks 中初始化 一个TaskSetManag­er 对其生命周期 进行管理, 当TaskSchedulerlmpl得到Worker结点上的Executor 计算资源时, 会通过TaskSetManager来发送具体的Task到Executor上执行计算。 如果Task执行过程中有错误导致失败 , 会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。 TaskSetManager 会将失败的Task再次添加到待执行的Task队列中。

Spark Task允许失败的次数默认是4次,在TaskSchedulerlmpl初始化时通过spark. task. maxFailures 设置该默认值。

如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler。DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。

2. TaskScheduler源代码解析

下面通过源代码解析来看一下 TaskScheduler 是如何调度和管理 TaskSet 的任务。

2.1 TaskScheduler 实例化源代码

TaskScheduler 和 DAGScheduler 都在 SparkContext 实例化的时候一同实例化。 Spark Context 源代码中与 TaskScheduler 实例化相关的代码如下。

代码语言:javascript复制
  private var _taskScheduler: TaskScheduler = _//任务调度器
.......
private[spark] def taskScheduler: TaskScheduler = _taskScheduler
private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = { 

_taskScheduler = ts
}
.......
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
.......
//启动任务调度器
_taskScheduler.start()

本博客仅介绍Spark的Standalone部署模式,Spark Context的createTaskScheduler方法中与Standalone部署模式相关的代码如下。

代码语言:javascript复制
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = { 

import SparkMasterRegex._
//在本地运行时,请勿尝试在失败时重新执行任务
val MAX_LOCAL_TASK_FAILURES = 1
/** * 确保默认执行者的资源满足一项或多项任务要求。 * 此功能适用于未设置执行程序核心配置的集群管理器,其他功能则在ResourceProfile中检查。*/
def checkResourcesPerTask(executorCores: Int): Unit = { 

val taskCores = sc.conf.get(CPUS_PER_TASK)//配置中每个任务分配的核数
/** * SKIP_VALIDATE_CORES_TESTING: * 此配置用于单元测试,以允许跳过任务cpus到内核验证,以允许在本地模式下运行时模拟独立模式的行为。 * 默认情况下,独立模式不指定执行者内核的数量,它仅使用主机上可用的所有内核。 * */
if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) { 

//检查每个executor的核数至少满足一个任务的需求
validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
}
val defaultProf = sc.resourceProfileManager.defaultResourceProfile
ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
}
//根据不同的运行模式,进行不同的初始化
master match { 

........
//Spark Standalone部署模式下TaskScheduler和SchedulerBackend分别由各自对应的实现类TaskSchedulerImpl和StandaloneSchedulerBackend,来实例化对象
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://"   _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
............
} catch { 

case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}

2.2 TashScheduler初始化源代码

TaskScheduler在创建之后会调用其初始化方法进行初始化。TaskScheduler (实际上在实现类TaskSchedulerlmpl的initialize方法中)在初始化的过程中设置对SchedulerBackend 对象的引用, 实例化SchedulerBuilder具体实现类的对象用来创建和管理TaskSetManager池。 TaskSchedulerlmpl源代码中的相关代码如下:

代码语言:javascript复制
....
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
try { 

SchedulingMode.withName(schedulingModeConf)
} catch { 

case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}
//root的名字暂时设置为空值
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
.....
def initialize(backend: SchedulerBackend): Unit = { 

//设置对 SchepulerBackend 对象的引用
this.backend = backend
schedulableBuilder = { 

schedulingMode match { 

//调度模式是 FIFO
case SchedulingMode.FIFO =>
//FairSchedulableBuilder的rootPool里面直接添加TaskManager
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
//FaiiSchedulahleBuilder的rootPool根据配置文件可以挂若干个子pool, 每个 pool里面都添加TaskManager
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: "  
s"$schedulingMode")
}
}
//构建TaskManager池
schedulableBuilder.buildPools()
}

2.3 TaskScheduler启动源代码

TaskScheduler 实例对象在 DAGScheduler 实例化之后启动, 并且 TaskScheduler 启动的过程由 TaskSchedulerlmpl 具体实现。 在启动过程中, 主要是调用 SchedulerBackend 的启动方法, 然后对不是本地部署模式并且开启任务的推测执行(设置 spark. speculation 为 true)情况, 根据配置判断是否周期性地调用 TaskSetManager 的 checkSpeculatableTasks 方法检查任务 的推测执行。 StandaloneSchedulerBackend的 start 方法中会最终注册应用程序 AppClient。 TaskScheduler 的启动源代码如下所示:

代码语言:javascript复制
 override def start(): Unit = { 

//调用SchedulerBackend的start方法启动
backend.start()
//不是本地模式,并且开启了推测执行
if (!isLocal && conf.get(SPECULATION_ENABLED)) { 

logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(
() => Utils.tryOrStopSparkContext(sc) { 
 
// 最终会调用调度池中的TaskSetManager 中的checkSpeculatableTasks来检查推测执行的任务
checkSpeculatableTasks() },
SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}

2.4 TaskScheduler提交任务源代码

TaskSchedulerlmpl启动后, 就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理了。 对千ShuffleMapStage类型的Stage, DAGScheduler初始化一 组ShuffleMapTask实例对象;对于ResultStage类型的Stage, DAGScheduler初始化一组Re­sultTask实例对象。 最后, DAGScheduler将这组ResultTask实例对象封装成TaskSet实例对象 提交给TaskSchedulerlmpl。

注意,ShuffleMapTask是根据Stage所依赖的RDD的partition分布产生跟partition数量相等的Task, 这些Task根据partition的本地性分布在不同的集群结点;ResultTask负责输出整个Job的结果。

DAGScheduler的submitMissingTasks方法的部分关键代码如下。

代码语言:javascript复制
//创建任务列表
val tasks: Seq[Task[_]] = try { 

val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match { 

case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { 
 id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions  = id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { 
 id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch { 

......
}
if (tasks.nonEmpty) { 

logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 "  
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//调用taskScheduler中的submitTasks方法提交任务集合
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
stage.resourceProfileId))
} else { 

.......
}

TaskSchedulerlmpl 在 submitTasks 中初始化一个 TaskSetManager, 并通过SchedulerBuilder 对其生命周期进行管理, 最后调用 SchedulerBackend 的 reviveOffers 方法进行 TaskSet 所需资源的分配。 在 TaskSet 得到足够的资源后, 在 SchedulerBackend 的 launchTasks 方法中将 Task­Set 中的 Task 一个一个地发送到 Executor去执行。submitTasks 源代码如下所示:

代码语言:javascript复制
 override def submitTasks(taskSet: TaskSet): Unit = { 

val tasks = taskSet.tasks
logInfo("Adding task set "   taskSet.id   " with "   tasks.length   " tasks "
  "resource profile "   taskSet.resourceProfileId)
this.synchronized { 

//初始化创建一个 TaskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
// 在此阶段,将所有现有TaskSetManager标记为僵尸,因为我们要添加一个新的。
//这对于处理corner的情况是必要的。假设一个阶段具有10个分区,并具有2个TaskSetManager:TSM1(僵尸)和TSM2(活动)。 TSM1有一个正在运行的分区10任务,它已完成。 TSM2完成了分区1-9的任务,并认为他仍处于活动状态,因为分区10尚未完成。 但是,DAGScheduler获取所有10个分区的任务完成事件,并认为阶段已完成。 如果是shuffle阶段,并且由于某种原因缺少map outputs,则DAGScheduler将重新提交它并为其创建TSM3。 由于一个阶段不能有多个活动任务集管理器,因此必须将TSM2标记为僵尸(实际上是)。
stageTaskSets.foreach { 
 case (_, ts) =>
ts.isZombie = true
}
stageTaskSets(taskSet.stageAttemptId) = manager
//schedulableBuilder将新建的TaskSetManager实例对象添加到关联的Pool中
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
//对于非本地部署模试,如果没有接收到Task,就周期性地警告或者取消Task
if (!isLocal && !hasReceivedTask) { 

starvationTimer.scheduleAtFixedRate(new TimerTask() { 

override def run(): Unit = { 

if (!hasLaunchedTask) { 

logWarning("Initial job has not accepted any resources; "  
"check your cluster UI to ensure that workers are registered "  
"and have sufficient resources")
} else { 

this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需资源的分配,在得到足够的资源后,将TaskSet中的Task一个一个地发送到Executor去执行
backend.reviveOffers()
}

关于本片博客中涉及的StandaloneSchedulerBackend相关方法,将在下一篇博客中详细介绍。

如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。

希望热爱技术的小伙伴私聊,一起学习进步

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/183598.html原文链接:https://javaforall.cn

0 人点赞