Spark源码分析-Spark-on-K8S任务调度

2022-03-30 17:22:44 浏览数 (2)

toc

概述

从整个spark作业执行流程来看,作业调度分为:

  • stage划分和调度
  • stage内的task调度

由于stage的划分和调度是spark作业逻辑层面上的事,不涉及到物理集群资源,我们不需要关心。

无论是yarn还是k8s,作为计算资源提供方,我们关注的是对他们提供的底层计算资源的分配(Executor)和使用(Task)。spark的k8s模块的主要工作就是管理executor pod的数量和生命周期,并在活跃的pod上发起任务调度。

这里说的"发起",是因为所有的任务调度都由TaskSchedulerImpl来确定,spark-k8s调度模块(yarn同理)只是在适当的时机去发起调用,Taskscheduler最终决定将具体的任务调度到具体的Executor(Pod)。换句话说,TaskScheduler也是通用的。

总结下,spark k8s调度模块要做的事情:

  1. 根据作业配置维护一定数量的Executor(Pod)
  2. 在Executor资源足够的情况下,发起Task调度

任务调度模块设计

整个调度系统采用"发布-订阅"、生产者-消费者模式,可以类比kafka等消息系统。按模块交互流程,可以分为:

  1. 生产者:向存储模块上报ExecutorPod的状态
  2. 存储系统:存储ExecutorPod集合的snapshot系列,也就是所有executorPod的状态集合,每次局部或全局上报ExecutorPod都会形成新的Snapshot
  3. 消费者:接收ExecutorPod的状态,并根据状态控制业务逻辑,如task调度,删除driver端的executor数据结构,底层删除K8s的pod等等
  4. bootstrap引导:生成消息系统的各个模块类,并通过SchedulerBackEnd把各个模块串联,使消息系统运行起来。

下面是存储模块类ExecutorPodsSnapshotsStoreImpl说明。

代码语言:txt复制
Controls the propagation of the Spark application's executor pods state to subscribers that
  react to that state.
  
  Roughly follows a producer-consumer model. Producers report states of executor pods, and these
  states are then published to consumers that can perform any actions in response to these states.
  
  Producers push updates in one of two ways. An incremental update sent by updatePod() represents
  a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
  the passed pods are all of the most up to date states of all executor pods for the application.
  The combination of the states of all executor pods for the application is collectively known as
  a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
  most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
  or by replacing the snapshot entirely on a full sync.
  
  Consumers, or subscribers, register that they want to be informed about all snapshots of the
  executor pods. Every time the store replaces its most up to date snapshot from either an
  incremental update or a full sync, the most recent snapshot after the update is posted to the
  subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
  time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
  time intervals.
  
  The subscriber notification callback is guaranteed to be called from a single thread at a time.

任务调度代码实现

按照模块顺序,说明每个模块的类及作用。

生产者

生产者就是从k8s中不断获取ExecutorPod状态,并上报给存储模块。熟悉k8s client的话,都知道client的list-watch机制,简单说采用定时全量同步 实时增量相结合的方式监听k8s资源状态。这里的生产者也类似,有增量同步和全量同步两种:

  1. ExecutorPodsWatchSnapshotSource:增量更新Pod状态,调用snapshotsStore.updatePod,代码片段
代码语言:txt复制
#注册监听pod事件
watchConnection = kubernetesClient.pods()
.withLabel(SPARK\\_APP\\_ID\\_LABEL, applicationId)
.withLabel(SPARK\\_ROLE\\_LABEL, SPARK\\_POD\\_EXECUTOR\\_ROLE)
.watch(new ExecutorPodsWatcher())

#事件处理:调用snapshotsStore更新
override def eventReceived(action: Action, pod: Pod): Unit = {
val podName = pod.getMetadata.getName
logDebug(s"Received executor pod update for pod named $podName, action $action")
snapshotsStore.updatePod(pod)
}
  1. ExecutorPodsPollingSnapshotSource:全量获取pod,调用snapshotsStore.replaceSnapshot #独立线程定时调度全量获取pod状态
