ZooKeeper节点数据量限制引起的Hadoop YARN ResourceManager崩溃原因分析(三)

2020-05-18 16:35:21 浏览数 (1)

这个问题又让我们碰到了,发生次数不频繁但是一旦发生就会造成ResourceManager服务崩溃、ZK注册watch过多等问题。不彻底解决这个问题心中一直是个梗,所以基于前两次的分析和阅读社区最新版Hadoop 3.2.1代码之后,给生产环境YARNpatch最终解决这个问题。对于疑难问题,每遇到一次就有一次不同的感悟,接下来是我本次分析和解决该问题的过程记录。前两次解决和分析该问题的记录如下:

  • ZooKeeper节点数据量限制引起的Hadoop YARN ResourceManager崩溃原因分析
  • ZooKeeper节点数据量限制引起的Hadoop YARN ResourceManager崩溃原因分析(二)

环境

  • Hadoop版本:Apache Hadoop 2.6.3
  • ZooKeeper版本:ZooKeeper 3.4.10
  • 两个ResourceManager节点:主节点RM01,从节点RM02

问题原因

这个问题很难复现,前两次一直没找到产生该问题的原因,打了patch之后,我们在日志中发现,产生该问题主要是由于部分异常任务导致的,日志如下:

代码语言:javascript复制
2020-04-28 10:05:54 INFO  org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore:768 - Application update attemptState data size for /rmstore/ZKRMStateRoot/RMAppRoot/application_1587969707206_16259/appattempt_1587969707206_16259_000001 is 20266528. Exceed the maximum allowed 3145728 size. ApplicationAttemptState info: ApplicationAttemptState{attemptId=appattempt_1587969707206_16259_000001, diagnostics='User class threw exception: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 2.0 failed 4 times, most recent failure: Lost task 15.3 in stage 2.0 (TID 4224, datanode44.bi): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.immutable.Set hset;
/* 009 */   private boolean hasNull;
/* 010 */   private UnsafeRow result;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;

当任务出现异常时,YARN会保存任务的异常信息,当异常信息很多时,YARNZK保存任务状态的数据量就会超过ZK的限制。从日志中可以看出,出现异常的Spark任务状态数据是20266528字节,也就是19MB,远远超过了我们所设置的3MB。在YARN监控界面上可以看到该任务的异常信息有20万行:

解决方案

由于有了前两次发现和解决问题以及源码理解的经验,所以这次解决问题就顺手的多,去年八月份解决该问题的最终方案是调整ZK服务端和YARN客户端的jute.maxbuffer参数值为3MB,也就是调整ZK中每个ZNode能保存的最大数据量为3MB。但是这样的方案有以下明显的缺点:

  • 使ZK中保存的数据量比较大,导致ZK JVM内存紧张,极端情况下会使ZK OOM,同时也会影响ZK数据读写、数据同步以及持久化效率
  • jute.maxbuffer属于硬配置的方式,为了使配置生效,还需要重启ZK服务和客户端YARN RM服务,对ZK服务以及依赖ZK的服务运维成本比较大。由于当前我们生产环境YARN使用的这套ZK集群还管理HBase、流式计算任务的元数据,所以重启影响还是比较大的

可以看出,通过修改jute.maxbuffer方式虽然也解决了问题,但是会对ZK服务和依赖ZK的服务有影响,运维成本也比较高。于是通过追踪社区issue和阅读Hadoop 3.2.1源码,我们采取通过在yarn-site.xml增加yarn.resourcemanager.zk-max-znode-size.bytes配置的方式来解决YARNZK写数据量超过ZK限制的问题,该配置是在Hadoop 2.9.0版本加入的。使用这种方式,我们不需要修改ZK服务端的配置,而只需要修改YARN服务端的配置并重启YARN就能限制YARNZK写入的数据量,而且也提高了ZK服务的可用性。打了patch后的代码逻辑超过数据量限制的任务状态数据直接被丢弃,并打印log日志,方便日后问题追溯。打了patch后的ZKRMStateStore主要代码如下(由于篇幅原因,其余代码省略):

代码语言:javascript复制
public class ZKRMStateStore extends RMStateStore{
    
    private int zknodeLimit; // 保存ZNode节点数据量限制
    
     @Override
    public synchronized void initInternal(Configuration conf) throws Exception {
        // 其余部分省略
        // 获取yarn-site.xml中yarn.resourcemanager.zk-max-znode-size.bytes的值
        zknodeLimit = conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
                YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);
    }
    
    @Override
    public synchronized void updateApplicationAttemptStateInternal(
            ApplicationAttemptId appAttemptId,
            ApplicationAttemptStateData attemptStateDataPB)
            throws Exception {
        String appIdStr = appAttemptId.getApplicationId().toString();
        String appAttemptIdStr = appAttemptId.toString();
        String appDirPath = getNodePath(rmAppRoot, appIdStr);
        String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Storing final state info for attempt: "   appAttemptIdStr
                      " at: "   nodeUpdatePath);
        }
        byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();

        ApplicationAttemptState attemptState = getApplicationAttemptState(appAttemptId, attemptStateDataPB);
        // 判断要写入的任务尝试数据信息是否超过zknodeLimit变量的值,如果没有,就执行任务尝试数据更新操作。否则,只打印info信息,不执行任务尝试数据更新操作
        if (attemptStateData.length <= zknodeLimit) {
            if (existsWithRetries(nodeUpdatePath, true) != null) {
                setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
            } else {
                createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
                        CreateMode.PERSISTENT);
                LOG.debug(appAttemptId   " znode didn't exist. Created a new znode to"
                          " update the application attempt state.");
            }
            LOG.info("Application update attemptState data size for "   nodeUpdatePath   " is "
                      attemptStateData.length   ". The maximum allowed "   zknodeLimit   " size. ApplicationAttemptState info: "   attemptState.toString()   ". AppAttemptTokens length:"   attemptStateDataPB.getAppAttemptTokens().array().length
                      ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
        } else {
            LOG.info("Application update attemptState data size for "   nodeUpdatePath   " is "
                      attemptStateData.length   ". Exceed the maximum allowed "   zknodeLimit   " size. ApplicationAttemptState info: "   attemptState.toString()   ". AppAttemptTokens length:"   attemptStateDataPB.getAppAttemptTokens().array().length
                      ". See yarn.resourcemanager.zk-max-znode-size.bytes.");
        }
    }
}

