Spark:Dynamic Resource Allocation【动态资源分配】

2022-05-12 14:25:37 浏览数 (1)

代码语言:javascript复制
1. 问题背景
2. 原理分析
   2.1 Executor生命周期
   2.2 ExecutorAllocationManager上下游调用关系
3. 总结与反思
4. Community Feedback

1.问题背景

用户提交Spark应用到Yarn上时,可以通过spark-submit的num-executors参数显示地指定executor个数,随后,ApplicationMaster会为这些executor申请资源,每个executor作为一个Container在Yarn上运行。Spark调度器会把Task按照合适的策略分配到executor上执行。所有任务执行完后,executor被杀死,应用结束。在job运行的过程中,无论executor是否领取到任务,都会一直占有着资源不释放。很显然,这在任务量小且显示指定大量executor的情况下会很容易造成资源浪费。

在探究Spark如何实现之前,首先思考下如果自己来解决这个问题,需要考虑哪些因素?大致的方案很容易想到:如果executor在一段时间内一直处于空闲状态,那么就可以kill该executor,释放其占用的资源。当然,一些细节及边界条件需要考虑到:

  • executor动态调整的范围?无限减少?无限制增加?
  • executor动态调整速率?线性增减?指数增减?
  • 何时移除Executor?
  • 何时新增Executor了?只要由新提交的Task就新增Executor吗?
  • Spark中的executor不仅仅提供计算能力,还可能存储持久化数据,这些数据在宿主executor被kill后,该如何访问?
  • 。。。

2.原理分析

2.1 Executor生命周期

首先,先简单分析下Spark静态资源分配中Executor的生命周期,以spark-shell中的wordcount为例,执行命令如下:

代码语言:javascript复制
# 以yarn模式执行,并指定executor个数为1
$ spark-shell --master=yarn --num-executors=1

# 提交Job1 wordcount
scala> sc.textFile("file:///etc/hosts").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_   _).count();

# 提交Job2 wordcount
scala> sc.textFile("file:///etc/profile").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_   _).count();

# Ctrl C Kill JVM

上述的Spark应用中,以yarn模式启动spark-shell,并顺序执行两次wordcount,最后Ctrl C退出spark-shell。此例中Executor的生命周期如下图:

static-allocation

从上图可以看出,Executor在整个应用执行过程中,其状态一直处于Busy(执行Task)或Idle(空等)。处于Idle状态的Executor造成资源浪费这个问题已经在上面提到。下面重点看下开启Spark动态资源分配功能后,Executor如何运作。

spark_dynamic_allocation_executor_lifecycle

下面分析下上图中各个步骤:

  1. spark-shell Start:启动spark-shell应用,并通过--num-executor指定了1个执行器。
  2. Executor1 Start:启动执行器Executor1。注意:Executor启动前存在一个AM向ResourceManager申请资源的过程,所以启动时机略微滞后与Driver。
  3. Job1 Start:提交第一个wordcount作业,此时,Executor1处于Busy状态。
  4. Job1 End:作业1结束,Executor1又处于Idle状态。
  5. Executor1 timeout:Executor1空闲一段时间后,超时被Kill。
  6. Job2 Submit:提交第二个wordcount,此时,没有Active的Executor可用。Job2处于Pending状态。
  7. Executor2 Start:检测到有Pending的任务,此时Spark会启动Executor2。
  8. Job2 Start:此时,已经有Active的执行器,Job2会被分配到Executor2上执行。
  9. Job2 End:Job2结束。
  10. Executor2 End:Ctrl C 杀死Driver,Executor2也会被RM杀死。

上述流程中需要重点关注的几个问题:

  • Executor超时:当Executor不执行任何任务时,会被标记为Idle状态。空闲一段时间后即被认为超时,会被kill。该空闲时间由spark.dynamicAllocation.executorIdleTimeout决定,默认值60s。对应上图中:Job1 End到Executor1 timeout之间的时间。
  • 资源不足时,何时新增Executor:当有Task处于pending状态,意味着资源不足,此时需要增加Executor。这段时间由spark.dynamicAllocation.schedulerBacklogTimeout控制,默认1s。对应上述step6和step7之间的时间。
  • 该新增多少Executor:新增Executor的个数主要依据是当前负载情况,即running和pending任务数以及当前Executor个数决定。用maxNumExecutorsNeeded代表当前实际需要的最大Executor个数,maxNumExecutorsNeeded和当前Executor个数的差值即是潜在的新增Executor的个数。注意:之所以说潜在的个数,是因为最终新增的Executor个数还有别的因素需要考虑,后面会有分析。下面是maxNumExecutorsNeeded计算方法:
