背景
YARN
调度效率不高,队列资源充足,集群中正在运行的任务数量少,资源利用率低的情况下有一部分任务排队,等待分配资源时间长。
注:我们的集群使用的是Apache Hadoop 2.6.3
,以下内容以该版本为准。
解决方案
YARN调度时机和批量分配
出现这种情况,我们可以启用YARN
批量分配功能提高单个节点资源的利用率,提升YARN
调度效率。在了解批量分配之前,我们先简单了解一下调度被触发的时机。
一次调度是如何被触发的呢?这就涉及到两种调度时机,即心跳调度和持续调度。两种触发机制不同的地方有两个:
- 调度时机:心跳调度仅仅发生在收到了某个
NodeManager
的心跳信息的情况下,持续调度则不依赖与NodeManager
的心跳通信,是连续发生的,当心跳到来,会将调度结果直接返回给NodeManager
。我们通过yarn.scheduler.fair.continuous-scheduling-enabled
来配置是否打开连续调度功能,默认情况下该功能关闭 - 调度范围:心跳调度机制下,当收到某个节点的心跳,就对这个节点且仅仅对这个节点进行一次调度,即谁的心跳到来就触发对谁的调度,而持续调度的每一轮,是会遍历当前集群的所有节点,每个节点依次进行一次调度,保证一轮下来每一个节点都被公平地调度一次
我们的集群没有启用连续调度,所以这里我们只关注心跳调度。YARN
的NodeManager
会通过心跳的方式定期向ResourceManager
汇报自身状态,心跳时间间隔通过yarn.nodemanager.heartbeat.interval-ms
配置,默认值 1000ms
。当NodeManager
向ResourceManager
汇报了自身资源情况(比如,当前可用资源,正在使用的资源,已经释放的资源),这个RPC
会触发ResourceManager
调用nodeUpdate()
方法,这个方法为这个节点进行一次资源调度。当节点资源没有被任务所预留且不开启批量分配的情况下,节点经过一次资源调度过程就结束资源分配了,这在节点资源充足,任务量多的情况下,会使集群产生资源碎片增多、节点资源利用不充分、调度效率不高等问题。因此,我们需要启用批量分配功能。
YARN
为我们提供了以下几个批量分配参数:
yarn.scheduler.fair.assignmultiple
:是否启用批量分配功能。当一个节点出现大量资源时,可以一次分配完成,也可以多次分配完成。默认情况下,该参数值为false
yarn.scheduler.fair.max.assign
:如果开启批量分配功能,可指定一次分配的container
数目。默认情况下,该参数值为-1,表示不限制yarn.scheduler.fair.dynamic.max.assign
:开启批量分配资源后,一次可以分配未分配资源的一半给container
。默认情况下,该参数值为true
。该参数Hadoop 2.8.0
版本及以上版本才有
调优过程
将yarn.scheduler.fair.assignmultiple
参数设置为true
,开启批量分配以后,在监控系统上发现,调度效率提升明显,集群资源利用率明显提升,但是经常会看到以下情况:
- 少部分节点上运行的
container
数量比较多,资源被耗尽,大部分节点资源利用率为0
或不高 - 集群不繁忙的情况下,经常有预留资源的情况出现,尽管预留时间很短,预留资源的任务一直等待其他任务释放被预留资源节点上的资源,直接影响了集群的调度效率和任务的执行效率
于是,我们根据生产环境的NodeManager
节点配置(170G
内存、32
个虚拟CPU
)和任务资源配置,指定每次分配的container
数量,调整yarn.scheduler.fair.max.assign
的值为5
。调整以后,解决了上述只开启批量分配不指定批量分配数量产生的问题。
Hadoop 2.8.0
版本之后又提供了yarn.scheduler.fair.dynamic.max.assign
参数,弥补了开启批量分配参数和指定批量分配数量后,部分节点资源被耗尽,导致节点负载太高的问题。无论是节点资源利用率不高还是节点太高,都会影响调度效率,所以,取一个中间值达到调度平衡就可以,这需要我们在不断的调优过程中做一个权衡。
源码解析
无论是心跳调度还是持续调度,他们对某个节点进行一次调度的算法和原理是公用的,都是通过synchronized void attemptScheduling(FSSchedulerNode node)
来在某个节点上进行一次调度,方法的参数代表了准备进行资源分配的节点。批量分配参数就是在该方法中起的作用,以下是org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
类中批量分配的相关逻辑:
private synchronized void attemptScheduling(FSSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
// 获取这个节点上预留的application
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) { //如果这个节点上已经有reservation
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
/**
* 之前有预留,不满足条件则释放
* 结合本地松弛特性,判断这个预留资源的应用是否有任何一个请求在这个节点上、在节点所在的机架上、在任意机器上运行:
* 1、如果该application在任意机器上或者该节点所在机架上的container需求已经为0,则释放预留的资源
* 2、如果该application在该节点没有container的需求,则释放预留的资源
* 3、如果该application在该节点有container的需求,但是该节点总资源量小于container需要的资源量,则释放预留的资源
*/
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
reservedAppSchedulable.getApplicationAttemptId()
" on node " node);
//如果这个被预留的container已经不符合运行条件,就没有必要保持预留了,直接取消预留,让出资源
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
reservedAppSchedulable.getApplicationAttemptId()
" on node: " node);
}
//对这个已经进行了reservation对节点进行节点分配,当然,有可能资源还是不足,因此还将处于预定状态
node.getReservedAppSchedulable().assignReservedContainer(node);
}
}
/**
* 这个节点还没有进行reservation,则尝试进行container分配
*/
if (reservedAppSchedulable == null) {
// No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) { //如果这个节点没有预留的container信息,那么,就尝试给这个节点分配container
boolean assignedContainer = false;
//尝试进行container的分配,并判断是否完全没有分配到并且也没有reserve成功
//如果分配到了资源,或者预留到了资源,总之不是none
if (!queueMgr.getRootQueue().assignContainer(node).equals(
Resources.none())) {
assignedContainers ;
assignedContainer = true;
}
// 如果分配container失败,则结束本次分配
if (!assignedContainer) { break; }
// 如果没有开启批量分配特性,则结束本次分配
if (!assignMultiple) { break; }
// 如果开启了批量分配特性,且已分配的container数量大于每次最多分配的container数量,则结束本次分配,否则一直尝试分配,一直分配可能会出现在该节点上预留资源
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
// 更新队列的metric信息
updateRootQueueMetrics();
}
参考资料
Yarn资源请求处理和资源分配原理解析