代码语言:txt复制
  def start(applicationId: String): Unit = {
     pollingFuture = pollingExecutor.scheduleWithFixedDelay(
     new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
  }

 #runnable逻辑
 private class PollRunnable(applicationId: String) extends Runnable {

  override def run(): Unit = Utils.tryLogNonFatalError {

  logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
  snapshotsStore.replaceSnapshot(kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId)
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
    .list()
    .getItems
    .asScala.toSeq)
 }
}

存储模块

存储模块(ExecutorPodsSnapshotsStoreImpl)主要功能:

  1. 接收producer更新ExecutorPod状态的调用,对应updatePod(增量)、replaceSnapshot(全量)方法。
  2. 注册并通知consumer,对应addSubscriber、notifySubscribers方法
  3. 存储最近一次pod Snapshot。
代码语言:txt复制
private[spark] trait ExecutorPodsSnapshotsStore {

  def addSubscriber
      (processBatchIntervalMillis: Long)
      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit

  def stop(): Unit

  def notifySubscribers(): Unit

  def updatePod(updatedPod: Pod): Unit

  def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
}

消费者

从任务调度的功能角度来看,主要有:

  1. ExecutorPod的(动态)分配或创建

主要实现类:ExecutorPodsAllocator,实现方法是onNewSnapshots。当pod数量不够时会根据差额创建pod,相反则删除pod。代码片段如下:#删除多余Executor的逻辑

代码语言:txt复制
if (knownPodCount > targetNum) {
     val excess = knownPodCount - targetNum
     val knownPendingToDelete = currentPendingExecutors
       .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
       .map { case (id, _) => id }
       .take(excess - newlyCreatedExecutorsForRpId.size)
     val toDelete = newlyCreatedExecutorsForRpId
       .filter { case (_, (_, createTime)) =>
         currentTime - createTime > executorIdleTimeout
       }.keys.take(excess).toList    knownPendingToDelete

    if (toDelete.nonEmpty) {
      logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
      _deletedExecutorIds = _deletedExecutorIds    toDelete

      Utils.tryLogNonFatalError {

        kubernetesClient
          .pods()
          .withField("status.phase", "Pending")
          .withLabel(SPARK_APP_ID_LABEL, applicationId)
          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
          .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
          .delete()
        newlyCreatedExecutors --= toDelete
        knownPendingCount -= knownPendingToDelete.size
      }
    }
  }

  #创建ExecutorPod的逻辑

  for ( _ <- 0 until numExecutorsToAllocate) {
  val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
  val executorConf = KubernetesConf.createExecutorConf(
    conf,
    newExecutorId.toString,
    applicationId,
    driverPod,
    resourceProfileId)

  val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
    kubernetesClient, rpIdToResourceProfile(resourceProfileId))
  val executorPod = resolvedExecutorSpec.pod
  val podWithAttachedContainer = new PodBuilder(executorPod.pod)
    .editOrNewSpec()
    .addToContainers(executorPod.container)
    .endSpec()
    .build()
  val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
  1. 各种无效状态的ExecutorPod在spark层面和k8s层面的销毁

实现类:ExecutorPodsLifecycleManager。在onNewSnapshots方法中主要针对deleted、failed、succeeded以及inactive状态的pod进行处理:

i. removeExecutorFromSpark:从driver的数据结构中剔除pod

代码语言:txt复制
schedulerBackend.doRemoveExecutor(execId.toString, exitReason)

     driverEndpoint.send(RemoveExecutor(executorId, reason))

ii. removeExecutorFromK8s:从k8s中删除pod

代码语言:txt复制
kubernetesClient
.pods()
.withName(updatedPod.getMetadata.getName)
.delete()

引导类

实现类KubernetesClusterManager。

主要方法:

