TaskScheduler详解及源码介绍

2022-11-07 16:33:13 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

TaskScheduler负责提交任务,并且请求集群管理器调度任务。

  1. 提交任务
  2. 请求集群管理器调度任务 Spark的集群管理器有三种:独立集群管理器、Hadoop Yarn、Apache Mesos。可以参考Spark集群管理器介绍-博客园了解一下。

1 创建TaskScheduler:createTaskScheduler

创建TaskScheduler的源代码为SparkContext.createTaskScheduler,如下所示。该方法会根据master的配置匹配部署模式,每种部署模式中都会创建两个类(TaskSchedulerImpl、SchedulerBackend)的实例,只是TaskSchedulerImpl都相同,SchedulerBackend不同。

代码语言:javascript复制
/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://"   _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '"   master   "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}

2 TaskScheduler的实现类:TaskSchedulerImpl

TaskSchedulerImpl的源代码如下:

代码语言:javascript复制
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging {
...
}

TaskSchedulerImpl的构造过程:

  1. 从SparkConf中读取配置信息,包括每个任务分配的CPU数、调度模式(调度模式又FAIR和FIFO两种,默认为FIFO,可以修改属性spark.scheduler.mode来改变)等。源代码为:
代码语言:javascript复制
  val conf = sc.conf
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
// Duplicate copies of a task will only be launched if the original copy has been running for
// at least this amount of time. This is to avoid the overhead of launching speculative copies
// of tasks that are very short.
val MIN_TIME_TO_SPECULATION = 100
private val speculationScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
private lazy val barrierSyncTimeout = conf.get(config.BARRIER_SYNC_TIMEOUT)
  1. 创建TaskResultGetter,它的作用是通过线程池对Worker上的Executor发送的Task的执行结果进行处理。
代码语言:javascript复制
  //源码:TaskSchedulerImpl的部分源码
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

其中,该线程池由Executors.newFixedThreadPool创建,默认4个线程,线程名字以task-result-getter开头,线程工厂默认是Executors.defaultThreadFactory。根据下面两部分源码逐个解释:

  • 默认4个线程 TaskResultGetter类中的THREADS常量,通过字符串”spark.resultGetter.threads”得到值4。后面将THREADS作为参数,传入进ThreadUtils.newDaemonFixedThreadPool方法,再传入进Executors.newFixedThreadPool方法,依次向下传入,最终传入进ThreadPoolExecutor的构造方法,作为参数corePoolSize,通过this.corePoolSize = corePoolSize;设置线程池的默认线程数为4。
  • 线程名字以task-result-getter开头 在方法newDaemonFixedThreadPool方法的英文注释中说得很清楚。“Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.”即线程名的格式是prefix-ID,其中prefix即TaskResultGetter类中通过语句ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")传入的字符串”task-result-getter”,ID是惟一的、按顺序分配的整数。
  • 线程工厂默认是Executors.defaultThreadFactory 说实话,没找到哎 TaskResultGetter —> protected val getTaskResultExecutor: ExecutorService = ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter") —> def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = { //使用到的线程工厂 val threadFactory = namedThreadFactory(prefix) ... } —> def namedThreadFactory(prefix: String): ThreadFactory = { new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix "-%d").build() } —> public ThreadFactory build() {return build(this);} —> private static ThreadFactory build(ThreadFactoryBuilder builder) { ... final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null ? builder.backingThreadFactory : Executors.defaultThreadFactory(); Thread thread = backingThreadFactory.newThread(runnable); ... } —> //源码来自:Executors.java static class DefaultThreadFactory implements ThreadFactory {...} 追根溯源到最后,最初的线程工厂实例即backingThreadFactory,它是类Executors.defaultThreadFactory的实例。
代码语言:javascript复制
//源码:TaskResultGetter的部分源码
/**
Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
*/
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
extends Logging {
//默认创建4个线程。
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
// Exposed for testing.
protected val getTaskResultExecutor: ExecutorService =
ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
...
}
代码语言:javascript复制
//源码:newDaemonFixedThreadPool方法的源码,来自ThreadUtils.scala文件。
/**
* Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
}
代码语言:javascript复制
//源码:ThreadPoolExecutor
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @param handler the handler to use when execution is blocked
*        because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
*         {@code corePoolSize < 0}<br>
*         {@code keepAliveTime < 0}<br>
*         {@code maximumPoolSize <= 0}<br>
*         {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*         or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

Spark的任务调度模式有两种FAIR(公平调度)、FIFO(先入先出),任务的最终调度实际都是落实到接口SchedulerBackend的具体实现上。 在SchedulerBackEnd中到底是怎么调度的呢???,看过几篇文章太简略,笔者需要再学习一下,后面继续补充。关于此有一篇不错的文章,即Spark理论学习笔记(一)-一笑之奈何-博客园

3 TaskScheduler的启动

TaskScheduler的启动源码:

SparkContext.scala中启动TaskScheduler的代码:

代码语言:javascript复制
    //源码来自SparkContext.scala
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()

_taskScheduler是TaskSchedulerImpl的实例,TaskSchedulerImpl是TaskScheduler的实现类,并且TaskSchedulerImpl重写了TaskScheduler中的start()方法,所以_taskScheduler.start()实际启动了TaskSchedulerImpl中的start()方法。由下面的源码可知,TaskScheduler在启动的时候,实际调用了backend的start()方法。

代码语言:javascript复制
//源码来自TaskSchedulerImpl.scala
override def start() {
backend.start()
}

**那么backend启动后,又做了什么呢?**接着看第四章

4 TaskScheduler提交任务

在下面的源码中,backend.reviveOffers()这一句用于提交任务。调用taskScheduler的submitTasks方法,最终会转到调用backend的reviveOffers方法。所谓的taskScheduler调度任务,实际是由schedulerBackend调度任务。

代码语言:javascript复制
  //源码来自:TaskSchedulerImpl.scala
override def submitTasks(taskSet: TaskSet) {
...
}

4 SchedulerBackend

代码语言:javascript复制
/**
* Used when running a local version of Spark where the executor, backend, and master all run in
* the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
* Executor (created by the [[LocalSchedulerBackend]]) running locally.
*/
private[spark] class LocalSchedulerBackend(
conf: SparkConf,
scheduler: TaskSchedulerImpl,
val totalCores: Int)
extends SchedulerBackend with ExecutorBackend with Logging {
...
override def start() {
val rpcEnv = SparkEnv.get.rpcEnv
val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
listenerBus.post(SparkListenerExecutorAdded(
System.currentTimeMillis,
executorEndpoint.localExecutorId,
new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
launcherBackend.setAppId(appId)
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
...
}
代码语言:javascript复制
/**
* A backend interface for scheduling systems that allows plugging in different ones under
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
private val appId = "spark-application-"   System.currentTimeMillis
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
def defaultParallelism(): Int
...
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/183089.html原文链接:https://javaforall.cn

0 人点赞