编写一个直接在Yarn上运行的程序

2020-08-04 21:54:50 浏览数 (1)

我们知道基于mapReduce框架的分布式程序的编写,在这种框架下我们不需要考虑申请资源,只需要安照mapreduce框架的要求,直接编写Map函数和reduce函数即可。如何在Yarn上直接编写应用程序呢?

要想在Yarn上编写应用程序,需要编写两个组件,Client和ApplicationMaster. 例如,JobClient和MRAppMaster是Yarn专门为Mapreduce设计实现的两个Client和ApplicationMaster组件。

客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。

client设计

客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
  1. 客户端通过RPC与ResourceManager通信获取appId和可分配的资源等信息
  2. 客户端通过RPC将ApplicationMaster提交到ResourceManager

客户端将启动ApplicationMaster所需要的信息打包到ApplicationSubmisionContext中 主要包括:application_id,name,priority,queue,user,unmanagered_am等

也需要提提供ApplicationClient接口实现,以供返回信息,包括集群信息,节点信息,kill信息,运行状态

当然这些程序可以使用java的RPC进行编程,Yarn提供了YarnClient类的封装编程库,使用maven载入。

代码语言:javascript复制
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>3.2.0</version>
</dependency>
代码语言:javascript复制
public class YarnClientDemo {

    //创建一个yarnclient客户断
    private YarnClient  client;
    //这里配置Yarn集群信息或者采用加在yarn-site.xml文件形式
    private Configuration conf;

    public void initClient() throws IOException, YarnException {

        client = YarnClient.createYarnClient();
        client.init(conf);

        //启动YarnClient
        client.start();
        //获取一个applicationID
        YarnClientApplication app = client.createApplication();

        //构造applicationsubmit用于提交
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();

        //获取appid
        ApplicationId appId = appContext.getApplicationId();
        //获取资源信息
        Resource resource = appContext.getResource();

        //设置app信息
        appContext.setApplicationName("statistic app");
        appContext.setQueue("background");
        appContext.setUnmanagedAM(false);

        client.submitApplication(appContext);
    }

}

ApplicationMaster设计

ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。

所以我们分为AM-RM交互和AM-NM交互

AM-RM
  1. ApplicationMaster通过RPC向ResourceManager注册

ApplicationMaster启动时向ResourceManager注册,注册信息封装RegisterApplicationMasterRequest中,主要信息有host, rp_port,tracking_url追踪URL.

注册成功后收到一个RegisterApplicationMasterResponse返回值,主要信息有,maximumCapability最大可用资源,client_to_am_token_master_key等信息

  1. ApplicationMaster通过RPC向ResourceManager申请资源

ApplicationMaster将要请求的资源封装为AllocateRequest参数,主要包括,priority,resource_name,capability,num_containers资源数目,进度,请求加入黑名单的机器。用户可以将一个机器加入黑名单使,RM不把该机器资源分配给本程序。

ApplicationMaster调用后会收到AllocationResponse类型返回信息。主要包括,reponse_id,allocated_containers,limit,状态等信息。

  1. ApplicationMaster通过RPC告诉ResourceManager程序运行完毕,退出

ApplicationMaster与ResourceManager交互由AMRMClientImpl和AMRMClientAsync实现,但是AMRMClientImpl是阻塞的,AMRMClientAsync是非阻塞的

代码语言:javascript复制
public class MyCallByHandler implements AMRMClientAsync.CallbackHandler
{
    //配置yarnconf信息
    private YarnConfiguration conf;
    public void onContainersCompleted(List<ContainerStatus> list) {

    }

    public void onContainersAllocated(List<Container> list) {
        //构建句柄
        AMRMClientAsync.CallbackHandler callbackHandler = new MyCallByHandler();

        AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);

        asyncClient.init(conf);
        asyncClient.start();