代码语言:txt复制
#创建SchedulerBackend
createSchedulerBackend(){
        new KubernetesClusterSchedulerBackend(
      scheduler.asInstanceOf[TaskSchedulerImpl],
      sc,
      kubernetesClient,
      schedulerExecutorService,
      snapshotsStore,
      executorPodsAllocator,
      executorPodsLifecycleEventHandler,
      podsWatchEventSource,
      podsPollingEventSource)
}

  #初始化TaskScheduler,当taskScheduler运行起来后,这套消息系统也就自动运转起来
  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }

KubernetesClusterSchedulerBackend

KubernetesClusterManager new出了消息系统所有的对象,包括KubernetesClusterSchedulerBackend。KubernetesClusterSchedulerBackend则是启动了整个消息系统。如下:

代码语言:txt复制
override def start(): Unit = {
    super.start()
    val initExecs = Map(defaultProfile -> initialExecutors)
    podAllocator.setTotalExpectedExecutors(initExecs)
    lifecycleEventHandler.start(this)
    podAllocator.start(applicationId())
    watchEvents.start(applicationId())
    pollEvents.start(applicationId())
    setUpExecutorConfigMap()
  }

start方法的逻辑:首先生成ResourceProfile的默认实例defaultProfile,包括task的资源需求和executor的(cpu,memory,overheadMemory)等资源需求,并根据参数确定defaultProfile需要的executor数量。然后把defaultProfile -> initialExecutors 交给PodAllocator,这样podAllocator就可以根据消息系统当前的pod数量,来确定是否需要启动或删除一些ExecutorPod。

另外,start方法还会把SPARK_CONF_DIR目录下的配置文件以configmap的形式提供给ExecutorPods。

KubernetesClusterSchedulerBackend继承CoarseGrainedSchedulerBackend,CoarseGrainedSchedulerBackend实现了两个接口:

  1. ExecutorAllocationClient:负责Executor的增删改查,方法:
    1. isExecutorActive:查Executor状态
    2. requestTotalExecutors:申请创建Executor,对于K8s来说,最终调上述了ExecutorPodAllocator的方法
    3. killExecutors:删除Executor,通过k8s client删除对应的pod
  2. SchedulerBackend:spark作业层面进行Task的调度管理。主要方法:
    1. reviveOffers:提供资源给TaskScheduler,最终调用TaskScheduler.resourceOffers方法,并返回TaskDescription清单,然后调用CoarseGrainedSchedulerBackend.launchTasks启动任务。
    2. killTask:kill任务,调用CoarseGrainedSchedulerBackend.driverEndpoint kill task,driver会向对应的Executor发起killTask的Rpc请求。

总结来说,CoarseGrainedSchedulerBackend实现了spark作业层面的抽象概念Task的调度管理,以及计算资源层面的Executor的调度管理。而具体的ClusterManager如K8s,则在Executor的物化层面做了实现,如k8s的增加Executor实际是用ExecugtorPodAllocator去创建对应的pod,而删除Executor直接调用K8s客户端删除Pod。yarn也类似。

续:Task调度流程

任务在driver中从诞生到最终发送的过程,主要有一下几个步骤:

  • DAGScheduler对作业计算链按照shuffle依赖划分多个stage,提交一个stage根据个stage的一些信息创建多个Task,包括ShuffleMapTask和ResultTask, 并封装成一个任务集(TaskSet),把这个任务集交给TaskScheduler
  • TaskSchedulerImpl将接收到的任务集加入调度池中,然后通知调度后端SchedulerBackend
  • CoarseGrainedSchedulerBackend收到新任务提交的通知后,检查下现在可用 executor有哪些,并把这些可用的executor交给TaskSchedulerImpl
  • TaskSchedulerImpl根据获取到的计算资源,根据任务本地性级别的要求以及考虑到黑名单因素,按照round-robin的方式对可用的executor进行轮询分配任务,经过多个本地性级别分配,多轮分配后最终得出任务与executor之间的分配关系,并封装成TaskDescription形式返回给SchedulerBackend
  • SchedulerBackend拿到这些分配关系后,就知道哪些任务该发往哪个executor了,通过调用rpc接口将任务通过网络发送即可。

0 人点赞