Hadoop-Shuffle洗牌过程,与combine和partition的关系「建议收藏」

2022-08-09 14:38:09 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

Shuffle的正常意思是洗牌或弄乱,是MapReduce的核心。

下图展示了Shuffle包含的步骤:

phase:阶段

partitions:分开,隔离

marge/combine:合并

上图包含了整个mapreduce过程,更准确的说shuffle包含partitions和sort、combine(merge)过程,对应map到reduce之间的过程,不包括map和reduce。

我们来分析分析整个流程:

1,在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。

2,在经过mapper的运行后,也就是上图map阶段后,我们得知mapper的输出是这样一个key/value对,然后这个结果要交给reduce处理,那么怎么知道要交给那个reduce去处理呢?这里就需要partitioner接口处理了,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。因为这样以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

这里注意,经过partitioner后,同一个map task的处理结果会可能放到不同reduce端处理,即使map处理的split文件只有一个文本,那这个文本上的数据有可能在不同的reduce处理

当然对于partitioner是如何分配map的处理结果到reduce的原理这里小编也不清楚,有懂的朋友,欢迎留言赐教。

3,在知道map的结果要到那个reduce处理后,下面的步骤可见下图:

把map结果写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响,因为缓存区输入内存,速度较与磁盘通讯快。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

4,这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程,就是缓冲区可以变写入map处理结果,边溢写到磁盘。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。会形成多个溢写文件,对应第一个图的横向的三个partitions。

5,当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

这里我们先看下图:这是个wordcount例子(没有partition的理想化操作)

上图也表明了溢写线程启动后先做map阶段的sort排序,再做map阶段的combine合并(combiner没有默认的实现,需要显式的设置在conf中才有作用。)这样一个split的溢写就完成了。注意这里做sort和combine的前提是处理的数据要在同一个partitioner,否则即使是相同的两个字符,即便实在同一个文件也不会做combine合并。而是在最后的reduce阶段做合并相加。上图可以理解没有partition的理想化操作

6,每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做map的Merge。注意,因为是合并多个溢写文件,这些文件中也有可能存在相同的key,所以如果client设置了combine也会执行合并操作。

这里多次出现了combine合并操作,但是并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。如果适用我们是建议执行combine操作的,因为如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。

7,至此,map端的所有工作都已结束,map的shuffle也结束了。最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。

提示:上面的步骤我们都是map阶段的,对于本文的第一个图我们要一直看着左半边的图。因为途中有多个sort和merge,所以我这里也在文中特别提示了map阶段的sort排序和map阶段的merge.

reduce阶段:

1, Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。 Copy过来的数据会先放入内存缓冲区中,注意这个内存缓冲区是reduce的内存缓存区了。注意看本文第一张图,copy到同一个reduce的数据不一定对应同一个map的所有数据哦。而是我们上面说的同一个partitoner区内的数据,所以有可能不同的map中有相同的partitioner,这样就会出现文中图一同一个map的最终spill文件指向多个reduce,而一个reduce的数据有来自多个map的最终spill文件。当然一个reduce是可以对应一个或多个partition的。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。

当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。

—————-

这里说一下merge和combine的区别,前者是把两个文件合并成一个,但是不是会把里面相同的key相加,而是放到一个数组里面。而后者就是在merge后对相同的key相加为一个。

—————

2,merge过程:里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

3,Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。

注意点:

(1)同一个job,map阶段和reduce阶段是有先后的,要所有的map任务都结束了才能进行reduce阶段。

(2)map和reduce阶段都有sort排序和combine合并阶段

(3)combine合并是需要我们通过配置conf文件或,代码设置才能生效,不是默认的

(4)理清partitioner的作用才能理解本文图一

(5)默认一个split切片数据对应一个map任务,同一个split切片中的数据有一个或多个partition区。不同的split有可能有相同partition。最后无论属于哪个map任务,只要partition相同就会归到一个reduce,默认一个partition对应一个reduce,可配置。

(6)parititons (数据分割) 把map任务输出的中间结果按key的范围划分R份,(R是预先定义的reduce任务的个数),划分时通常使用hash函数,这样可以保证某一范围内的key一定是由一个reduce任务来处理,可以简化reduce的过程。默认一”类”partitions对应一个reduce。

本文细节内容比较多,希望读者能多读思考

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/106120.html原文链接:https://javaforall.cn

0 人点赞