问题总结

1、YARN使用ZK来实现故障状态恢复,这里的修改会不会影响正常任务的执行和状态恢复?

不会。经过线上一段时间的运行和我们使用zkdoctor监控的数据发现,YARN存储在ZK中的正常任务的状态数据一般不会超过512K,只有部分异常任务的异常信息数据会特别大,这个异常信息数据是引起YARNZK写数据量超过限制的根本原因。

YARN将共享状态存储系统定义成一个RMStateStore抽象类,以保存ResourceManager故障恢复后所必需的状态信息,这些信息都是一些基本数据类型的信息,没有特别复杂的数据类型,比如字节数组。ResourceManager也不会保存已经分配给每个ApplicationMaster的资源信息和每个NodeManager的资源使用信息,这些均可通过相应的心跳汇报机制重构出来。因此,ResourceManagerHA实现是非常轻量级的。涉及到任务状态的主要类如下:

(1)Application状态信息ApplicationState

代码语言:javascript复制
 /**
   * State of an application application
   * 任务状态信息类
   */
  public static class ApplicationState {
    final ApplicationSubmissionContext context; // 任务描述信息content
    final long submitTime; // 任务提交时间
    final long startTime; // 任务开始时间
    final String user; // 任务提交人
    Map<ApplicationAttemptId, ApplicationAttemptState> attempts =
                  new HashMap<ApplicationAttemptId, ApplicationAttemptState>(); // 任务重试信息
    // fields set when application completes.
    RMAppState state; // 任务运行状态
    String diagnostics; // 任务异常诊断信息
    long finishTime; // 任务完成时间
    
    // 省略其他代码
  }

(2)Application对应的每个ApplicationAttempt信息ApplicationAttemptState

代码语言:javascript复制
 /**
   * State of an application attempt
   * 任务尝试状态信息类
   */
  public static class ApplicationAttemptState {
    final ApplicationAttemptId attemptId; // 任务尝试ID
    final Container masterContainer; // 所在container的信息
    final Credentials appAttemptCredentials; // 安全token
    long startTime = 0; // 开始时间
    long finishTime = 0; // 结束时间
    // fields set when attempt completes
    RMAppAttemptState state; // 运行状态
    String finalTrackingUrl = "N/A"; // 任务运行日志查看地址
    String diagnostics; // 任务异常诊断信息
    int exitStatus = ContainerExitStatus.INVALID; // 任务退出状态
    FinalApplicationStatus amUnregisteredFinalStatus; // 任务最终状态
    long memorySeconds; // 任务消耗的内存总资源
    long vcoreSeconds; // 任务消耗的CPU总资源
    
    // 省略其他代码
  }

(3)安全令牌相关信息RMDTSecretManagerState

代码语言:javascript复制
 /**
   * 安全令牌信息
   */
  public static class RMDTSecretManagerState {
    // DTIdentifier -> renewDate
    Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
        new HashMap<RMDelegationTokenIdentifier, Long>(); // 授权令牌状态
    Set<DelegationKey> masterKeyState =
        new HashSet<DelegationKey>(); // master key状态
    int dtSequenceNumber = 0; // 序列号
    
    // 省略其他代码
 }

2、YARN出现异常时为什么会导致ZK中注册很多的watch?

