YARN Fair Scheduler批量分配功能调优总结

2020-02-18 13:31:36 浏览数 (1)

背景

YARN调度效率不高,队列资源充足,集群中正在运行的任务数量少,资源利用率低的情况下有一部分任务排队,等待分配资源时间长。

注:我们的集群使用的是Apache Hadoop 2.6.3,以下内容以该版本为准。

解决方案

YARN调度时机和批量分配

出现这种情况,我们可以启用YARN批量分配功能提高单个节点资源的利用率,提升YARN调度效率。在了解批量分配之前,我们先简单了解一下调度被触发的时机。

一次调度是如何被触发的呢?这就涉及到两种调度时机,即心跳调度和持续调度。两种触发机制不同的地方有两个:

  • 调度时机:心跳调度仅仅发生在收到了某个NodeManager的心跳信息的情况下,持续调度则不依赖与NodeManager的心跳通信,是连续发生的,当心跳到来,会将调度结果直接返回给NodeManager。我们通过yarn.scheduler.fair.continuous-scheduling-enabled来配置是否打开连续调度功能,默认情况下该功能关闭
  • 调度范围:心跳调度机制下,当收到某个节点的心跳,就对这个节点且仅仅对这个节点进行一次调度,即谁的心跳到来就触发对谁的调度,而持续调度的每一轮,是会遍历当前集群的所有节点,每个节点依次进行一次调度,保证一轮下来每一个节点都被公平地调度一次

我们的集群没有启用连续调度,所以这里我们只关注心跳调度。YARNNodeManager会通过心跳的方式定期向ResourceManager汇报自身状态,心跳时间间隔通过yarn.nodemanager.heartbeat.interval-ms配置,默认值 1000ms。当NodeManagerResourceManager汇报了自身资源情况(比如,当前可用资源,正在使用的资源,已经释放的资源),这个RPC会触发ResourceManager调用nodeUpdate()方法,这个方法为这个节点进行一次资源调度。当节点资源没有被任务所预留且不开启批量分配的情况下,节点经过一次资源调度过程就结束资源分配了,这在节点资源充足,任务量多的情况下,会使集群产生资源碎片增多、节点资源利用不充分、调度效率不高等问题。因此,我们需要启用批量分配功能。

YARN为我们提供了以下几个批量分配参数:

  • yarn.scheduler.fair.assignmultiple:是否启用批量分配功能。当一个节点出现大量资源时,可以一次分配完成,也可以多次分配完成。默认情况下,该参数值为false
  • yarn.scheduler.fair.max.assign:如果开启批量分配功能,可指定一次分配的container数目。默认情况下,该参数值为-1,表示不限制
  • yarn.scheduler.fair.dynamic.max.assign:开启批量分配资源后,一次可以分配未分配资源的一半给container。默认情况下,该参数值为true。该参数Hadoop 2.8.0版本及以上版本才有

调优过程

yarn.scheduler.fair.assignmultiple参数设置为true,开启批量分配以后,在监控系统上发现,调度效率提升明显,集群资源利用率明显提升,但是经常会看到以下情况:

  • 少部分节点上运行的container数量比较多,资源被耗尽,大部分节点资源利用率为0或不高
  • 集群不繁忙的情况下,经常有预留资源的情况出现,尽管预留时间很短,预留资源的任务一直等待其他任务释放被预留资源节点上的资源,直接影响了集群的调度效率和任务的执行效率

于是,我们根据生产环境的NodeManager节点配置(170G内存、32个虚拟CPU)和任务资源配置,指定每次分配的container数量,调整yarn.scheduler.fair.max.assign的值为5。调整以后,解决了上述只开启批量分配不指定批量分配数量产生的问题。

Hadoop 2.8.0版本之后又提供了yarn.scheduler.fair.dynamic.max.assign参数,弥补了开启批量分配参数和指定批量分配数量后,部分节点资源被耗尽,导致节点负载太高的问题。无论是节点资源利用率不高还是节点太高,都会影响调度效率,所以,取一个中间值达到调度平衡就可以,这需要我们在不断的调优过程中做一个权衡。

源码解析

无论是心跳调度还是持续调度,他们对某个节点进行一次调度的算法和原理是公用的,都是通过synchronized void attemptScheduling(FSSchedulerNode node)来在某个节点上进行一次调度,方法的参数代表了准备进行资源分配的节点。批量分配参数就是在该方法中起的作用,以下是org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler类中批量分配的相关逻辑:

代码语言:javascript复制
private synchronized void attemptScheduling(FSSchedulerNode node) {
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations

  // 获取这个节点上预留的application
  FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
  if (reservedAppSchedulable != null) { //如果这个节点上已经有reservation
    Priority reservedPriority = node.getReservedContainer().getReservedPriority();
    
    /**
     * 之前有预留,不满足条件则释放
     * 结合本地松弛特性,判断这个预留资源的应用是否有任何一个请求在这个节点上、在节点所在的机架上、在任意机器上运行:
     * 1、如果该application在任意机器上或者该节点所在机架上的container需求已经为0,则释放预留的资源
     * 2、如果该application在该节点没有container的需求,则释放预留的资源
     * 3、如果该application在该节点有container的需求,但是该节点总资源量小于container需要的资源量,则释放预留的资源
     */
    if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
      // Don't hold the reservation if app can no longer use it
      LOG.info("Releasing reservation that cannot be satisfied for application "
            reservedAppSchedulable.getApplicationAttemptId()
            " on node "   node);
      //如果这个被预留的container已经不符合运行条件,就没有必要保持预留了,直接取消预留,让出资源
      reservedAppSchedulable.unreserve(reservedPriority, node);
      reservedAppSchedulable = null;
    } else {
      // Reservation exists; try to fulfill the reservation
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to fulfill reservation for application "
              reservedAppSchedulable.getApplicationAttemptId()
              " on node: "   node);
      }
      //对这个已经进行了reservation对节点进行节点分配,当然,有可能资源还是不足,因此还将处于预定状态
      node.getReservedAppSchedulable().assignReservedContainer(node);
    }
  }
   /**
    * 这个节点还没有进行reservation,则尝试进行container分配 
    */
  if (reservedAppSchedulable == null) { 
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    while (node.getReservedContainer() == null) { //如果这个节点没有预留的container信息,那么,就尝试给这个节点分配container
      boolean assignedContainer = false;
      //尝试进行container的分配,并判断是否完全没有分配到并且也没有reserve成功
      //如果分配到了资源,或者预留到了资源,总之不是none
      if (!queueMgr.getRootQueue().assignContainer(node).equals(
          Resources.none())) {
        assignedContainers  ;
        assignedContainer = true;
      }
      // 如果分配container失败,则结束本次分配
      if (!assignedContainer) { break; }
      // 如果没有开启批量分配特性,则结束本次分配
      if (!assignMultiple) { break; }
      // 如果开启了批量分配特性,且已分配的container数量大于每次最多分配的container数量,则结束本次分配,否则一直尝试分配,一直分配可能会出现在该节点上预留资源
      if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
    }
  }
  // 更新队列的metric信息
  updateRootQueueMetrics();
}

参考资料

Yarn资源请求处理和资源分配原理解析

0 人点赞