        try {
            //三个需要填的参数是name,port,url
            RegisterApplicationMasterResponse response = asyncClient.registerApplicationMaster("statistic app",5200,"url");

            asyncClient.addContainerRequest(ContainerRequest.newBuilder().build());
            
        } catch (YarnException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void onShutdownRequest() {

    }

    public void onNodesUpdated(List<NodeReport> list) {

    }

    public float getProgress() {
        return 0;
    }

    public void onError(Throwable throwable) {

    }
}

上简单实现构建的代表

AM-NM
  1. ApplicationMaster将申请到的资源二次分配给内部的任务,并通过RPC与NodeManager通信,启动Container。

该过程传参为StartContainerRequest,主要包括localResources,environment,container_token等信息,返回一个StartContainerResponse,主要包括services_meta_data,成功或失败请求值

  1. ApplicationMaster向NodeManager询问container的运行状态,失败会重新申请资源
  2. Container运行完成,ApplicationMaster通过RPC释放Container

ApplicationMaster与ResourceManager交互由NMClientImpl和NMClientAsync实现,但是NMClientImpl是阻塞的,NMClientAsync是非阻塞的

实现方法差不多,实现NMClientAsync接口。

Yarn实现了DistributionShell的实例

DistributionShell 是Yarn自带的Application实现的例子,可以运行shell命令,代码也不多

1)构造RPC句柄。

利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager:

代码语言:javascript复制
    private void connectToASM() throws IOException {

    YarnConfiguration yarnConf = new YarnConfiguration(conf);

    InetSocketAddress rmAddress = yarnConf.getSocketAddr(

    YarnConfiguration.RM_ADDRESS,

    YarnConfiguration.DEFAULT_RM_ADDRESS,

    YarnConfiguration.DEFAULT_RM_PORT);

    LOG.info(“Connecting to ResourceManager at ”   rmAddress);

    applicationsManager = ((ClientRMProtocol) rpc.getProxy(

    ClientRMProtocol.class, rmAddress, conf));

    }

(2)获取application id。

与ResourceManager通信,请求application id:

代码语言:javascript复制
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);

GetNewApplicationResponse response = applicationsManager.getNewApplication(request);

(3)构造ContainerLaunchContext。

构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:

代码语言:javascript复制
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    //添加本地资源

    //填充localResources

    amContainer.setLocalResources(localResources);

    //添加运行ApplicationMaster所需的环境变量

    Map<String, String> env = new HashMap<String, String>();

    //填充env

    amContainer.setEnvironment(env);

    //添加启动ApplicationMaster的命令

    //填充commands;

    amContainer.setCommands(commands);

    //设置ApplicationMaster所需的资源

    amContainer.setResource(capability);

(4)构造ApplicationSubmissionContext。

构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:

代码语言:javascript复制
    ApplicationSubmissionContext appContext =

    Records.newRecord(ApplicationSubmissionContext.class);

    //设置application id,调用GetNewApplicationResponse#getApplicationId()

    appContext.setApplicationId(appId);

    //设置Application名称:“DistributedShell”

    appContext.setApplicationName(appName);

    //设置前面创建的container

    appContext.setAMContainerSpec(amContainer);

    //设置application的优先级,默认是0

    pri.setPriority(amPriority);

    //设置application的所在队列,默认是”"

    appContext.setQueue(amQueue);

    //设置application的所属用户,默认是”"

    appContext.setUser(amUser);

(5)提交ApplicationMaster。

将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:

代码语言:javascript复制
applicationsManager.submitApplication(appRequest);

(6) 显示应用程序运行状态。

为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:

代码语言:javascript复制
    while (true) {

    Thread.sleep(1000);

    GetApplicationReportRequest reportRequest =

    Records.newRecord(GetApplicationReportRequest.class);

    reportRequest.setApplicationId(appId);

    GetApplicationReportResponse reportResponse =

    applicationsManager.getApplicationReport(reportRequest);

    ApplicationReport report = reportResponse.getApplicationReport();

    //打印report内容

    …

    YarnApplicationState state = report.getYarnApplicationState();

    FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();

    if (YarnApplicationState.FINISHED == state) {

    if (FinalApplicationStatus.SUCCEEDED == dsStatus) {

    return true;

    } else {

    return false;

    }

    } else if (YarnApplicationState.KILLED == state

    || YarnApplicationState.FAILED == state) {

    return false;

    }

    }

0 人点赞