【大数据哔哔集20210107】聊聊MapReduce中的排序/二次排序/辅助排序

2021-01-26 16:32:04 浏览数 (1)

【大数据哔哔集】是小编发起的每日大数据圈子了最高频、时尚、前沿的面试题目以及资讯等,欢迎您关注。

首先,我们祭出两张经典的图:

上图表示MapReduce的shuffle执行过程。

在MapReduce的shuffle过程中执行了三次排序,分别是:

  • map的溢写阶段:根据分区以及key进行快速排序
  • map的合并溢写文件:将同一个分区的多个溢写文件进行归并排序,合成大的溢写文件
  • reduce输入阶段:将同一分区,来自不同map task的数据文件进行归并排序

此外,在MapReduce整个过程中,默认是会对输出的KV对按照key进行排序的,而且是使用快速排序。

  • map输出的排序,其实也就是上面的溢写过程中的排序。
  • reduce输出的排序,即reduce处理完数据后,MapReduce内部会自动对输出的KV按照key进行排序

具体的流程如下:

map端
  • 每个map任务都有一个环形的内存缓冲区(图中的buffer in memory)用于存储任务输出。缓冲区达到一定的阈值(默认80%),一条后台线程便开始把内容溢出(spill)到磁盘。每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file)。
  • 在写磁盘之前,线程首先根据数据最终要传的reduce把数据划分成相应的分区(partition)(图中partitions)。在每个分区中,后台线程按键进行内存中排序(排序是在map端进行的)。如果有combiner函数就会在排序后的输出上运行,为了让map输出结果更加紧凑。
  • 在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件(图中merge on disk)。如果溢出文件多于设置的数量,combiner就会在输出文件写到磁盘之前再次运行。
reduce端
  • 复制阶段(图中Copy Phase),如果map的输出相当小,会被复制到reduce任务的JVM内存中;否则map输出被复制到磁盘。随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。
  • 排序阶段(图中“Sort” Phase),准确的说是合并阶段。复制完成map的输出后,将合并map输出,维持其顺序排序。最后一趟的合并来自内存和磁盘片段。
  • Reduce阶段,执行reduce任务,把最后一趟合并的数据直接输入reduce函数,从而省略了一次磁盘往返行程。

那么,什么是二次排序呢?

假设有如下一组数据:

代码语言:javascript复制
1   1
3   3
2   2
1   5
1   3
2   1

现在需要 MapReduce 程序先对其第一个字段排序,再对第二个字段进行排序。最终会得到如下结果:

代码语言:javascript复制
1   1
1   3
1   5
2   1
2   2
3   3

我们在根据key进行排序时,如果key是一个复合对象,即该对象中包含多个成员属性,那么在进行key比较时,就会涉及到多个属性间的比较,而如果compareTo() 方法中,比较条件为两个的话,这就是所谓的二次排序

辅助排序也叫分组排序,是指在reduce前的group过程中根据排序规则进行的分组,因为分组的时候是需要比较KV中key是否相同,如果相同才会归为同一个组,如果不相等,就归为不同的组,所以就涉及到key比较方法了。总的来说其实定义key在什么情况下才相等。这个过程可以自己定义分组的方法,也就是分组排序的实现类。

使用方法:

1、自定义分组类,继承 WritableComparator

2、调用父类的构造方法,创建实例

3、重写父类的 compare方法

一个经典的辅助排序的案例如下:

代码语言:javascript复制
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }

        return result;
    }
}

编写好自定义的排序类之后,需要在job中指定好自定义的分组类:job.setGroupingComparatorClass(OrderGroupCompartor.class);

这样,我们就可以使用辅助排序了。

好了,到此 Hadoop 中的排序你清楚了吗?

版权声明:

本文为《大数据真好玩》原创整理,转载需作者授权。未经作者允许转载追究侵权责任。

责编 | 大数据真好玩

插画 | 大数据真好玩

微信公众号 | 大数据真好玩

0 人点赞