DistributtedShell的container在所有节点上仅执行一次

2022-09-29 15:40:16 浏览数 (1)

问题

在上Hadoop2培训课的时候,老师出了这么一道题

修改Distributedshell的源代码,使得用户提供的命令(由“–shell_command”参数指定)可以在所有节点上仅执行一次。(目前的实现是,如果该命令由N个task同时执行,则这N个task可能位于任意节点上,比如都在node1上。)

修改代码

该问题需要在两个地方对源码进行修改:

  1. 修改参数,指定实现的feature是否生效
  2. 让每一个container运行在不同的节点上

博客将主要介绍过程2的实现过程,主要思路是首先获取节点列表,再在申请container时,指定节点。具体过程如下:

  • 打开源码。编译好Hadoop-2.3.0之后,用Eclipse打开工程,DistributedShell的源码的位置在/hadoop-2.3.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java
  • 获取计算节点列表。定义nodeList用于保存计算节点列表,在ApplicationMaster的init()函数中添加初始化nodeList的代码。初始化完成后,nodeList中保存有计算节点的列表(不包括RM 节点)。
代码语言:javascript复制
public class ApplicationMaster {
  // 所有计算节点
  private static List nodeList = new ArrayList();

  public boolean init(String[] args) throws ParseException, IOException {
    //该函数的末尾添加如下代码,用于获取计算节点列表
    try {
      YarnClient yarnClient = YarnClient.createYarnClient();
      yarnClient.init(conf);
      yarnClient.start();
      List<NodeReport> clusterNodeReports;
      clusterNodeReports = yarnClient.getNodeReports(
          NodeState.RUNNING);
      for (NodeReport node : clusterNodeReports) {
        this.nodeList.add(node.getNodeId().getHost());
      }
    } catch (YarnException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    return true;
  }
}
  • 让container运行在不同的节点上。申请资源的时候,会调用函数setupContainerAskForRM,修改该函数即可,函数如下:
代码语言:javascript复制
  private ContainerRequest setupContainerAskForRM() {
    // setup requirements for hosts
    // using * as any host will do for the distributed shell app
    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    // TODO - what is the range for priority? how to decide?
    pri.setPriority(requestPriority);
    // Set up resource type requirements
    // For now, memory and CPU are supported so we set memory and cpu
    // requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    capability.setVirtualCores(containerVirtualCores);
    String[] nodes = null;
    if (!nodeList.isEmpty()) {
      nodes = new String[1];
      nodes[0] = (String) nodeList.get(0);
      nodeList.remove(0);
    }
    ContainerRequest request = new ContainerRequest(capability, nodes, null,
        pri);//默认的nodes为null
    LOG.info("Requested container ask: "   request.toString());
    return request;
  }
  • 改好之后,打成jar包,覆盖${HADOOP_HOME}/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar即可生效
  • 验证,书写如下脚本并运行。发现3个container运行在不同的节点上,表示改写成功
代码语言:javascript复制
bin/hadoop jar 
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar 
org.apache.hadoop.yarn.applications.distributedshell.Client 
--jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar 
--shell_command "ls" 
--num_containers 3 
--container_memory 512 
--container_vcores 1 
--master_memory 350 
--priority 10

问题与解决

在获取计算节点列表时,被卡住了,最后在和别人交流的时候,知道ApplicationMaster通过yarnClient可以从RM中获取计算节点列表。最后将问题解决了。感谢所有提供帮助的人

0 人点赞