最近几天做Hadoop机架感知功能时,在网上可以找到很多关于HDFS机架感知的资料,但是对于YARN机架感知的介绍却很少。这篇文章最主要就是说明机架感知功能对于YARN来说起到的作用,若有理解的偏差请指正。谢谢。
*注:代码基于3.1.1*
1 前言
1.1 Hadoop机架感知功能对于HDFS
HDFS的默认副本数是3个,在未启用Hadoop机架感知功能时,数据的备份是随机的,有可能同一个DataNode节点有多个副本,也有可能所有的3个副本都在同一个机架,这样如果一个节点宕机或整个机架出现问题,数据就会丢失。
在开启Hadoop机架感知功能后,本地会存储一份, 同机架的某个节点存储一份,不同机架的某个节点存储一份。这样保证了如果本地数据损坏,首先会在同机架的节点上获取数据;如果整个机架出现问题,也可以保证从其他机架上获取数据。
Hadoop的机架感知功能使得HDFS在读取数据时降低了整体的带宽消耗和读取延时,也保证了数据不会轻易丢失。
1.2 Hadoop机架感知功能对于YARN
Hadoop机架感知功能对于YARN最直观的表现,可以通过ResourceManager的管理界面(ResourceManagerIP:8088/cluster/nodes)查看各个NodeManager所属机架,未启用Hadoop机架感知功能时,默认的机架就为/default-rack。这个可以结合NodeManager的健康检查功能,很快的定位出现问题节点所在的物理位置。
并且Hadoop机架感知还会影响YARN中Container启动时所在节点。Container首先会选择数据所在节点启动,如果该节点资源不足,则会在与该节点同机架的节点启动。如果该机架的节点资源都不足,则在其他节点启动。
2 启用Hadoop机架感知功能
2.1 相关配置
配置项 | 默认值 | 配置说明 |
---|---|---|
net.topology.node.switch.mapping.impl | org.apache.hadoop.net.ScriptBasedMapping | DNSToSwitchMapping的实现类。当配置为"org.apache.hadoop.net.ScriptBasedMapping"时,它调用“net.topology.script.file.name”中指定的脚本来解析节点所属机架。如果“net.topology.script.file.name”的值为空,所有节点的机架信息都为"/default-rack" |
net.topology.script.file.name | 解析机架拓扑关系的脚本名。例如:如果192.168.0.1所属机架为/rack1,则将“192.168.0.1”作为参数传入脚本,则会返回/rack1为输出。 |
2.2 功能的启用
"net.topology.node.switch.mapping.impl"配置项是拓扑解析的实现类,用户可以自己编写Java程序来解析机架信息。我们这里使用它的默认值“org.apache.hadoop.net.ScriptBasedMapping”来实现Hadoop的机架感知功能。
2.2.1 脚本的编写
一个脚本的实例如下所示,里面包含了每个IP所对应的rack信息,可以将这个脚本放在/home目录下,脚本名字为 topology-rack.sh。
代码语言:shell复制#!/usr/bin/python
#-*-coding:utf-8 -*-
import sys
rack = {
"192.168.0.1":"/room1-rack1",
"192.168.0.2":"/room1-rack1",
"192.168.0.3":"/room1-rack2",
"192.168.0.4":"/room1-rack2",
"192.168.0.5":"/room2-rack1",
"192.168.0.6":"/room2-rack1",
"192.168.0.7":"/room2-rack2"
}
if __name__=="__main__":
print "/" rack.get(sys.argv[1],"/default-rack")
将配置项“net.topology.script.file.name”配置如下:
代码语言:txt复制 <property>
<name>net.topology.script.file.name</name>
<value>/home/topology-rack.sh</value>
</property>
重启HDFS和YARN之后,就可以开启Hadoop机架感知功能。
**注:如果DataNode或NodeManager扩容,要及时修改脚本文件当中的拓扑关系,否则会默认扩容节点为/default-rack机架**
3 YARN机架感知的源码分析
3.1 初始化过程
YARN中,主要在RackResolver
类中处理机架信息。下面介绍该类的主要功能。
首先看其的初始化方法,RackResolver.init()
:
- 获取net.topology.node.switch.mapping.impl配置,初始化机架感知处理类。
public synchronized static void init(Configuration conf) {
....
Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
conf.getClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class,
DNSToSwitchMapping.class);
try {
DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
dnsToSwitchMappingClass, conf);
dnsToSwitchMapping =
((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
: new CachedDNSToSwitchMapping(newInstance));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
初始化机架感知处理类时,会通过ScriptBasedMapping
的构造函数,初始化一些参数:
- 初始化解析脚本的存放路径;
- 初始化脚本入参最大个数。
public void setConf (Configuration conf) {
super.setConf(conf);
if (conf != null) {
scriptName = conf.get(SCRIPT_FILENAME_KEY);
maxArgs = conf.getInt(SCRIPT_ARG_COUNT_KEY, DEFAULT_ARG_COUNT);
} else {
scriptName = null;
maxArgs = 0;
}
}
3.2 获取主机与机架的相应关系
3.2.1 建立主机名与机架的对应关系
YARN会通过RackResolver.coreResolve()
方法来获取主机名和机架信息的对应关系,具体如下:
- 通过
dnsToSwitchMapping.resolve(tmpList)
获取主机与机架的信息,其具体解析过程3.2.2介绍; - 默认机架信息为
DEFAULT_RACK
,如果主机解析机架信息时为空,则其机架信息为默认值; - 创建并返回一个主机名与机架信息对应的
Node
。
private static Node coreResolve(String hostName) {
List <String> tmpList = Collections.singletonList(hostName);
List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
String rName = NetworkTopology.DEFAULT_RACK;
if (rNameList == null || rNameList.get(0) == null) {
LOG.debug("Could not resolve {}. Falling back to {}", hostName,
NetworkTopology.DEFAULT_RACK);
} else {
rName = rNameList.get(0);
LOG.debug("Resolved {} to {}", hostName, rName);
}
return new NodeBase(hostName, rName);
}
3.2.2 从缓存中获取主机名对应机架信息
接下来对3.2.1中所提到的dnsToSwitchMapping.resolv(List<String> names)
进行说明。dnsToSwitchMapping
是CachedDNSToSwitchMapping
的一个实例,会调用CachedDNSToSwitchMapping.resolve()
方法,其说明如下:
- 将主机名转换为主机IP;
- 通过
getUncachedHosts(names)
获取没存放在缓存中的主机IP; - 调用
rawMapping.resolve(uncachedHosts)
来解析未在缓存中主机的机架信息,具体解析过程3.2.3中介绍; - 将
uncachedHosts
和resolvedHosts
信息对应的存放到缓存中; - 再从缓存中读取主机IP对应的机架信息,并返回。
***注:由第2条可知,在NodeManager扩容时,将其信息更新到脚本就可以。但是如果修改一个已存在的NodeManager的机架信息,需要重启YARN服务生效(更新缓存)。***
代码语言:java复制 public List<String> resolve(List<String> names) {
names = NetUtils.normalizeHostNames(names);
...
List<String> uncachedHosts = getUncachedHosts(names);
// Resolve the uncached hosts
List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
//cache them
cacheResolvedHosts(uncachedHosts, resolvedHosts);
//now look up the entire list in the cache
return getCachedHosts(names);
}
3.2.3 从脚本中解析主机对应的机架信息
在rawMapping.resolve(uncachedHosts)
中,实际调用的是ScriptBasedMapping#RawScriptBasedMapping.resolve()
方法,其说明如下:
- 判断脚本信息是否为空(详见3.1);
- 通过脚本获取主机IP对应的机架信息(
runResolveCommand()
方法就是判断下传参是否符合要求,并且执行脚本,并将脚本执行结果返回),经过处理后返回。
public List<String> resolve(List<String> names) {
List<String> m = new ArrayList<String>(names.size());
if (scriptName == null) {
for (String name : names) {
m.add(NetworkTopology.DEFAULT_RACK);
}
return m;
}
...
String output = runResolveCommand(names, scriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
m.add(switchInfo);
}
...
return m;
}
3.3 主机与机架信息的调用
在ResourceTrackerService.resolve()
、AMRMClientImpl.resolveRacks()
、RMContainerAllocator.assignMapsWithLocality()
、TaskAttemptImpl
的构造函数和TaskAttemptImpl.computeRackAndLocality()
中都调用了RackResolver.resolve()
方法来获取主机与机架的信息。
- 在
ResourceTrackerService
中,用其信息用在NodeManager
向ResourceManager
注册的过程中; - 在
AMRMClientImpl
中,其信息用在添加containerRequest
当中; - 在
RMContainerAllocator
和TaskAttemptImpl
中,其信息与资源本地性相关;
4 总结
HADOOP的机架感知功能,除了对HDFS的数据的副本有关之外,对YARN的NodeManager注册及Container的资源分发也有着密切的关系。