一、历史背景
- 时代背景 信息技术产业迅猛发展->企业、组织和个人产生的数据量俱增->我们生活在了一个数据增长比以往任何时候都要快的时代里->大数据时代
- 技术背景 网格计算、并行计算、分布式计算、资源虚拟化、网络存储等传统计算机技术和网络技术的联合演进以及相互融合
- 主要矛盾 海量数据所蕴含的4V特征:体量大(volume),模式多(variety),速度快(velocity),价值密度低(value),这使得传统数据存储和数据处理逐渐显得乏力,提升一个层次,也就是说大数据管理和信息提取的困难度和复杂性
- 云计算技术应运而生 云计算这个概念的直接起源来自Dell的数据中心解决方案、Google和IBM的分布式计算项目,但云计算的思想不是一蹴而就
- 核心思想 云计算采取集群计算,将若干独立计算实体通过网络整合成一个具有强大计算能力的资源池,并借助基础设施即服务(IaaS)、平台即服务(PaaS)、软件即服务(SaaS)和管理服务提供商(MSP)等先进的商业模式,采取移动计算,把计算资源池中强大的计算能力按需分配到用户手中。
- Hadoop历史 2003年以来,Google以学术论文的形式陆续公开了其GFS、MapReduce、BigTable等分布式海量数据处理框架,同时证明了该框架的高可扩展、高性能等优越性。基于这些技术,Doug Cutting将其运用到了全网搜索引擎项目Nutch中。2006年年初,开发人员将这个开源实现移出了Nutch,成为Lucene的一个子项目,称为Hadoop。同年2月,Apache Hadoop项目正式启动,以支持MapReduce和HDFS的独立发展。其中,HDFS(Hadoop Distributed File System)是Hadoop的分布式文件系统,也是GFS的开源实现,它主要提供在集群中的流式高吞吐量数据访问。MapReduce则是Hadoop的分布式数据处理框架,也是Google MapReduce计算模型的开源实现,它支持大规模数据集的并行运算。
- 应用
应用在海量数据的存储、计算、挖掘、分析、查询以及机器学习等。
二、讨论点
本文从Hadoop(1.0)系统中调度策略的角度展开讨论。这本质还是对Hadoop的集群资源进行管理,主要有四个方面:
- Hadoop作业调度算法和框架
- MapReduce任务调度策略
- Hadoop备份任务推测执行机制
- Hadoop资源表示模型
其中前三个方面本质是Hadoop的资源分配模型;最后一个方面是Hadoop的资源表示模型。
- Hadoop作业调度算法和框架
当没有被指定特定调度器时,Hadoop系统在启动时会加载一个默认的缺省调度器,即先进先出调度器(FIFO Scheduler)。FIFO Scheduler是按照客户作业(Job)的提交顺序获得调度机会以便执行。优点是算法思想简明且系统资源开销小,但FIFO Scheduler对用户提交作业不作区别,且FIFO策略有利于长作业,而不利于短作业,对于一些实时性要求较高的交互型作业往往效果很差。在出现长作业时,系统的平均响应时间过长,整体吞吐率下降。
在Apache Hadoop官方版本中还提供了Yahoo公司的计算能力调度器(Capacity Scheduler)和FaceBook公司的公平调度器(Fair Scheduler)。它们均是为多用户共享Hadoop集群而设计的调度框架,并以提高集群资源利用率和降低成本为目标。
Capacity Scheduler的设计思想主要是以队列为单位对系统资源进行划分和管理,再将资源按比例分配给各个队列,同时设置队列占有资源的上下限防止个别队列独占资源。缺点是队列的设置和选组无法动态进行。
Fair Scheduler以资源池对系统资源进行划分,但更注重系统资源对用户的公平共享。除此之外,它还支持资源抢占,降低小作业的调度延迟。缺点是没有考虑当前系统各节点的负载水平和实际的负载状态,导致节点实际负载不均衡。
- MapReduce任务调度策略
Hadoop系统对Map任务的调度主要以数据本地性(data-locality)为考虑因素。就是将Map任务分配给某些节点,在这些节点上有Map任务即将要处理的输入数据分片(InputSplit),所以不需要通过远程拷贝的方式将存储在其他节点上的输入数据拉取到该Map任务节点上。
提高任务数据本地性对加快MapReduce任务执行速度,作业周转时间,避免冗余网络I/O,节省集群带宽资源等诸多方面有直接影响。
- Hadoop备份任务推测执行机制改进
在异构环境中MapReduce表现不足。在分布式集群环境中,个别任务可能因为负载不均衡,资源分配不合理或者节点性能差异过大等原因,使得它的运行速度比其他任务落后,那么这些任务会拖慢整个作业的执行进度。为避免这种情况,Apache在Hadoop系统中内置了备份任务推测执行机制(Speculative Execution),采用慢任务推测机制识别出“拖后腿任务”,并为这些“慢任务”启动“备份任务”,将它们分配给空闲或者计算能力强的节点,最终选择执行快的备份任务作为最终结果。
三、Hadoop核心技术
- Hadoop生态
- HDFS
- Client:用户进程。通过与DataNode和NameNode交互完成用户提出的HDFS文件操作。它提供了一些主要的文件系统调用给用户。
- NameNode:主节点进程,一个Hadoop系统只存在一个NameNode,扮演“总管家”的角色,负责管理HDFS中的文件元数据信息(Metadata)和文件目录树。这两种信息以元数据镜像(fsimage)形式存放在本地磁盘,而用户对文件的改动以文件日志(editlog)形式存放在本地磁盘。和它还负责监控集群中所有DataNode的状态和坏DataNode节点数据的重新备份任务。
- SecondaryNameNode:相当于NameNode的“辅助”进程,定期将NameNode上的元数据镜像和文件改动日志下载,然后做合并,再返回给NameNode。
- DataNode:从节点进程,负责实际的数据存储和定期向NameNode主动汇报状态。DataNode以固定大小的数据块(block)为基本单位将用户数据切分,分别存储到不同的DataNode,默认为64MB。为了保证数据可靠性,会将同一个block块以流水线方式写到若干个(默认3个)不同的DataNode上。
HDFS为存取大文件设计,拥有以下几点基本特性:
- 对用户简单的数据备份机制:用户在HDFS上存储数据只需要一次写,而HDFS备份机制会自动完成链式数据块冗余备份。
- 认为移动计算比移动数据更经济:在单机系统中可以通过本地的磁盘I/O或内存访问就获取到计算任务的输入数据。而在分布式系统中,任务和数据两者可能分布在不同节点上,所以要通过网络I/O将数据交给任务。在Hadoop系统中认为移动计算任务到离输入数据更近的位置比直接移动输入数据到任务端更加经济,但由于在某些特殊情况下不得不采取移动数据,这使得系统有了进一步优化的空间,这也是本文要做的工作。
- 高容错性:HDFS可以在廉价硬件上较快得进行错误恢复,消除失败节点的影响,这得益于HDFS数据备份机制。
- 流式数据访问:HDFS是为存取海量数据设计,所以高吞吐量才是其设计目标,而不是低延迟数据访问。
- 支持大数据集:单一Hadoop系统,HDFS最大支持TB级的数据存储和访问。
- MapReduce
MapReduce是源自Google的一个编程模型,它以高并行和可扩展的方式处理大数据集。MapReduce的灵感来源于函数式编程,用户可将他们的计算表达为map和reduce函数,将数据作为键值对来处理,核心理念是移动计算,而不是移动数据。
Hadoop还提供了软件基础架构,以一系列map和reduce任务的形式运行MapReduce 作业。Map任务在输入数据的子集上调用map函数。在完成这些调用后,reduce 任务开始在map函数所生成的中间数据上调用reduce任务,生成最终的输出。
最重要的是,Hadoop基础架构负责处理分布式处理的所有复杂方面:并行化计算、任务调度、资源管理、机器间通信、软硬件故障处理等等。得益于这种干净的抽象,实现分布式应用程序从未像现在这么容易过。
MapReduce核心组件JobClient、JobTracker和TaskTracker,主要功能作用如下:
- JobClient:将用户的MapReduce程序提交到JobTracker。Hadoop中使用“用户作业”来指代用户编写的MapReduce程序。一个完整用户程序对应若干作业(Job),而一个作业被划分为若干任务,包含Map任务和Reduce任务。
- JobTracker:主节点进程,负责集群资源监控以及作业调度。它不仅监控所有TaskTracker与作业的运行状况,还会跟踪任务的执行进度、资源使用量等信息提供给作业调度器。
- TaskTracker:从节点进程,周期性通过发送心跳向JobTracker汇报节点上资源使用、任务执行进度等情况。同时接收并执行JobTracker返回的命令。Hadoop实用资源槽(Slot)等量划分节点资源(CPU、内存等资源),主要包括Map Slot和Reduce Slot。
- Task:主要有MapTask和ReduceTask两种。均由JobTracker分配给有剩余相应资源槽的TaskTraker上执行。而MapTask的输入数据被称作一个输入数据分片(InputSplit),它由一个block数据块划分,但一般InputSplit与block的对应关系是1比1,而一个MapTask只处理一个InputSplit。
- 作业执行过程
- 作业提交 (submit job)。 由JobClient.runJob()函数创建JobClient实例,再利用此实例的submitJob()提交用户作业,过程包括申请新JobID、计算输入分片和检查输出目录。同时将作业运行所需的资源,例如程序jar包、作业配置文件、输入分片元文件信息等上传到HDFS上一个以JobID命名的目录下。
- 添加新作业 (add new job)。 JobTracker将新作业添加到作业队列,创建一个JobInProgress实例全程跟踪作业运行状态,并等待调度器调度并初始化。
- 创建任务 (create task)。 Hadoop作业调度器从作业队列中选择一个作业进行初始化。JobInProgress为每个Task创建一个TaskInProgress实例以跟踪任务运行状态。按照输入分片信息,为每个InputSplit启动一个Map任务,而Reduce任务的数量由用户作业自行配置,每个任务和作业都有唯一ID号。
- 启动任务 (launch task)。 TaskTracker周期性地通过向JobTracker发送心跳信息,汇报自身的运行状态。当JobTracker发现TaskTracker有空闲资源槽时,会将相应任务分配给TaskTracker执行。
- 传送LaunchTaskAction指令 (transfer LaunchTaskAction)。 JobTracker会发送LaunchTaskAction命令,将任务分配给该TaskTracker执行。
- 添加新任务 (add new task)。 将Task添加到任务列表,由对应的TaskInProgress监控任务的执行过程。
- 创建新runner (new runner)。 创建一个TaskRunner实例来运行任务。
- 创建子进程 (create child)。 启动一个Java虚拟机来执行Map任务。
- 启动Map任务 (invoke map)。 将作业程序从HDFS拷贝到本地,启动Map任务对输入数据分片进行map函数处理,中间结果数据存储到本地。
- 启动合并器 (invoke combiner)。 将中间结果初步整合,减少数据的传输量。
- 创建子进程(create child)。 启动Java虚拟机,执行Reduce任务。
- 启动Reduce任务 (invoke reduce)。 经过shuffle阶段将Map任务输出的中间,结果拉取过来进行reduce函数处理得到最终结果。
- 输出最终结果 (output result)。
作业输出结果存放到HDFS上。
- 作业调度框架
JobTracker对作业的管理被抽象为三个层次:作业监控层、任务监控层和任务执行层,并以“三层多叉树”的方式建立各层之间的关系。
- 作业监控层:每个作业由一个JobInProgress实例描述和跟踪整个运行状态。
- 任务监控层:每个任务由一个TaskInProgress实例描述和跟踪整个运行状态。当所有TaskInProgress成功,那么上层对应的JobInProgress成功。
- 任务执行层:任务可能运行失败,可以尝试多次运行。每一次运行尝试的实例称为Task Attempt。当任何一次成功,上层对应的TaskInProgress成功。
Hadoop以队列为单位管理作业和资源,Hadoop调度器本质上均采用“三级调度模型”。首先选择一个作业队列(JobQueue),再从作业队列中选择某项作业(Job);接着从该项作业划分后的若干个子任务(Task)中进行选择;最后分配给某个有能力的TaskTracker去执行。其中前两个级别的选择策略是由不同调度器根据实际应用需求而设计实现的,在最后一个级别的任务选择策略中,Hadoop考虑的关键因素均为数据本地性。因此Hadoop将之封装成一个通用的模块供给调度器使用,具体存放在JobInProgress类中。
- MapReduce详细过程
在MapReduce中,一次作业的执行过程分为三个重要阶段:map、shuffle和reduce阶段,每个阶段都以键值对<key, value>作为输入/输出的数据形式。map和reduce两个阶段分别执行若干Map任务(MapTask)和Reduce任务(ReduceTask)。每个Map任务处理一个输入数据分片(InputSplit),并将产生的中间结果数据写到本地磁盘;而Reduce任务则从每个Map任务的本地磁盘上远程拷贝中间结果数据中相应的数据片段,再经过合并(merge)、排序(sort)、分组(group)和归约(reduce)后,将结果写到HDFS作为作业运行的最终结果。
下面分别就MapReduce的map、shuffle、reduce这三个重要阶段做简要介绍。
- map阶段:map阶段又可划分为读取(read)、函数处理(map)、分区(partition)、溢写(spill)、排序(sort)、分组(group)、预规约(combine)等阶段。
在map阶段,每个Map任务所要处理的数据称作一个输入数据分片(InputSplit),一个MapTask只处理一个InputSplit,它是由数据块(block)切分后的分片,但一般InputSplit与block的关系是1比1,也就是一个Map任务处理一个block的数据。
首先,每个Map任务从它的输入分片(InputSplit)中解析出一系列的键值对<key,value>,并交给用户作业程序中的map()函数处理,并得出一系列的结果键值对。
然后,由分区程序partition()对键值对进行分区以确定每个结果键值对应该交给哪个Reduce任务处理。默认的分区程序是按照map处理的结果键值对<key, value>中的key进行哈希,再以Reduce任务的总量取模,得出该键值对的分区号。
接着,以上产生的键值对和对应的分区号都会被写入一个环形逻辑结构的内存缓冲区(MapOutputBuffer)。当环形内存缓冲区(默认为100MB)达到溢写(spill)阀值时(默认80MB),便会启动溢写(spill)线程。溢写线程首先锁定这80MB数据并将其写到本地磁盘上,以一个临时文件(output/spillN.out, N表示溢写次数)的形式存放。在溢写过程中会进行排序(sort)、分组(group)、预规约(combine)的操作。其中本地排序是MapReduce默认的行为。溢写有一个重要细节是,如果map()函数处理的结果键值对中有很多个键值对的分区号一样,即需要交给同一个Reduce任务处理,那么需要将这些键值对进行拼接起来,这样只需要一个分区号就可以标记出这些键值对的分区。同时要进行分组合并,即value相同的键值对合并成<key, {value1, value2, value3, …}>,进一步减少了记录数据。
最后,如果用户定义了预规约(combine),那么就进行本地规约。最终,如果有很多次的溢写,就会产生多个临时文件,所以需要再对这些临时文件进行一次总的拼接、排序、分组、和预规约,生成Map任务的最终结果文件(output/file.out)。
- shuffle阶段:shuffle阶段实质是reduce阶段的第一步。
Reduce任务以HTTP方式从Map端将对应分区号的中间结果数据拷贝到本地磁盘或内存中,而Map端启动HTTP Server来满足这种并发请求。当一项作业(Job)中,成功完成的Map任务数达到总Map任务数的5%后,才开始shuffle。在shuffle阶段,Reduce任务端的TaskTracker会启动MapEventsFetcherThread线程从JobTracker上获取已经运行完成的Map任务列表,并保存到allMapEvents中。而ReduceTask会启动一个后台线程(GetMapEventsThread),周期性从所在的TaskTracker中获取该项作业中已完成的Map任务列表,并保存到映射表mapLocations中,该映射表保存了TaskTracker与已完成Map任务列表的映射关系。
- reduce阶段:reduce阶段可以分解为合并(merge)、排序(sort)、分组(group)、规约操作(reduce)、数据写(write)等子阶段。
Reduce任务在执行计算之前就是不断的远程拷贝当前Job里的每个Map任务的特定分区号的输出数据,同时对从不同Map任务端拷贝过来的数据做合并(merge),最后形成一个文件作为Reduce任务的输入数据。所以,对于一个Job而言,当所有的Map任务执行完毕,并且所有Map任务端的中间结果数据被拉取到各个相应的Reduce任务端,此时这些Reduce任务得到了它们各自全部的输入数据后才能真正开始进一步的计算操作。接着,对merge过后的键值对<key, value>按照key进行本地归并排序,通过排序,key相同的键值对聚集到一起形成若干形如<key, {value1, value2, value3, …}>的分组,然后每个键值对交给用户程序中编写的reduce()函数处理。最终将数据结果直接写到HDFS上作为该作业输出的一部分。
在Hadoop系统中调度方面主要涉及两个关键调度模块:Hadoop作业调度和MapReduce任务调度。作业调度是由不同调度器负责,它根据用户实际应用需求而设计实现。而在任务级别的选择策略中,Hadoop考虑的关键因素主要为数据本地性(data-locality),Hadoop将其封装成一个通用的模块供给调度器使用,具体存放在JobInProgress类中。
在Map任务调度中,现有的Hadoop系统对Map任务调度主要以数据本地性为考虑因素。就是将Map任务分配给一些节点,在这些节点上面存储着这个Map任务即将要处理的输入数据分片,所以就不需要通过远程拷贝的方式将存储在其他节点上的该Map任务的输入数据拉取到该任务节点上面再进行计算处理。由于Hadoop是针对大数据的处理平台,每个Map任务的输入数据称作一个输入数据分片,它与一个数据块的大小相同。所以提高任务数据本地性对加快Map任务的任务执行速度,Hadoop作业周转时间,避免冗余网络I/O,节省集群带宽资源等诸多方面有直接而且明显的影响,国内外众多学者在该方面的研究最为广泛和深入。通过对Hadoop源码研究,调度器直接调用JobInProgress中的obtainNewMapTask()函数来从某项作业中选择一个Map任务,此方法封装了所有调度器公用的Map任务选择策略模块。主要选择策略思想如下:
- 合法性检查。如果某节点执行该作业任务的失败次数达到阀值或该节点资源剩余不够执行该作业新任务,那么就不合法
- 从failedMaps列表中选择任务。优先选择运行失败次数最多的Map任务让其快速获得重新运行的机会,并且不再考虑数据本地性;
- 从nonRunningMapCache列表中选择任务。按照数据本地性原则选择尚未分配的MapTask;
- 从nonLocalMaps列表中选择。该列表中任务没有输入数据,所以无需考虑数据本地性。
- 从runningMapCache列表选择。查找是否存在拖延任务,尝试启动备份任务,这也是针对异构集群中节点间性能差异较大提出的一种负载均衡策略。
- 从nonLocalRunningMaps列表中查找无输入数据的拖延任务启动备份任务。
经过以上调度过程,最终为节点分配一个合适的MapTask。而在Reduce任务的调度中,Hadoop采取了非常简单的静态策略,Hadoop认为Reduce任务没有数据本地性。作业调度器同样通过调用JobInProgress实例封装的obtainNewReduceTask()函数,具体步骤如下,
- 合法性检查。磁盘空间及节点可靠性检查。
- 遍历nonRunningReduces列表,选择第一个满足条件的新ReduceTask。
- 如果未找到再从runningReduces列表中选择已分配过的Reduce Task,为慢ReduceTask启动备份任务,尝试再次执行。
主要参考:
- 董西成. Hadoop技术内幕: 深入解析MapReduce架构设计与实现原理
- 蔡斌;陈湘萍. Hadoop 技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理