YARN出现异常会进行故障转移,故障转移到standby节点,standby节点会调用RMStateloadState方法进行任务状态数据的恢复,loadState会调用ZKRMStateStoreloadRMAppState方法读取在ZK中保存的任务状态数据,在调用ZKgetData方法时会给任务状态节点和任务尝试状态节点注册watch,以监听任务状态的变化。由于任务状态节点和任务尝试状态节点是持久节点,不会因为ZK客户端连接失效而删除,且是一对多的关系,因此会导致watch数量很多。以下是加载任务状态的相关代码:

代码语言:javascript复制
 private synchronized void loadRMAppState(RMState rmState) throws Exception {
        // 当/rmstore/ZKRMStateRoot/RMAppRoot/节点及其子节点被删除或创建时,watch被触发
        List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
        for (String childNodeName : childNodes) {
            String childNodePath = getNodePath(rmAppRoot, childNodeName);
            // 获取任务节点数据并注册watch,该watch当任务节点被删除或数据被更新时触发
            byte[] childData = getDataWithRetries(childNodePath, true);
            if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
                // application
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Loading application from znode: "   childNodeName);
                }
                ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
                ApplicationStateDataPBImpl appStateData =
                        new ApplicationStateDataPBImpl(
                                ApplicationStateDataProto.parseFrom(childData));
                // 获取任务数据
                ApplicationState appState =
                        new ApplicationState(appStateData.getSubmitTime(),
                                appStateData.getStartTime(),
                                appStateData.getApplicationSubmissionContext(),
                                appStateData.getUser(),
                                appStateData.getState(),
                                appStateData.getDiagnostics(), appStateData.getFinishTime());
                if (!appId.equals(appState.context.getApplicationId())) {
                    throw new YarnRuntimeException("The child node name is different "  
                            "from the application id");
                }
                rmState.appState.put(appId, appState);
                // 获取任务重试数据
                loadApplicationAttemptState(appState, appId);
            } else {
                LOG.info("Unknown child node with name: "   childNodeName);
            }
        }
    }

我们生产环境设置在ZK中保存2万个任务状态信息,发生问题时监控发现YARNZK注册了10几万的watch。由于ZKwatch信息是用HashMapkeyZNode节点的pathvalue是注册在ZNode上的watch集合)保存的,因此大量的watch会使这个HashMap成为JVM中的一个大对象,这个大对象会一直保存在ZK的服务器端不会被回收,直到YARN被动删除或者更新任务状态数据时才会移除相应节点的watchZK服务端保存watch信息的HashMap的元素数量才会相应减少。这是一个比较缓慢的过程,在这个过程中,ZK很可能因为JVM GC问题响应缓慢甚至出现OOM。去年就由于YARN出现问题往ZK注册很多watch导致ZK OOM,继而影响到依赖ZKHBase服务出现异常。因此,我们在打patch的基础上,将YARN迁移到一套独立的ZK集群,这套ZK集群只为YARN服务,从而提高大数据基础服务的可用性。

我们监控和统计发现,正常情况下,YARNZK中注册的watch很少,基本上都是运行时的任务状态数据节点的watch,因此不会对ZK产生太大压力。

3、YARN向ZK写任务状态异常为什么会触发YARN故障转移?

ZKRMStateStoreZK交互的方法里,都会调用ZKRMStateStore.ZKAction类的runWithRetries方法进行重试,正常情况下不需要重试。如果发生异常才会触发重试逻辑,默认重试1000次,当重试1000次之后,会使用throw方式给上层调用者抛出异常,凡是以下方法都有可能抛出异常:

异常会被RMStateStorenotifyStoreOperationFailed方法捕捉到,该方法很简单,主要进行以下逻辑判断:

  • 如果YARN开启了HA,则触发故障转移操作
  • 如果没有开启HA,则判断YARN是否开启了快速失败特性,则触发RMFatalEventType.STATE_STORE_OP_FAILED事件,退出进程
  • 如果以上两个条件都不满足,则打印warn信息

该方法具体代码如下:

代码语言:javascript复制
 /**
   * 该方法通知RM存储操作失败,参数是引起操作失败的异常信息
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    LOG.error("State store operation failed ", failureCause);
    // 如果开启了HA,则执行故障转移操作
    if (HAUtil.isHAEnabled(getConfig())) {
      LOG.warn("State-store fenced ! Transitioning RM to standby");
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else if (YarnConfiguration.shouldRMFailFast(getConfig())) { // 如果没有开启HA,则判断有没有开启快速失败
      LOG.fatal("Fail RM now due to state-store error!");
      rmDispatcher.getEventHandler().handle(
          new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED,
              failureCause));
    } else { // 否则,打印跳过存储异常警告信息
      LOG.warn("Skip the state-store error.");
    }
  }

参考资料

  • 打了patch后的ZKRMStateStore代码GitHub地址
  • 基于ZKRMStateStore的Yarn的HA机制分析
  • Application运行失败导致RM主备切换:文中提到了后来的Hadoop版本对于过长的异常诊断信息进行了截断处理
  • Zookeeper在RM HA的应用
  • YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复

0 人点赞