2021年大数据Hadoop(二十):MapReduce的排序和序列化

2021-10-11 15:47:50 浏览数 (1)

MapReduce的排序和序列化

概述

序列化(Serialization)是指把结构化对象转化为字节流。

反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。

当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。

Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。

Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。

一个类要支持可序列化只需实现这个接口即可。

代码语言:javascript复制
public interface  Writable {

 void write(DataOutput out) throws IOException;

 void readFields(DataInput in) throws IOException;

}

另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能.

代码语言:javascript复制
// WritableComparable分别继承Writable和Comparable

public interface WritableComparable<T> extends Writable, Comparable<T> {

}

//Comparable

public interface Comparable<T> {

    int compareTo(T var1);

}

Comparable接口中的comparaTo方法用来定义排序规则,用于将当前对象与方法的参数进行比较。

例如:o1.compareTo(o2);

如果指定的数与参数相等返回0。

如果指定的数小于参数返回 -1。

如果指定的数大于参数返回 1。

返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。

需求

数据格式如下

a   1 a   9 b   3 a   7 b   8 b   10 a   5

要求:

第一列按照字典顺序进行排列

第一列相同的时候, 第二列按照升序进行排列

​​​​​​​分析

实现自定义的bean来封装数据,并将bean作为map输出的key来传输

MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。

如果自定义的JavaBean要参与MapReduce运算,则必须进行序列化,必须实现Writable接口,如果该JavaBean作为K2,则必须实现WritableComparable接口,让JavaBean具有排序的功能

实现

自定义类型和比较器

代码语言:javascript复制
public class SortBean implements WritableComparable<SortBean>{



  private String word;

  private int  num;



  public String getWord() {

  return word;

  }



  public void setWord(String word) {

  this.word = word;

  }



  public int getNum() {

  return num;

  }



  public void setNum(int num) {

  this.num = num;

  }



  @Override

  public String toString() {

  return   word   "t"  num ;

  }



  //实现比较器,指定排序的规则

  /*

规则:

  第一列(word)按照字典顺序进行排列    //  aac   aad

  第一列相同的时候, 第二列(num)按照升序进行排列

   */

  /*

  a  1

  a  5

  b  3

  b  8

   */

  @Override

  public int compareTo(SortBean sortBean) {

  //先对第一列排序: Word排序

  int result = this.word.compareTo(sortBean.word);

  //如果第一列相同,则按照第二列进行排序

  if(result == 0){

  return  this.num - sortBean.num;

  }

  return result;

  }



  //实现序列化

  @Override

  public void write(DataOutput out) throws IOException {

  out.writeUTF(word);

  out.writeInt(num);

  }



  //实现反序列

  @Override

  public void readFields(DataInput in) throws IOException {

  this.word = in.readUTF();

  this.num = in.readInt();

  }

}

​​​​​​​编写Mapper代码

代码语言:javascript复制
public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {

  /*

map方法将K1和V1转为K2和V2:



K1            V1

0            a  3

5            b  7

----------------------

K2                         V2

SortBean(a  3)         NullWritable

SortBean(b  7)         NullWritable

   */

  @Override

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

  //1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2

  String[] split = value.toString().split("t");



  SortBean sortBean = new SortBean();

  sortBean.setWord(split[0]);

  sortBean.setNum(Integer.parseInt(split[1]));



  //2:将K2和V2写入上下文中

  context.write(sortBean, NullWritable.get());

  }

 }

​​​​​​​编写Reducer代码

代码语言:javascript复制
public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {



   //reduce方法将新的K2和V2转为K3和V3

   @Override

   protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

      context.write(key, NullWritable.get());

   }

}

​​​​​​​编写主类代码

代码语言:javascript复制
public class SortRunner {

      public static void main(String[] args) throws Exception {

  

  Configuration conf = new Configuration();

          //1:创建job对象

          Job job = Job.getInstance(conf, "mapreduce_sort");

   

       

       //2:指定job所在的jar包

  job.setJarByClass(SortRunner.class);

    

  //3:指定源文件的读取方式类和源文件的读取路径

  job.setInputFormatClass(TextInputFormat.class);

  ///TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/sort_input"));

  TextInputFormat.addInputPath(job, new Path("file:///D:\input\sort_input"));



  //4:指定自定义的Mapper类和K2、V2类型

  job.setMapperClass(SortMapper.class);

  job.setMapOutputKeyClass(SortBean.class);

  job.setMapOutputValueClass(NullWritable.class);





  //5:指定自定义的Reducer类和K3、V3的数据类型

  job.setReducerClass(SortReducer.class);

  job.setOutputKeyClass(SortBean.class);

  job.setOutputValueClass(NullWritable.class);





  //6:指定输出方式类和结果输出路径

  job.setOutputFormatClass(TextOutputFormat.class);

  TextOutputFormat.setOutputPath(job, new Path("file:///D:\out\sort_out"));



   

         //7:将job提交给yarn集群

         boolean bl = job.waitForCompletion(true);



         System.exit(bl?0:1);

      }

  }