二、外部排序
外部排序是指当数据量太大无法全部载入内存时,需要将数据分割成多个小块进行排序,然后再将排序后的小块合并成一个大的有序块。在MapReduce中,外部排序通常是在Reduce端进行的,即每个Reduce任务将它们处理的数据进行排序,然后将排序后的结果合并成一个有序的输出文件。
外部排序的实现方法有多种,包括归并排序、堆排序、快速排序等。其中,归并排序也是一种常用的排序算法,因为它可以很好地应用于外部排序场景,能够处理大规模数据集。
下面是一个使用归并排序进行外部排序的MapReduce程序示例:
代码语言:javascript复制public class ExternalSort {
public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> {
private IntWritable score = new IntWritable();
private Text name = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split(",");
name.set(parts[0]);
score.set(Integer.parseInt(parts[1]));
context.write(score, name);
}
}
public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
private List<Text> nameList = new ArrayList<Text>();
private IntWritable score = new IntWritable();
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
nameList.clear();
for (Text value : values) {
nameList.add(new Text(value));
}
Collections.sort(nameList);
for (Text name : nameList) {
context.write(name, key);
}
}
}
public static class MergeReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
private List<Text> nameList = new ArrayList<Text>();
private IntWritable score = new IntWritable();
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
nameList.clear();
for (Text value : values) {
nameList.add(new Text(value));
}
Collections.sort(nameList);
for (Text name : nameList) {
context.write(name, key);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "ExternalSort");
job.setJarByClass(ExternalSort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(10);
job.setSortComparatorClass(IntWritable.Comparator.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1] "/temp"));
job.waitForCompletion(true);
Job mergeJob = new Job(conf, "MergeSort");
mergeJob.setJarByClass(ExternalSort.class);
mergeJob.setReducerClass(MergeReducer.class);
mergeJob.setOutputKeyClass(IntWritable.class);
mergeJob.setOutputValueClass(Text.class);
mergeJob.setNumReduceTasks(1);
mergeJob.setSortComparatorClass(IntWritable.Comparator.class);
for (int i = 0; i < 10; i ) {
FileInputFormat.addInputPath(mergeJob, new Path(args[1] "/temp/part-r-" String.format("d", i)));
}
FileOutputFormat.setOutputPath(mergeJob, new Path(args[1]));
System.exit(mergeJob.waitForCompletion(true) ? 0 : 1);
}
}
在这个例子中,Map任务读入的数据是以“姓名,成绩”的形式存储的。Map任务将每个学生的成绩作为key,姓名作为value输出,以便Reduce任务进行排序。Reduce任务将每个成绩相同的学生姓名存储到一个列表中,然后对列表进行排序,并按照学生姓名从小到大的顺序输出。注意,在这个例子中,Reduce任务的输出key是IntWritable类型,value是Text类型。
为了提高排序的效率,可以使用多个Reduce任务对数据进行排序。在本例中,我们使用了10个Reduce任务对数据进行排序,每个任务处理的数据量约为总数据量的1/10。
在排序完成后,我们需要将所有Reduce任务的输出合并成一个有序的输出文件。这可以通过另一个MapReduce任务来实现,我们称之为“合并排序”任务。在这个任务中,我们只需要将所有Reduce任务的输出作为输入,再次进行排序即可。
需要注意的是,在这个例子中,我们使用了“IntWritable.Comparator”作为排序器。这个排序器是IntWritable类型的默认排序器,它会按照数字的大小进行排序。如果我们要按照其他方式进行排序,例如按照字典序对字符串进行排序,就需要自定义一个排序器,并在MapReduce任务中指定使用该排序器。