【大数据】MapReduce组件 :Partition分区和排序
问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
类比于新生<k,v>入学,不同的学生实现分配好了宿舍,然后进入到不同的宿舍(reduce task),如果map发送来的数据量太大,意味着这些数据都到这个默认reduce节点执行,没有发挥reduce并行计算的目的,IO压力也很大。 这就是分区的原因。
默认分区是根据key的hashCode对Reduce Tasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
代码语言:javascript复制public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE
}
}
a) 默认下分配一个区
b) 分配几个区,则对应几个reduce任务,每个任务在执行的时候都会公用reduce内的代码
c) 自定义分区,返回的分区数量一定要和定义的reduce任务相同,具体来说就是:自定义分区类 extends HashPartitioner,重写getPartition时,返回的分支个数要和job.setNumReduceTasks(X); 中的X个数相同。
d) 自定义分区要借助自定义分区器Partitioner,工作原理如(图).从图上看出有一个分区就要有一个reduce,每个reduce对不同分区结果的进行处理后输出到不同的part-r--0000X中。
案例:
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据
(2)输入格式
代码语言:javascript复制id 手机号码 网络ip 网址 上行流量 下行流量 网络状态码
1, 13736230513, 192.196.100.1, www.atguigu.com, 2481, 24681, 200
2, 13846544121, 192.196.100.2,, 264, 0, 200
3, 13956435636, 192.196.100.3,, 132, 1512, 200
4, 13966251146, 192.168.100.1,, 240, 0, 404
- 期望输出数据格式
137开头的文件
代码语言:javascript复制id 手机号码 网络ip 网址 上行流量 下行流量 网络状态码
1, 13736230513, 192.196.100.1, www.atguigu.com, 2481, 24681, 200
138开头的文件
代码语言:javascript复制id 手机号码 网络ip 网址 上行流量 下行流量 网络状态码
2, 13846544121, 192.196.100.2,, 264, 0, 200
139开头的文件
代码语言:javascript复制id 手机号码 网络ip 网址 上行流量 下行流量 网络状态码
3, 13956435636, 192.196.100.3,, 132, 1512, 200
4, 13966251146, 192.168.100.1,, 240, 0, 404
思路:手机号作为key,行值作为value
(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发
(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
(3)设置一个自定义分区的FlowPartitioner继承抽象类:Partitione,手机136开头的放到第一个reduce来完成统计,输出结果在分区编号0中,同理137、138、139及其他的结果放置到分区编号1-4中;
(4)通过getPartition()方法开始筛选mappper输出的结果,通过对2中设置的比对,输出在不同的分区编号;
(5)在通过不同的reduce处理分区数据输出到不同的part-r-0000x中;
(6)在驱动类job中添加自定义分区类和任务数量:
代码语言:javascript复制 job.setPartitionerClass(CustomPartitioner.class)
job.setNumReduceTasks(5)
只听到从架构师办公室传来架构君的声音: 昼出耘田夜绩麻,村庄儿女各当家。有谁来对上联或下联?
增加一个FlowPartitioner.class
代码语言:javascript复制此代码由Java架构师必看网-架构君整理
package hdfs_demo.partiyioner;
import hdfs_demo.telFlow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowPartitioner extends Partitioner<Text, FlowBean> {
/**
* 返回分区号
* @param text
* @param flowBean
* @param numPartitions
* @return
*/
//进行分区
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phone = text.toString();//获取手机号
switch (phone.substring(0,3)){
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
在FlowDriver.class中增加分区设置:
代码语言:javascript复制package hdfs_demo.partiyioner;
import hdfs_demo.telFlow.FlowBean;
import hdfs_demo.telFlow.FlowMapper;
import hdfs_demo.telFlow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置对象
Configuration conf = new Configuration();
//创建一个job对象
Job job = Job.getInstance(conf, "telFlowCount");
//mapreduce的启动类
job.setJarByClass(FlowDriver.class);
//设置mapper 和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//设置map的输出类型 Text, FlowBean
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//设置reduce的输出类型 Text, FlowBean
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 在驱动类job中添加自定义分区类和任务数量
job.setPartitionerClass(FlowPartitioner.class);
job.setNumReduceTasks(5);
//设置输入数据路径
FileInputFormat.setInputPaths(job, new Path("G:\idea-workspace\hdfs_java_api\Resource\telinfo.txt"));
//设置reducer输出结果的路径
FileOutputFormat.setOutputPath(job, new Path("G:\idea-workspace\hdfs_java_api\Resource\result"));
//提交任务
boolean b = job.waitForCompletion(true);
System.out.println(b);
}
}
排序只需实现FlowBean中的compareTo方法即可
代码语言:javascript复制此代码由Java架构师必看网-架构君整理
public int compareTo(Object o) {
return 0;
}
}
代码语言:javascript复制 public int compareTo(Object o) {
FlowBean bean =(FlowBean )o;
// 倒序排列,从大到小
return this.sumFlow>bean.getSumFlow() ? -1 : 1;
}