一.服务库
本文分析Hadoop2.6源码,生命周期长的对象,Yarn采用基于服务的对象管理模型对其进行管理。
特点
- 将每个被服务化的对象分为4个状态:NOTINITED(被创建)、INITED(已初始化)、STARTED(已启动)、STOPPED(停止)。
- 任何服务状态变化都可以出发另外一些动作。
- 可通过组合方式对任意服务进行组合,以便管理。
Yarn的服务模型的类图(org.apache.hadoop.service),定义基本服务初始化,启动,停止等操作,AbstractService提供了一个基本Service实现。Yarn中所有对象,如果非组合服务,直接继承AbstractService即可,否则需继承CompositeService。
在Yarn中,ResourceManager和NodeManager属于组合服务,它们内部包含多个单一服务和组合服务,以实现对内部多种服务统一管理。
二.事件库
Yarn采用基于事件驱动的并发模型,该模型能够增强并发性,从而提高系统性能。为了构建该模型,YARN将各种处理逻辑抽象成事件和对应事件调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。
整个处理过程大致为:处理请求会作为事件进入系统,由中央异步调度器(Async-Dispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给另外一个事件调度器,也可能交给一个带有有限状态机的事件处理器,其处理结果也以事件的形式输出给中央异步调度器。而新的事件会再次被中央异步调度器转发给下一个事件调度器,直至处理完成(达到终止条件)。
在YARN中,所有核心服务实际上都是一个中央异步调度器,包括ResourceManager、NodeManager、MRAppMaster(MapReduce应用程序的ApplicationMaster)等,它们维护了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。
YARN中事件与事件处理器类的关系(位于包org.apache.hadoop.yarn.event中)。
当使用YARN事件库时,通常先要定义一个中央异步调度器AsyncDispatcher,负责事件的处理与转发,然后根据实际业务需求定义一系列事件Event与事件处理器EventHandler,并注册到中央异步调度器中以实现事件统一管理和调度。以MRAppMaster为例,它内部包含一个中央异步调度器AsyncDispatcher,并注册了TaskAttemptEvent/TaskAttemptImpl、TaskEvent/TaskImpl、JobEvent/JobImpl等一系列事件/事件处理器,由中央异步调度器统一管理和调度。
2.2 源码分析
代码语言:javascript复制调度器
public interface Dispatcher {
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
"yarn.dispatcher.exit-on-error";
public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
//默认实现把事件加入队列
EventHandler getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
AsyncDispatcher(中央异步调度器)
继承AbstractService
代码语言:javascript复制/**
* 获取事件
* 先进先出队列,阻塞队列
* @return
*/
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
/**
* 适配启动设计handler处理器
* @param event
*/
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " event.getClass().getName() "."
event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
/**
**注册事件,根据eventType绑定
**
**/
@SuppressWarnings("unchecked")
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " eventType " for " handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
2.3 测试
根据enum定义事件类型,功能。
代码语言:javascript复制/**
* Task事件类型定义
* @Classname TestTaskEventType
* @Description TODO
* @Date 2020/5/30 17:39
* @Created by limeng
*/
public enum TestTaskEventType {
T_KILL,
T_SCHEDULE
}
/**
* Job事件类型定义
* @Classname TestJobEventType
* @Description TODO
* @Date 2020/5/30 17:39
* @Created by limeng
*/
public enum TestJobEventType {
JOB_KILL,
JOB_INIT,
JOB_START
}
代码语言:javascript复制组合服务
public class TestSimpleMRAppMaster extends CompositeService {
private Dispatcher dispatcher;//中央异步调度器
private String jobID;
private int taskNumber;//该作业包含的任务数目
private String[] taskIDs;//该作业内部包含的所有任务
public TestSimpleMRAppMaster(String name, String jobID, int taskNumber) {
super(name);
this.jobID = jobID;
this.taskNumber = taskNumber;
this.taskIDs = new String[taskNumber];
for (int i = 0; i < taskNumber; i ) {
taskIDs[i] = new String(jobID "_task_" i);
}
}
@Override
public void serviceInit(final Configuration conf) throws Exception {
dispatcher = new AsyncDispatcher();
//注册Job和Task事件调度器
dispatcher.register(TestJobEventType.class,new JobEventDispatcher());
dispatcher.register(TestTaskEventType.class,new TaskEventDispatcher());
addService((Service)dispatcher);
super.serviceInit(conf);
}
public Dispatcher getDispatcher() {
return dispatcher;
}
private class JobEventDispatcher implements EventHandler<TestJobEvent> {
@Override
public void handle(TestJobEvent event) {
if(event.getType() == TestJobEventType.JOB_KILL){
System.out.println("Receive JOB_KILL event, killing all the tasks");
for (int i = 0; i < taskNumber; i ) {
dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_KILL));
}
}else if(event.getType() == TestJobEventType.JOB_INIT){
System.out.println("Receive JOB_INIT event, scheduling tasks");
for (int i = 0; i < taskNumber; i ) {
dispatcher.getEventHandler().handle(new TestTaskEvent(taskIDs[i], TestTaskEventType.T_SCHEDULE));
}
}
}
}
private class TaskEventDispatcher implements EventHandler<TestTaskEvent> {
@Override
public void handle(TestTaskEvent event) {
if (event.getType() == TestTaskEventType.T_KILL) {
System.out.println("Receive T_KILL event of task" event.getTaskID());
} else if (event.getType() == TestTaskEventType.T_SCHEDULE) {
System.out.println("Receive T_SCHEDULE of task" event.getTaskID());
}
}
}
}
代码语言:javascript复制启动类
public static void main(String[] args) throws Exception {
String jobID = "job_20131215_12";
TestSimpleMRAppMaster appMaster = new TestSimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
YarnConfiguration conf = new YarnConfiguration(new Configuration());
appMaster.serviceInit(conf);
//注册事件模块
appMaster.getDispatcher().getEventHandler().handle(new TestJobEvent(jobID, TestJobEventType.JOB_KILL));
appMaster.getDispatcher().getEventHandler().handle(new TestJobEvent(jobID, TestJobEventType.JOB_INIT));
appMaster.serviceStart();
}
2.3 优势
在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理器处理一种类型的事件,同时根据需要触发另外一种事件,这种编程方式具有异步、并发等特点,更加高效,因此更适合大型分布式系统。
参考
《Hadoop技术内幕:深入解析YARN架构设计与实现原理》