第7章 MapReduce进阶
7.1 MapReduce过程
7.1.1 input
一般而言,数据文件都会上传到HDFS上,也就是说HDFS上的文件作为MapReduce的输入。已知block块大小是128M(Hadoop 2.x默认的blockSize是128MB,Hadoop 1.x默认的blockSize是64MB)。MapReduce计算框架首先会用InputFormat的子类FileInputFormat类对输入文件进行切分,形成输入分片(InputSplit)。每个InputSplit分片将作为一个Map任务的输入,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。也就是说,InputSplit只是对输入数据进行逻辑上切分,并不会将物理文件切分成片进行存储。
确定InputSplit的大小是个重要的问题。
(1)如果mapred-site.xml中没有设置分片的范围,InputSplit分片大小是由block块数决定的,即是InputSplit分片大小等于block块大小。比如把一个258MB的文件上传到HDFS上,默认block块大小是128MB,那么它就会被分成3个block块,与之对应产生3个InputSplit分片,所以最终会产生3个map任务。注意:第3个block块大小是128MB,但是该块中的文件只有只有2MB,而非128M一个块的大小。 (2)如果在mapred-site.xml中设置mapred.min.split.size和mapred.max.split.size,则可控制InputSplit的大小。那么InputSplit分片到底是多大呢?
代码语言:javascript复制minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}
其中,参数mapred.min.split.size的默认值为1个字节,minSplitSize随着File Format的不同而不同。mapred.max.split.size默认为Long.MAX_VALUE = 9223372036854775807。 InputSplit分片大小的下限是max {mapred.min.split.size, minSplitSize},上限是mapred.max.split.size。
FileInputFormat类提供了computeSplitSize方法计算InputSplit的大小,代码如下:
代码语言:javascript复制 public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
...
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
...
}
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
由上面的InputSplit分片大小计算公式可知,一个InputSplit分片可以大于一个block块,也可以小于一个block块。对于Map任务来说,处理的单位是一个InputSplit。请注意InputSplit是一个逻辑概念,InputSplit所包含的数据仍然存储在HDFS的block块中。 YARN在进行任务调度时,会优先考虑本节点的数据。如果本节点没有可处理的数据或者还需要其他节点的数据,Map任务就必须从其他节点将数据通过网络传递到本节点,性能受到影响。如果InputSplit的大小大于block块大小时,Map任务就必须从其他节点读取数据,这样就不能很好实现数据本地性。所以,InputSplit的大小尽量等于block块大小,以提高Map任务的数据本地性。
InputSplit的数量是文件大小除以splitSize。
7.1.2 Map输出
InputSplit将键值数据传给map方法处理后,输出中间结果到本地磁盘。在这个过程中,为了保证IO效率,采用了先写入内存的环形缓冲区,并做一次预排序。
- 首先map输出到内存中的一个环状的内存缓冲区,如下图中“(1)”部分所示,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改)。
- 然后,当写入内存缓冲区的大小到达一定比例时,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),如下图“(2)”部分所示。这个溢写线程是独立的,不影响map向缓冲区写结果的线程。当溢写线程启动后,需要对80M的空间内的数据按照key进行排序。溢写线程会在磁盘中新建一个溢出写文件(图中“(2)”部分已经有了3个溢写出文件),溢写线程默认根据数据键值对溢写出文件进行分区(patition),接着后台线程将根据数据最终要传送到的Reduce把内存缓冲区中的数据写入溢出写文件对应分区。在每个patition分区,后台线程按键值进行内排序,此时如果有一个Combiner,则会在排序后的输出上运行。默认的patition分区算法是将每个键值对的键的Hash值与reducer数量进行模运算得到patition值。
- 随着map处理,map输出数据增多,磁盘中溢写文件文件的数据也在增加。这就需要将磁盘中的多个小的溢写文件合并成一个大文件,如图中”(3)”部分所示。注意,合并后的大文件已经进行了分区,每个分区内进行了排序,该大文件就是Map任务的输出结果。
- 为了提高磁盘IO性能,可以将Map输出进行压缩,这样磁盘书写熟读提高,可以介绍磁盘空间,减少传递给Reducer的数据量。注意,默认情况下,map输出是不压缩的,可以在mapred-site.xml文件中配置mapreduce.output.fileoutputformat.commpress值为true,即可开启压缩功能。
总结:map过程的输出是写入本地磁盘而不是HDFS,但是一开始数据并不是直接写入磁盘而是缓冲在内存中,缓存的好处就是减少磁盘I/O的开销,提高合并和排序的速度。又因为默认的内存缓冲大小是100M(当然这个是可以配置的),所以在编写map函数的时候要尽量减少内存的使用,为shuffle过程预留更多的内存,因为该过程是最耗时的过程。
7.1.3 Reducer输入 (Shuffle阶段)
MapReduce确保每个reducer的输入input都是按照key 排序的。Shuffler 就是mapper和reducer中间的一个步骤,也就是将map的输出转换为 reduce的输入的过程。在shuffle过程中,可以把mapper的输出按照某种key值重新切分和组合成n份,把key值符合某种范围的输送到特定的reducer端处理。shuffle是MapReduce运行的的核心,也是面试中经常被问到的地方。shuffle本意洗牌、混洗,下图是官方对Shuffle过程的描述。
Map输出结果时进行了Partitioner分区操作。其实Partitioner分区操作和map阶段的输入分片(Input split)很像,一个Partitioner对应一个reduce作业,如果我们mapreduce操作只有一个reduce操作,那么Partitioner就只有一个,如果我们有多个reduce操作,那么Partitioner对应的就会有多个,Partitioner因此就是reduce的输入分片,主要是根据实际key和value的值,根据实际业务类型或者为了更好的reduce负载均衡要求进行,这是提高reduce效率的一个关键所在。
一个Map任务的输出,可能被多个Reduce任务抓取。每个Reduce任务可能需要多个Map任务的输出作为其特殊的输入文件,而每个Map任务的完成时间可能不同,当有一个Map任务完成时,Reduce任务就开始运行。Reduce任务根据分区号在多个Map输出中抓取(fetch)对应分区的数据,这个过程也就是Shuffle的copy过程。复制操作时reduce会开启几个复制线程,这些线程默认个数是5个。这个复制过程和map写入磁盘过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作。 如果map输出很小,则会被复制到Reducer所在节点的内存缓冲区,缓冲区的大小可以通过mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。 如果map输出较大,则直接被复制到Reducer所在节点的磁盘中。 随着Reducer所在节点的磁盘中溢写文件增多,后台线程会将它们合并为更大且有序的文件。 当完成复制map输出,进入sort阶段。这个阶段通过归并排序逐步将多个map输出小文件合并成大文件。最后几个通过归并合并成的大文件作为reduce的输出。
当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
在Hadoop集群环境中,大部分map 任务与reduce任务的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map任务结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的期望可以有:
- 完整地从map task端拉取数据到reduce 端。
- 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
- 减少磁盘IO对task执行的影响。
7.1.4 排序
排序贯穿Map任务和Reduce任务,在MapReduce计算框架中,主要用到两种排序算法:快速排序和归并排序。在Map任务发生了2次排序,Reduce任务发生一次排序。 (1)第1次排序发生在Map输出的内存环形缓冲区,使用快速排序。当缓冲区达到阀值时,在溢写到磁盘之前,后台线程会将缓冲区的数据划分成相应分区,在每个分区中按照键值进行内排序。 (2)第2次排序是在Map任务输出的磁盘空间上将多个溢写文件归并成一个已分区且有序的输出文件。由于溢写文件已经经过一次排序,所以合并溢写文件时只需一次归并排序即可使输出文件整体有序。 (3)第3次排序发生在Shuffle阶段,将多个复制过来的Map输出文件进行归并,同样经过一次归并排序即可得到有序文件。
排序的逻辑: 《Hadoop权威指南》第3版292页已经给出了“控制排序顺序”: Key的数据类型的排序逻辑是由RawComparator控制的,规则如下。 (1)若属性mapred.output.key.comparator.class已经被显式设置,或者通过Job类的setSortComparatorClass()方法进行设置,则使用该类的实例。 (2)否则,键必须是WritableComparable的子类,并使用针对该键类的已登记的Comparator。 (3)如果还没有已登记的Comparator,则使用RawComparator将字节流反序列化为一个对象,再由WritableComparable的compareTo()方法进行操作。