【大数据哔哔集】是小编发起的每日大数据圈子了最高频、时尚、前沿的面试题目以及资讯等,欢迎您关注。
首先,我们祭出两张经典的图:
上图表示MapReduce的shuffle执行过程。
在MapReduce的shuffle过程中执行了三次排序,分别是:
- map的溢写阶段:根据分区以及key进行快速排序
- map的合并溢写文件:将同一个分区的多个溢写文件进行归并排序,合成大的溢写文件
- reduce输入阶段:将同一分区,来自不同map task的数据文件进行归并排序
此外,在MapReduce整个过程中,默认是会对输出的KV对按照key进行排序的,而且是使用快速排序。
- map输出的排序,其实也就是上面的溢写过程中的排序。
- reduce输出的排序,即reduce处理完数据后,MapReduce内部会自动对输出的KV按照key进行排序
具体的流程如下:
map端
- 每个map任务都有一个环形的内存缓冲区(图中的buffer in memory)用于存储任务输出。缓冲区达到一定的阈值(默认80%),一条后台线程便开始把内容溢出(spill)到磁盘。每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file)。
- 在写磁盘之前,线程首先根据数据最终要传的reduce把数据划分成相应的分区(partition)(图中partitions)。在每个分区中,后台线程按键进行内存中排序(排序是在map端进行的)。如果有combiner函数就会在排序后的输出上运行,为了让map输出结果更加紧凑。
- 在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件(图中merge on disk)。如果溢出文件多于设置的数量,combiner就会在输出文件写到磁盘之前再次运行。
reduce端
- 复制阶段(图中Copy Phase),如果map的输出相当小,会被复制到reduce任务的JVM内存中;否则map输出被复制到磁盘。随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。
- 排序阶段(图中“Sort” Phase),准确的说是合并阶段。复制完成map的输出后,将合并map输出,维持其顺序排序。最后一趟的合并来自内存和磁盘片段。
- Reduce阶段,执行reduce任务,把最后一趟合并的数据直接输入reduce函数,从而省略了一次磁盘往返行程。
那么,什么是二次排序呢?
假设有如下一组数据:
代码语言:javascript复制1 1
3 3
2 2
1 5
1 3
2 1
现在需要 MapReduce 程序先对其第一个字段排序,再对第二个字段进行排序。最终会得到如下结果:
代码语言:javascript复制1 1
1 3
1 5
2 1
2 2
3 3
我们在根据key进行排序时,如果key是一个复合对象,即该对象中包含多个成员属性,那么在进行key比较时,就会涉及到多个属性间的比较,而如果compareTo() 方法中,比较条件为两个的话,这就是所谓的二次排序。
辅助排序也叫分组排序,是指在reduce前的group过程中根据排序规则进行的分组,因为分组的时候是需要比较KV中key是否相同,如果相同才会归为同一个组,如果不相等,就归为不同的组,所以就涉及到key比较方法了。总的来说其实定义key在什么情况下才相等。这个过程可以自己定义分组的方法,也就是分组排序的实现类。
使用方法:
1、自定义分组类,继承 WritableComparator
2、调用父类的构造方法,创建实例
3、重写父类的 compare方法
一个经典的辅助排序的案例如下:
代码语言:javascript复制import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
编写好自定义的排序类之后,需要在job中指定好自定义的分组类:job.setGroupingComparatorClass(OrderGroupCompartor.class);
这样,我们就可以使用辅助排序了。
好了,到此 Hadoop 中的排序你清楚了吗?
版权声明:
本文为《大数据真好玩》原创整理,转载需作者授权。未经作者允许转载追究侵权责任。
责编 | 大数据真好玩
插画 | 大数据真好玩
微信公众号 | 大数据真好玩