代码语言:javascript复制
  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks   listener.totalRunningTasks
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
              tasksPerExecutorForFullParallelism)
      .toInt
  }
  • 其中numRunningOrPendingTasks为当前running和pending任务数之和。
  • executorAllocationRatio:最理想的情况下,有多少待执行的任务,那么我们就新增多少个Executor,从而达到最大的任务并发度。但是这也有副作用,如果当前任务都是小任务,那么这一策略就会造成资源浪费。可能最后申请的Executor还没启动,这些小任务已经被执行完了。该值是一个系数值,范围[0~1]。默认1.
  • tasksPerExecutorForFullParallelism:每个Executor的最大并发数,简单理解为:cpu核心数(spark.executor.cores)/ 每个任务占用的核心数(spark.task.cpus)。
问题1:executor动态调整的范围?无限减少?无限制增加?调整速率?

要实现资源的动态调整,那么限定调整范围是最先考虑的事情,Spark通过下面几个参数实现:

  • spark.dynamicAllocation.minExecutors:Executor调整下限。(默认值:0)
  • spark.dynamicAllocation.maxExecutors:Executor调整上限。(默认值:Integer.MAX_VALUE)
  • spark.dynamicAllocation.initialExecutors:Executor初始数量(默认值:minExecutors)。

三者的关系必须满足:minExecutors <= initialExecutors <= maxExecutors

注意:如果显示指定了num-executors参数,那么initialExecutors就是num-executor指定的值。

问题2:Spark中的Executor既提供计算能力,也提供存储能力。这些因超时被杀死的Executor中持久化的数据如何处理?

如果Executor中缓存了数据,那么该Executor的Idle-timeout时间就不是由executorIdleTimeout决定,而是用spark.dynamicAllocation.cachedExecutorIdleTimeout控制,默认值:Integer.MAX_VALUE。如果手动设置了该值,当这些缓存数据的Executor被kill后,我们可以通过NodeManannger的External Shuffle Server来访问这些数据。这就要求NodeManager中spark.shuffle.service.enabled必须开启。

2.2 ExecutorAllocationManager上下游调用关系

Spark动态分配的主要逻辑由ExecutorAllocationManager类实现,首先分析下与其交互的上下游关系,如下图所示:

spark_dynamic_allocation

主要的逻辑很简单:ExecutorAllocationManager中启动一个周期性任务,监控当前Executor是否超时,如果超时就将其移除。当然Executor状态的收集主要依赖于Spark提供的SparkListener机制。周期性任务逻辑如下:

代码语言:javascript复制
private[spark] class ExecutorAllocationManager {

  // Executor that handles the scheduling task.
  private val executor =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

  def start(): Unit = {
    。。。
    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {...}
      }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    。。。
  }
  
  private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis
    // 同步当前所需要的Executor数
    updateAndSyncNumExecutorsTarget(now)

    val executorIdsToBeRemoved = ArrayBuffer[String]()
    // removeTimes是<executorId, expireTime>的映射。
    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        executorIdsToBeRemoved  = executorId
      }
      !expired
    }
    // 移除所有超时的Executor
    if (executorIdsToBeRemoved.nonEmpty) {
      removeExecutors(executorIdsToBeRemoved)
    }
  }
}

以上就是对于Spark的动态资源分配的原理分析,相关源码可以参考Apache Spark:ExecutorAllocationManager。完整的配置参数见:Spark Configuration: Dynamic Allocation。

3.总结与反思

  1. Pascal之父Nicklaus Wirth曾经说过一句名言:程序=算法 数据结构。对于Spark动态资源分配来说,我们应更加关注算法方面,即其动态行为。如何分配?如何伸缩?上下游关系如何?等等。
  2. 回馈社区:回馈是一种输出,就迫使我们输入的质量要足够高。这是一种很有效的技能提升方式。万事开头难,从最简单的typo fix/docs improvement起步。

4. Community Feedback

  1. 完善Executor相关参数的文档说明。SPARK-26446: Add cachedExecutorIdleTimeout docs at ExecutorAllocationManager
  2. fix bug:SPARK-26588:Idle executor should properly be killed when no job is submitted

参考

  • Dynamic Resource Allocation
  • Spark Configuration: Dynamic Allocation
  • Apache Spark

0 人点赞