MapReduce快速入门系列(6) | Shuffle之Partition分区

2020-10-28 15:30:19 浏览数 (1)

一. Shuffle之Partition分区

  今天我们讲的第五步,Partition分区操作。

  Partition分区:按照一定的分区规则,将key value的list进行分区。分区的创建分为默认的和自定义两种。

1.1. 默认Partition分区

代码语言:javascript复制
public class HashPartitioner<K,V> extends Partitioner<K,V>{
    public int getPartition(K key,V value, int numReduceTasks){
        return(key.hashCode()& Integer.MAX_VALUE) % numReduceTasks;
    }
}

  默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。既然用户不能控制存储到某个区间,能自定义么,答案是可以的。

1.2. 自定义Partitioner

  • 1. 自定义类继承Partitioner,重写getPartition()方法
代码语言:javascript复制
public class CustomPartitioner extends Partitioner<Text, FlowBean>{
    @Override
    public int getPartition(Text key,FlowBean value, int numReduceTasks){
        // 控制分区代码逻辑
        ... ...
            return Partition;
    }
}
  • 2. 在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

  • 3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

1.3. 分区总结

  1. 如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  2. 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
  3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
  4. 分区号必须从零开始,逐一累加。

1.4. 案例的简单分析

  假设自定义分区数为5,则

  1. job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件
  2. job.setNumReduceTasks(2); 会报错
  3. job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

二. 案例分析

  案例继续采用《MapReduce系列(4) | Hadoop序列化》中的文档,有需要文档的可以到此章自行复制保存。

2.1. 需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  • 1. 输入数据
  • 2. 期望输出数据

  手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

2.2. 需求分析

2.3 代码实现

  • 1. 在系列的基础上,增加一个分区类MyPartitioner
代码语言:javascript复制
package com.buwenbuhuo.partition;
import com.buwenbuhuo.flowsun.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author 卜温不火
 * @create 2020-04-23 20:27
 * com.buwenbuhuo.partition - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class MyPartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

        String phone = text.toString();

        switch (phone.substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
  • 2. 在驱动函数中增加自定义数据分区设置和ReduceTask设置
代码语言:javascript复制
package com.buwenbuhuo.partition;

/**
 * @author 卜温不火
 * @create 2020-04-23 14:14
 * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */

import com.buwenbuhuo.flowsun.FlowBean;
import com.buwenbuhuo.flowsun.FlowDriver;
import com.buwenbuhuo.flowsun.FlowMapper;
import com.buwenbuhuo.flowsun.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取job实例
        Job job = Job.getInstance(new Configuration());

        // 2.设置类路径
        job.setJarByClass(PartitionerDriver.class);

        // 3 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);


        // 8 指定自定义数据分区
        job.setPartitionerClass(MyPartitioner.class);

        // 9 同时指定相应数量的reduce task
        job.setNumReduceTasks(5);

        // 4 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path("d:\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\output"));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

2.4 运行并查看结果

  • 1. 运行
  • 2. 查看

0 人点赞