Secondary Sort in MapReduce


Background

An interviewer once asked me a MapReduce question: "How would you implement a join of two tables with MapReduce?"

I answered, "With two jobs: the first one ..., the second one ..."

"Two jobs? Do you know secondary sort?"

"No." So over the past couple of days I went back and studied secondary sort in MapReduce.

In some situations the values that reach a reduce() call need to be in sorted order, and secondary sort is the tool for that: it sorts the valueList that has been grouped under each key by the value itself.

Example

Input data

1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58

Output data (grouped by the first number, sorted by the second)

------------------------------------------------
1 2
------------------------------------------------
3 4
------------------------------------------------
5 6
------------------------------------------------
7 8
7 82
------------------------------------------------
12 211
------------------------------------------------
20 21
20 53
20 522
------------------------------------------------
31 42
------------------------------------------------
40 511
------------------------------------------------
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
------------------------------------------------
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
------------------------------------------------
63 61
------------------------------------------------
70 54
70 55
70 56
70 57
70 58
70 58
------------------------------------------------
71 55
71 56
------------------------------------------------
73 57
------------------------------------------------
74 58
------------------------------------------------
203 21
------------------------------------------------
530 54
------------------------------------------------
730 54
------------------------------------------------
740 58

The idea behind secondary sort

I used to assume that in reduce(key, valueList) all the key-value pairs behind a single call share exactly the same key. Having studied MapReduce more deeply, I now know that, as long as certain conditions are met, those keys can actually differ. Let me first walk through the MapReduce pipeline.

Principle

As a map task nears completion, the framework applies the following steps in order:

1. The partitioner set with job.setPartitionerClass() assigns each <key, value> pair to a partition, and each partition maps to one reducer. The default partitioner is HashPartitioner.

2. Within each partition, records are sorted with the key comparator set by job.setSortComparatorClass(); if none is set, the key's own compareTo() is used.

On the reduce side:

1. After a reducer has received all the map outputs assigned to it, it again sorts all the pairs with the key comparator set by job.setSortComparatorClass().

2. It then builds one value iterator per group of keys. This is where grouping comes in, using the comparator set with job.setGroupingComparatorClass(). Whenever this comparator considers two keys equal, they belong to the same group: their values go into the same value iterator, and the key exposed for that iterator is the first key of the group.

3. Finally, the Reducer's reduce() function is called once per group, and its input is all the (key, value) pairs in that group. (A minimal sketch of how these hooks are wired on the Job follows this list.)
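
A minimal sketch of that wiring, using the class names from the full example further down (the sort-comparator line is shown only for completeness; the example below does not call it and instead relies on IntPair's compareTo() and its registered WritableComparator):

// 1. Partition: decide which reducer receives each <key, value> pair (here, by the first int only).
job.setPartitionerClass(FirstPartitioner.class);
// 2. Sort: total order of the composite keys. Commented out because the example below
//    relies on IntPair.compareTo() / its registered comparator instead of this hook.
// job.setSortComparatorClass(FullKeyComparator.class);   // FullKeyComparator is hypothetical
// 3. Group: keys that compare equal here share one reduce() call and one value iterator.
job.setGroupingComparatorClass(FirstGroupingComparator.class);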

Approach

With this in mind, secondary sort can be designed as follows: instead of the original <key, value> pairs, the mapper emits a composite key built from (key value), i.e. pairs of the form <(key value), value>.

The grouping comparator set with setGroupingComparatorClass() compares only the key part of the composite key, so every <(key value), value> pair that shares the same key lands in the same reduce() call. The composite key's compareTo() compares the key first and the value second. As a result, the valueList in the <(key value), valueList> that reduce() receives is already sorted by value.

Example code

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files that must contain two integers per a line.
 * The output is sorted by the first and second number and grouped on the 
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
 *            <i>in-dir</i> <i>out-dir</i> 
 */
public class SecondarySort {
 
  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;
    
    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers. 
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */ 
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }
  
  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator 
                implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass 
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {
    
    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();
    
    @Override
    public void map(LongWritable inKey, Text inValue, 
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }
   /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce 
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR = 
      new Text("------------------------------------------------");
    private final Text first = new Text();
    
    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}

Joining two tables with secondary sort

We can use secondary sort to implement a join of two tables.

Suppose we want to join table A and table B on a field ID. When processing a record of table A, emit <(ID A), record>; when processing a record of table B, emit <(ID B), record>. In the map stage, partition by ID alone, so that records with the same ID land in the same reducer. In the reducer, group by ID alone as well, so that a single reduce() call handles the list of all records sharing one ID. Because the composite key sorts the A tag before the B tag, the first part of that list holds table A's records and the rest holds table B's records, and inside reduce() you can join the two parts. A hedged sketch of this idea follows.
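
What follows is a minimal sketch of that idea, not code from this article: a composite key (id, tag) with tag 0 for table A and tag 1 for table B, a partitioner and a grouping comparator that look only at the id, and a reducer that buffers the A-side records (they arrive first, because tag 0 sorts before tag 1) and joins each B-side record against them. The class names, the tab-separated input layout, and the assumption that the first column is the join key are all illustrative.

// Sketch only: a reduce-side join built on secondary sort. Names and input layout are assumptions.
package example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortJoin {

  /** Composite key: the join id plus a table tag (0 = table A, 1 = table B). */
  public static class TaggedKey implements WritableComparable<TaggedKey> {
    private final Text id = new Text();
    private int tag;

    public void set(String idValue, int tagValue) { id.set(idValue); tag = tagValue; }
    public Text getId() { return id; }
    public int getTag() { return tag; }

    @Override public void write(DataOutput out) throws IOException {
      id.write(out);
      out.writeInt(tag);
    }
    @Override public void readFields(DataInput in) throws IOException {
      id.readFields(in);
      tag = in.readInt();
    }
    @Override public int compareTo(TaggedKey o) {
      int cmp = id.compareTo(o.id);                        // primary: the join id
      return cmp != 0 ? cmp : Integer.compare(tag, o.tag); // secondary: table A before table B
    }
  }

  /** Partition by id only, so both tables' records for one id meet in the same reducer. */
  public static class IdPartitioner extends Partitioner<TaggedKey, Text> {
    @Override public int getPartition(TaggedKey key, Text value, int numPartitions) {
      return (key.getId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  /** Group by id only, so one reduce() call sees the A records followed by the B records. */
  public static class IdGroupingComparator extends WritableComparator {
    protected IdGroupingComparator() { super(TaggedKey.class, true); }
    @Override public int compare(WritableComparable a, WritableComparable b) {
      return ((TaggedKey) a).getId().compareTo(((TaggedKey) b).getId());
    }
  }

  /** Mapper for table A; assumes tab-separated lines whose first field is the join id. */
  public static class TableAMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {
    private final TaggedKey outKey = new TaggedKey();
    @Override protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      outKey.set(line.toString().split("\t", 2)[0], 0);   // tag 0 sorts before table B
      context.write(outKey, line);
    }
  }

  /** Mapper for table B: same layout assumption, tag 1. */
  public static class TableBMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {
    private final TaggedKey outKey = new TaggedKey();
    @Override protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      outKey.set(line.toString().split("\t", 2)[0], 1);
      context.write(outKey, line);
    }
  }

  /** Buffer the A records (they arrive first), then join each B record against them. */
  public static class JoinReducer extends Reducer<TaggedKey, Text, Text, Text> {
    @Override protected void reduce(TaggedKey key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<String> aRecords = new ArrayList<String>();
      for (Text value : values) {
        // Hadoop refreshes the key object as the value iterator advances,
        // so key.getTag() always reflects the current record's table.
        if (key.getTag() == 0) {
          aRecords.add(value.toString());
        } else {
          for (String a : aRecords) {
            context.write(key.getId(), new Text(a + "\t" + value));
          }
        }
      }
    }
  }
}

In the driver, these pieces would be wired much like the example above: job.setMapOutputKeyClass(TaggedKey.class), job.setMapOutputValueClass(Text.class), job.setPartitionerClass(IdPartitioner.class), job.setGroupingComparatorClass(IdGroupingComparator.class), job.setReducerClass(JoinReducer.class), with the two mappers attached to their respective input paths (for instance via MultipleInputs.addInputPath()).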

