Hadoop-Yarn源码-服务库与事件库

2023-06-30 14:05:30 浏览数 (2)

一.服务库

本文分析Hadoop2.6源码,生命周期长的对象,Yarn采用基于服务的对象管理模型对其进行管理。

特点

  • 将每个被服务化的对象分为4个状态:NOTINITED(被创建)、INITED(已初始化)、STARTED(已启动)、STOPPED(停止)。
  • 任何服务状态变化都可以出发另外一些动作。
  • 可通过组合方式对任意服务进行组合,以便管理。

Yarn的服务模型的类图(org.apache.hadoop.service),定义基本服务初始化,启动,停止等操作,AbstractService提供了一个基本Service实现。Yarn中所有对象,如果非组合服务,直接继承AbstractService即可,否则需继承CompositeService。

在Yarn中,ResourceManager和NodeManager属于组合服务,它们内部包含多个单一服务和组合服务,以实现对内部多种服务统一管理。

二.事件库

Yarn采用基于事件驱动的并发模型,该模型能够增强并发性,从而提高系统性能。为了构建该模型,YARN将各种处理逻辑抽象成事件和对应事件调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。

整个处理过程大致为:处理请求会作为事件进入系统,由中央异步调度器(Async-Dispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给另外一个事件调度器,也可能交给一个带有有限状态机的事件处理器,其处理结果也以事件的形式输出给中央异步调度器。而新的事件会再次被中央异步调度器转发给下一个事件调度器,直至处理完成(达到终止条件)。

在YARN中,所有核心服务实际上都是一个中央异步调度器,包括ResourceManager、NodeManager、MRAppMaster(MapReduce应用程序的ApplicationMaster)等,它们维护了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。

YARN中事件与事件处理器类的关系(位于包org.apache.hadoop.yarn.event中)。

当使用YARN事件库时,通常先要定义一个中央异步调度器AsyncDispatcher,负责事件的处理与转发,然后根据实际业务需求定义一系列事件Event与事件处理器EventHandler,并注册到中央异步调度器中以实现事件统一管理和调度。以MRAppMaster为例,它内部包含一个中央异步调度器AsyncDispatcher,并注册了TaskAttemptEvent/TaskAttemptImpl、TaskEvent/TaskImpl、JobEvent/JobImpl等一系列事件/事件处理器,由中央异步调度器统一管理和调度。

2.2 源码分析

调度器

代码语言:javascript复制
public interface Dispatcher {
  public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
      "yarn.dispatcher.exit-on-error";
  public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
  //默认实现把事件加入队列
  EventHandler getEventHandler();
  void register(Class<? extends Enum> eventType, EventHandler handler);
}

AsyncDispatcher(中央异步调度器)

继承AbstractService

代码语言:javascript复制
/**
 * 获取事件
 * 先进先出队列,阻塞队列
 * @return
 */
Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        if (blockNewEvents) {
          synchronized (waitForDrained) {
            if (drained) {
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          dispatch(event);
        }
      }
    }
  };
}
 /**
   * 适配启动设计handler处理器
   * @param event
   */
  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event "   event.getClass().getName()   "."
            event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for "   type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }
/**
**注册事件,根据eventType绑定
**
**/
 @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering "   eventType   " for "   handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

2.3 测试

根据enum定义事件类型,功能。

代码语言:javascript复制
/**
 * Task事件类型定义
 * @Classname TestTaskEventType
 * @Description TODO
 * @Date 2020/5/30 17:39
 * @Created by limeng
 */
public enum TestTaskEventType {
    T_KILL,
    T_SCHEDULE
}
/**
 * Job事件类型定义
 * @Classname TestJobEventType
 * @Description TODO
 * @Date 2020/5/30 17:39
 * @Created by limeng
 */
public enum TestJobEventType {
    JOB_KILL,
    JOB_INIT,
    JOB_START
}

组合服务

代码语言:javascript复制
public class TestSimpleMRAppMaster extends CompositeService {
    private Dispatcher dispatcher;//中央异步调度器
    private String jobID;
    private int taskNumber;//该作业包含的任务数目
    private String[] taskIDs;//该作业内部包含的所有任务


    public TestSimpleMRAppMaster(String name, String jobID, int taskNumber) {
        super(name);
        this.jobID = jobID;
        this.taskNumber = taskNumber;
        this.taskIDs = new String[taskNumber];
        for (int i = 0; i < taskNumber; i  ) {
            taskIDs[i] = new String(jobID "_task_" i);
        }
    }

    @Override
    public void serviceInit(final Configuration conf) throws Exception {
        dispatcher = new AsyncDispatcher();
        //注册Job和Task事件调度器
        dispatcher.register(TestJobEventType.class,new JobEventDispatcher());
        dispatcher.register(TestTaskEventType.class,new TaskEventDispatcher());
        addService((Service)dispatcher);
        super.serviceInit(conf);
    }
    public Dispatcher getDispatcher() {
        return dispatcher;
    }
    private class JobEventDispatcher implements EventHandler<TestJobEvent> {
        @Override
        public void handle(TestJobEvent event) {
            if(event.getType() == TestJobEventType.JOB_KILL){
                System.out.println("Receive JOB_KILL event, killing all the tasks");
                for (int i = 0; i < taskNumber; i  ) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_KILL));
                }
            }else if(event.getType() == TestJobEventType.JOB_INIT){
                System.out.println("Receive JOB_INIT event, scheduling tasks");
                for (int i = 0; i < taskNumber; i  ) {
                    dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_SCHEDULE));
                }
            }
        }
    }
    private class TaskEventDispatcher implements EventHandler<TestTaskEvent> {
        @Override
        public void handle(TestTaskEvent event) {
            if (event.getType() == TestTaskEventType.T_KILL) {
                System.out.println("Receive T_KILL event of task"   event.getTaskID());
            } else if (event.getType() == TestTaskEventType.T_SCHEDULE) {
                System.out.println("Receive T_SCHEDULE of task"   event.getTaskID());
            }
        }
    }
}

启动类

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    String jobID = "job_20131215_12";
    TestSimpleMRAppMaster appMaster  = new TestSimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    appMaster.serviceInit(conf);
    //注册事件模块
    appMaster.getDispatcher().getEventHandler().handle(new TestJobEvent(jobID, TestJobEventType.JOB_KILL));
    appMaster.getDispatcher().getEventHandler().handle(new TestJobEvent(jobID, TestJobEventType.JOB_INIT));
    appMaster.serviceStart();
}

2.3 优势

在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理器处理一种类型的事件,同时根据需要触发另外一种事件,这种编程方式具有异步、并发等特点,更加高效,因此更适合大型分布式系统。

参考

《Hadoop技术内幕:深入解析YARN架构设计与实现原理》

0 人点赞