本文是 yarn 学习笔记,主要参考 《Hadoop技术内幕:深入解析YARN架构设计与实现原理》,对比 yarn 和 kubernetes 的实现差异。
架构
Yarn 两个重要的组件 RM 和 NM:
- ResourceManager(RM): 中央控制资源在 应用中的分配, ResourceManager 有两个重要的组件:
- Scheduler: 根据容量、队列限制条件将系统资源分配给各个应用,可插拔,用户可以自己定制,也可以选择Fair或Capacity调度器.
- ApplicationsManager: ResourceManager <--> NodeManager,负责应用提交;与调度器协商资源以启动 per-application ApplicationsMaster(AM);监控AM运行状态并在失败时重启它.
注意 AM 一般指的是 ApplicationMaster 不是 ApplicationManager
- NodeManager(NM): 节点 agent, 管理节点 container,上报节点资源状态
ApplicationMaster(AM),用户提交的每个应用程序都需要包含一个AM, 作用为:
- 与RM调度器协商以获取资源(以container为资源单位)
- 将得到的任务进一步分配给内部的任务
- 与 NM 通信以启动/停止任务
- 监控所有任务运行状态,并在失败时重新为任务申请资源以重启任务
- Yarn已经实现了三个 AM:
- DistributedShell:分布式的运行shell命令的一个示例
- UnmanagedAM:AM 不在集群内的情况示例
- MRAppMaster:MapReduce 应用的AM
基础库
- Protocol Buffers
- Apache Avro:Avro 是 Hadoop 生态系统中的 RPC 框架,具有平台无关、支持动态 模式(无需编译)等优点
- RPC 库:其中采用的默认序列化方法为 Protocol Buffers。
- 服务库和事件库:YARN 将所有的对象服务化,以便统一管理(比创建、销毁等), 而服务之间则采用事件机制进行通信
- 服务库:对于生命周期较长的对象,YARN 采用了基于服务 Service 的对象管理模型对其进行管理,每个被服务化的对象分为 4 个状态:NOTINITED(被创建)、INITED(已初始化)、 STARTED(已启动)、STOPPED(已停止),可通过组合的方式对任意服务进行组合,以便进行统一管理
- 事件库:YARN 采用了基于事件驱动的并发模型
- 处理请求会作为事件进入系统,由中央异步调度器(Async- Dispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给 另外一个事件调度器,也可能交给一个
带有有限状态机的事件处理器
,其处理结果也以事 件的形式输出给中央异步调度器 - 在 YARN 中,所有核心服务实际上都是一个中央异步调度器,包括 ResourceManager、 NodeManager、MRAppMaster(MapReduce 应用程序的 ApplicationMaster)等,它们维护 了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。
- 当使用 YARN 事件库时,通常先要定义一个中央异步调度器 AsyncDispatcher,负责事件的处理与转发,然后根据实际业务需求定义一系列事件 Event 与事件处理器 EventHandler,并注册到中央异步调度器中以实现事件统一管理和调度。以 MRAppMaster 为例,它内部包含一个中央异步调度器 AsyncDispatcher,并注册了 TaskAttemptEvent/ TaskAttemptImpl、TaskEvent/TaskImpl、JobEvent/JobImpl 等一系列事件 / 事件处理器,由 中央异步调度器统一管理和调度。
- 处理请求会作为事件进入系统,由中央异步调度器(Async- Dispatcher)负责传递给相应事件调度器(Event Handler)。该事件调度器可能将该事件转发给 另外一个事件调度器,也可能交给一个
- 状态机库 :状态机是一种表示有限个状态以及在这些状态之间的转移和动作等行为 的数学模型。在 YARN 中,很多对象都是由若干状态组成的,且当有事件发生时, 状态之间会发生转移,比如作业、任务、Container 等,而 YARN 正是采用有限状 态机描述一些对象的状态以及状态之间的转移。Go 语言类似的库
通信协议
代码位置
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api
graph LR
JobClient-->|ApplicationClientProtocol|RM
Admin-->|ResourceManagerAdministrationProtocol|RM
AM-->|ApplicationMasterProtocol|RM
AM-->|ContainerManagementProtocol|NM
NM-->|ResourceTracker|RM
FROM -> TO | 主要作用 | 重要接口 |
---|---|---|
JobClient -> RM (ApplicationClientProtocol) | JobClient 通过该RPC提交应用程序、查询应用程序状态等 |
|
Admin -> RM (ResourceManagerAdministrationProtocol) | Admin 通过该RPC更新系统配置文件,比如节点黑白名单、用户队列权限等 |
|
AM -> RM (ApplicationMasterProtocol) | AM 通过该 RPC 向RM注册和注销自己,并为各个任务申请资源 |
|
AM -> NM (ContainerManagementProtocol) | AM 通过该 RPC 要求 NM 启动或停 止Container,获取各个 container 的状态等信息 |
|
NM -> RM (ResourceTracker) | NM 通过该 RPC 协议向 RM 注册,并定时发送心跳汇报当前节点的资源使用情况和 Container 运行情况 |
|
基本运行流程
代码语言:txt复制sequenceDiagram
Client->>RM: 1. 提交应用,其中包括AM、启动AM的命令、用户程序等
RM->>AM: 2. 分配第一个Container,与对应的NM通信,要求它在这个Container中启动应用AM
AM->>RM: 3. 向RM注册,用户可以通过RM查看应用状态。AM为各个任务申请资源,控运行状态到运行结束
loop
AM->>RM: 4. 采用轮询的方式通过RPC协议向RM申请和领取资源
AM->>NM: 5. 获得资源后,便与对应的NM通信,要求它启动任务
NM->>Container: 6. NM为任务设置好运行环境(包括环境变量/JAR包/二进制程序等)后,将任务启动命令写到一个脚本中,并通过其启动任务
Container->>AM: 7. 通过RPC协议向AM汇报自己的状态/进度,以让AM掌握状态,从而可以在任务失败时重启任务
end
AM->>RM: 8.申请注销并关闭自己
思考: AM 可不可以省略,集成到 RM 成为一个线程 (插件),让整个架构变得更简单清晰?
RM
概述见上,RM 中的 Service 分为 "Always On" services 和 "Active" services,表示 HA 模式 Leader 的功能;
从多个模块角度看:
- 交互模块:RM对普通用户、管理员、Web提供了三种对外服务:
- ClientRMService: 实现
ApplicationClientProtocol
, 提供一个 rpc server (多种实现 RpcEngine,主要:ProtobufRpcEngine2) 为普通用户提供服务,它处理来自客户端的各种 RPC,比如:终止/提交应用/获取应用状态等,内部会调用如RMContext
(最重要,内部有 Dispatcher/HAServiceState/RMStateStore 等等);RMAppManager
;RMContext.Dispatcher
(异步请求通过事件);ResourceScheduler
;YarnScheduler
完成请求- RMContext 的重要属性, serviceContext/activeServiceContext:
- rmDispatcher
- containerAllocationExpirer
- amLivelinessMonitor/amFinishingMonitor
- delegationTokenRenewer/appTokenSecretManager/containerTokenSecretManager/nmTokenSecretManager/clientToAMTokenSecretManager
- resourceScheduler
- 大量 state,包括 node, application 等,为了重启时恢复这些状态,有一个 pluggable state-store 可以持久化状态
- RMContext 的重要属性, serviceContext/activeServiceContext:
- AdminService: 为管理员提供的独立接口,主要目的是为了防止大量普通用户请求阻塞管理员通道,提供如下功能:动态更新节点列表/更新ACL列表/更新队列信息
- WebApp:提供一个Web界面来让用户更友好的获知集群和应用的状态
- ClientRMService: 实现
- NM 管理模块:用来管理NM的模块,主要包含以下三个组件:
- ResourceTrackerService,实现上面的
ResourceTracker
协议: 处理来自NodeManager的请求,主要包括:- 注册:注册是NM启动时发生的行为,NM提供的信息包括:节点ID、可用资源上限信息等
- 心跳:心跳是周期行为; NM提供的信息包括:各个Container运行状态、运行的Application列表、节点健康状态等。RM返回的信息包括:等待释放的Container列表、Application列等
- NMLivelinessMonitor: 监控NM是否活着,如果NM在一定时间 (默认10m) 内未上报心跳,则认为它死掉,需要移除
- NodesListManager: 维护正常节点和异常节点列表,管理exclude(类似黑名单)和include(类似白名单)节点列表,这两个列表均是在配置文件中设置的,可以动态加载。
- ResourceTrackerService,实现上面的
- AM 管理模块:主要是用来管理所有AM,主要包括:
- ApplicationMasterService (AMS) 实现协议
ApplicationMasterProtocol
(注意 AM 和 RMApp 是一一对应的): 处理来自AM的请求,包括:- 注册:是AM启动时发生的行为,信息包括:AM的启动节点、对外RPC端口、tracking URL等
- 心跳 (allocate):是周期行为。AM提供的信息包括:所需资源的描述、待释放Container列表、黑名单列表等。AMS返回的信息包括:新分配的Container、失败的Container、待抢占的Container列表等
- 默认这个请求会由 DefaultAMSProcessor 来处理 --> YarnScheduler.allocate --> 查询 SchedulerApplicationAttempt 的状态
- AMLivelinessMonitor:监控AM是否活着,如果AM在一定时间(默认10m)内未上报心路,则认为它死掉,它上面正在运行的Container将会被置为失败状态,而AM本身会被分配到另一个节点上(用户可以指定重试次数,默认5)
- ApplicationMasterLauncher:与某个NM通信,要求它为某个应用程序启动AM
- ApllicationMaster 的启动流程如下
- ApplicationMasterService (AMS) 实现协议
sequenceDiagram
ApllicationMasterLauncher->>NodeManager: 1. StartContainer启动app的ApplicationMaster
ApllicationMasterLauncher->>AMLivelinessMonitor: 2.通过事件将 AM 注册到AMLivelinessMonitor, 启动心跳监控
ApplicationMaster->>ApplicationMasterService: 3. 注册自己
loop
ApplicationMaster->>ApplicationMasterService: 4. allocate 心跳
ApplicationMasterService->>AMLivelinessMonitor: 5. update exprire time
end
ApplicationMaster->>ApplicationMasterService: 6. 注销自己
ApplicationMasterService->>AMLivelinessMonitor: 7. remove AM 心跳监控
- 应用管理模块:主要是各个应用外围的管理,并不涉及到应用内部
- ApplicationACLsManager:管理应用程序访问权限,包含两部分:
- 查看权限:主要用于查看应用程序基本信息
- 修改权限:主要用于修改应用程序优先级、杀死应用程序等
- RMAppManager: 管理应用程序的启动和关闭,submitApplication 会创建一个 RMAppImpl 表示一个 application
- ContainerAllocationExpirer: 当AM收到RM新分配的Container后,必须在一定时间(默认10m)内在对应的NM上启动该Container,否则RM将强制回收该Container,而一个已经分配的Container是否该被回收则是由 ContainerAllocationExpirer 决定和执行的 (由 amLivelinessMonitor 触发)
- ApplicationACLsManager:管理应用程序访问权限,包含两部分:
- 状态机管理模块:RM 使用有限状态机维护有状态对象的生命周期 (状态机的设计很重要,unicorn 也是类似设计),状态流转由各种事件驱动,状态机的引入使得 Yarn 的架构设计清晰,RM内部的状态机有:
- RMApp: 维护一个应用程序的整个运行周期,包括从启动到运行结束的整个过程。由于一个APP的生命周期可能会启动多个运行实例(Attempt),RMApp维护的是所有的这些Attempt
- RMAppAttempt: 一次应用程序的运行实例的整个生命周期,可以理解为APP的一次尝试运行
- RMContainer: 一个Container的运行周期,包括从创建到运行结束的整个过程。
- RM将资源封装成Container发送给应用程序的AM,AM在Container描述的运行环境中启动任务
- Yarn不支持Container重用,一个Container用完后会立刻释放
- RMNode: 维护了一个NM的生命周期,包括从启动到运行结束的整个过程
- RMApp >> RMAppAttempt >> RMContainer
- 安全模块:RM自带了非常全面的权限管理机制,主要包括:
- ClientToAMSecretManager
- ContainerTokenSecretManager
- ApplicationTokenSecretManager
- 调度模块:主要包含一个组件 ResourceScheduler, 资源调度器
- 它按照一定的约束条件(比如队列容量限制等)将集群中的资源分配给各个应用程序,目前主要考虑内存和CPU
- ResourceScheduler <--- AbstractYarnScheduler <--- FairScheduler/FIFOScheduler 等 是一个可插拔式的模块,自带三个调度器,用户可以自己定制。
- FIFO:先进先出,单用户
- Fair Scheduler:公平调度器(FairScheduler基本上具备其它两种的所有功能)
- Capacity Scheduler:容量调度器
事件
序号 | 组件名称 | 服务/事件处理器 | 处理的事件类型 | 输出事件类型 |
---|---|---|---|---|
1 | ClientRMService | 服务 | – | RMAppAttemptEvent/RMAppEvent/RMNodeEvent |
2 | NMLivelinessMonitor | 服务 | – | RMNodeEvent |
3 | ResourceTrackerService | 服务 | – | RMNodeEvent/RMAppAttemptEvent |
4 | AMLivelinessMonitor | 服务 | – | RMAppAttemptEvent |
5 | ContainerAllocationExpirer | 服务 | – | SchedulerEvent |
6 | ApplicationMasterLauncher | 事件处理器 | AMLauncherEvent | – |
7 | RMAppManager | 事件处理器 | RMAppManagerEvent | RMAppEvent |
8 | NodesListManager | 事件处理器 | NodesListManagerEvent | RMNodeEvent/RMAppEvent |
9 | RMApp(ApplicationEventDispatcher)(RMAppImpl) | 事件处理器 | RMAppEvent | RMAppAttemptEvent/RMNodeEvent/SchedulerEvent/RMAppManagerEvent |
10 | RMAppAttempt(ApplicationAttemptEventDispatcher)(RMAppAttemptImpl) | 事件处理器 | RMAppAttemptEvent | SchedulerEvent/RMAppAttemptEvent/RMAppEvent/AMLauncherEvent/RMNodeEvent |
11 | RMNode(NodeEventDispatcher)(RMNodeImpl) | 事件处理器 | RMNodeEvent | RMAppEvent/SchedulerEvent/NodesListManagerEvent/RMNodeEvent |
12 | ResourceScheduler(EventDispatcher)(FairScheduler) | 事件处理器 | SchedulerEvent | RMAppEvent/RMAppAttemptEvent |
13 | RMContainer (RMContainerImpl) | 事件处理器(非异步) | RMContainerEvent | RMAppEvent/RMAppAttemptEvent/RMNodeEvent |
Client 和 AM 设计
以 MapReduce 任务为例,实现了自己 的 Client(JobClient) 和 MRAppMaster(AM)
代码语言:txt复制sequenceDiagram
Client->>ResourceManager: ApplicationClientProtocol:forceKill/getAllApplication/getClusterNode/Metrcis
Client->>MRAppMaster: MRClientProtocol:getJob/TaskReport/kill/job/task/taskAttempt
AM 编写, 分成 AM-RM 和 AM-NM 两部分:
- AM-RM 完成 registerApplicationMaster --> 定期 allocate --> finishApplicationMaster
- NM-AM 完成 startContainer --> getContainerStatus --> stopContainer
例子:
- DistributedShell
调度
- 调度流程
sequenceDiagram
NodeManager->>ResourceManager: 心跳汇报节点信息
ResourceManager->>NodeManager: 心跳返回需释放的 Container等信息
ResourceManager->>ResourceScheduler: NodeUpdate事件.Sheduler分配资源存在内存中
ApplicationMaster->>ResourceScheduler: 心跳领取新分配的 Container
ApplicationMaster->>NodeManager: 分配Container到内部task并启动
- 主资源公平调度算法(DRF, Dominant Resource Fairness) 强调是是 queue 之间的公平性:
- 主资源: 用户申请的各个维度的资源占其维度上的资源总量的百分比, 其中最大都那种资源为 主(要)资源
- share 值:用户分得的主资源累积值占其维度资源总量的百分比
- 资源分配过程:每次进行资源分配时,先比较一下各个用户当前占据的share,找到share值最小的,分配一个资源单位。
- 抢占是用户在需要资源时将闲置时出让给其他用户的资源回收, 抢占会先发送给 AM 处理,如果没有处理才会心跳给 NM 强制 KILL
- 调度逻辑和 kubernetes 的对比
# 简化后的 kubernetes 调度逻辑, 为一个优先的 pod 选择最优的 node
# 目前 pod 的排序逻辑只能一个,而 node 的排序则比较丰富
while pod = podQueue.Pop(): # podQueue 的排序是一个扩展点
for node in sort(filter(nodeList): # filter 有多个扩展点;sort 基于 score 也有多个扩展点
bind(pod, node)
# 简化的 yarn ResourceScheduler 调度逻辑, 为一个优先(比如资源由多到少)的 node 选择优先的 queue -> app -> container
# node 的排序逻辑单一,而 queue/app/container 的排序较为细致
for node in sorted(nodeList):
while canStillAssin:
# 这里其实 queue 有层级关系,会从 root 到 leaf 执行, 一直到 App
app = sortedQueues.pop()
# 即 app 里面的 选择一个 container, 会考虑优先级,本地性 等
app.assignContainer(node)
FSAppAttempt.assignContainer(node)
NM
- 主要职能
- Node 相关: ResouceTrackerProtocol -> ResourceManager
- Container 相关: ContainerManagementProtocol <- ApplicationMaster/ResourceManager
- NM 也是通过事件和各个组件交互,内存主要有两个中央异步 Dispatcher, 分别在 NodeManager 和 ContainerManagerImp 中
- NM 维护了三类状态机: Application (和 RM 里面 RMApp 状态机不同), Container, LocalizedResource
对比 NM 和 kubelet
首先对比 NM 的协议 和 CRI
代码语言:txt复制# ContainerManagementProtocol 只看 Stable的
- startContainers
- stopContainers
- getContainerStatuses
# 下面都是 unstable 的
- updateContainer
- signalToContainer
- localize
- reInitializeContainer
- restartContainer
- rollbackLastReInitialization
- commitLastReInitialization
- getLocalizationStatuses
# CRI
- Run/Stop/Remove/ListPodSandbox
- PodSandboxStatus
- Create/Start/Stop/Remove/ListContainer
- ContainerStatus
- UpdateContainerResources
- ExecSync/Exec/Attach/PortForward
参考
- Yarn 文档
- Yarn源码分析-亚坤
- Hadoop YARN:调度性能优化实践
- Hadoop技术内幕:深入解析YARN架构设计与实现原理