MapReduce的排序和序列化
概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。
一个类要支持可序列化只需实现这个接口即可。
代码语言:javascript复制public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能.
代码语言:javascript复制// WritableComparable分别继承Writable和Comparable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
//Comparable
public interface Comparable<T> {
int compareTo(T var1);
}
Comparable接口中的comparaTo方法用来定义排序规则,用于将当前对象与方法的参数进行比较。
例如:o1.compareTo(o2);
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
需求
数据格式如下
a 1 a 9 b 3 a 7 b 8 b 10 a 5
要求:
第一列按照字典顺序进行排列
第一列相同的时候, 第二列按照升序进行排列
分析
实现自定义的bean来封装数据,并将bean作为map输出的key来传输
MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。
如果自定义的JavaBean要参与MapReduce运算,则必须进行序列化,必须实现Writable接口,如果该JavaBean作为K2,则必须实现WritableComparable接口,让JavaBean具有排序的功能
实现
自定义类型和比较器
代码语言:javascript复制public class SortBean implements WritableComparable<SortBean>{
private String word;
private int num;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return word "t" num ;
}
//实现比较器,指定排序的规则
/*
规则:
第一列(word)按照字典顺序进行排列 // aac aad
第一列相同的时候, 第二列(num)按照升序进行排列
*/
/*
a 1
a 5
b 3
b 8
*/
@Override
public int compareTo(SortBean sortBean) {
//先对第一列排序: Word排序
int result = this.word.compareTo(sortBean.word);
//如果第一列相同,则按照第二列进行排序
if(result == 0){
return this.num - sortBean.num;
}
return result;
}
//实现序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(word);
out.writeInt(num);
}
//实现反序列
@Override
public void readFields(DataInput in) throws IOException {
this.word = in.readUTF();
this.num = in.readInt();
}
}
编写Mapper代码
代码语言:javascript复制public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {
/*
map方法将K1和V1转为K2和V2:
K1 V1
0 a 3
5 b 7
----------------------
K2 V2
SortBean(a 3) NullWritable
SortBean(b 7) NullWritable
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2
String[] split = value.toString().split("t");
SortBean sortBean = new SortBean();
sortBean.setWord(split[0]);
sortBean.setNum(Integer.parseInt(split[1]));
//2:将K2和V2写入上下文中
context.write(sortBean, NullWritable.get());
}
}
编写Reducer代码
代码语言:javascript复制public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {
//reduce方法将新的K2和V2转为K3和V3
@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
编写主类代码
代码语言:javascript复制public class SortRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//1:创建job对象
Job job = Job.getInstance(conf, "mapreduce_sort");
//2:指定job所在的jar包
job.setJarByClass(SortRunner.class);
//3:指定源文件的读取方式类和源文件的读取路径
job.setInputFormatClass(TextInputFormat.class);
///TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/sort_input"));
TextInputFormat.addInputPath(job, new Path("file:///D:\input\sort_input"));
//4:指定自定义的Mapper类和K2、V2类型
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(NullWritable.class);
//5:指定自定义的Reducer类和K3、V3的数据类型
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(SortBean.class);
job.setOutputValueClass(NullWritable.class);
//6:指定输出方式类和结果输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\out\sort_out"));
//7:将job提交给yarn集群
boolean bl = job.waitForCompletion(true);
System.exit(bl?0:1);
}
}