2021年大数据Hadoop(十七):MapReduce编程规范及示例编写

2021-10-11 15:49:42 浏览数 (1)

MapReduce编程规范及示例编写

编程规范

MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为2个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为2个步骤

Map阶段2个步骤

1、设置 InputFormat 类, 读取输入文件内容,对输入文件的每一行,解析成key、value对(K1和V1)。

2、自定义map方法,每一个键值对调用一次map方法,将第一步的K1和V1结果转换成另外的 Key-Value(K2和V2)对, 输出结果。

Shuffle 阶段 4 个步骤

3、 对map阶段输出的k2和v2对进行分区

4、 对不同分区的数据按照相同的Key排序

5、(可选)对数据进行局部聚合, 降低数据的网络拷贝

6、对数据进行分组, 相同Key的Value放入一个集合中,得到K2和[V2]

Reduce 阶段 2 个步骤

7、对map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。 8、对多个map任务的输出进行合并、排序。编写reduce方法,在此方法中将K2和[V2]进行处理,转换成新的key、value(K3和V3)输出,并把reduce的输出保存到文件中。

编程步骤

用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)

Mapper

  1. 自定义类继承Mapper类
  2. 重写自定义类中的map方法,在该方法中将K1和V1转为K2和V2
  3. 将生成的K2和V2写入上下文中

Reducer

  1. 自定义类继承Reducer类
  2. 重写Reducer中的reduce方法,在该方法中将K2和[V2]转为K3和V3
  3. 将K3和V3写入上下文中

Driver

整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

1、定义类,编写main方法

2、在main方法中指定以下内容:

  1. 创建建一个job任务对象
  2. 指定job所在的jar包
  3. 指定源文件的读取方式类和源文件的读取路径
  4. 指定自定义的Mapper类和K2、V2类型
  5. 指定自定义分区类(如果有的话)
  6. 指定自定义Combiner类(如果有的话)
  7. 指定自定义分组类(如果有的话)
  8. 指定自定义的Reducer类和K3、V3的数据类型
  9. 指定输出方式类和结果输出路径
  10. 将job提交到yarn集群

WordCount示例编写

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数

第一步:数据准备

1、创建一个新的文件

cd /export/server vim wordcount.txt

2、向其中放入以下内容并保存

hello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoop

3、上传到 HDFS

hadoop fs   -mkdir -p  /input/wordcount hadoop fs -put wordcount.txt /input/wordcoun

第二步:代码编写

1、导入maven坐标

代码语言:javascript复制
<dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-mapreduce-client-core</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>4.12</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-simple</artifactId>

            <version>1.7.25</version>

        </dependency>

    </dependencies>

2、定义一个mapper类

代码语言:javascript复制
//首先要定义四个泛型的类型

//keyin:  LongWritable    valuein: Text

//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, Writable>{

//map方法的生命周期:  框架每传一行数据就被调用一次

//key :  这一行的起始点在文件中的偏移量

//value: 这一行的内容

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    //拿到一行数据转换为string

    String line = value.toString();

    //将这一行切分出各个单词

    String[] words = line.split(" ");

    //遍历数组,输出<单词,1>

    for(String word:words){

        context.write(new Text(word), new LongWritable (1));

    }

}

}

3、定义一个reducer类

代码语言:javascript复制
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次

 @Override

 protected void reduce(Text key, Iterable<LongWritable > values, Context context) throws IOException, InterruptedException {

//定义一个计数器

int count = 0;

//遍历这一组kv的所有v,累加到count中

for(LongWritable value:values){

count  = value.get();

}

context.write(key, new LongWritable (count));

 }

}

4、定义一个Driver主类,用来描述job并提交job

代码语言:javascript复制
public class WordCountRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1、创建建一个job任务对象

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, "wordcount");



        //2、指定job所在的jar包

        job.setJarByClass(WordCountRunner.class);



        //3、指定源文件的读取方式类和源文件的读取路径

        job.setInputFormatClass(TextInputFormat.class); //按照行读取

        //TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/wordcount")); //只需要指定源文件所在的目录即可

         TextInputFormat.addInputPath(job, new Path("file:///E:\input\wordcount")); //只需要指定源文件所在的目录即可



        //4、指定自定义的Mapper类和K2、V2类型

        job.setMapperClass(WordCountMapper.class); //指定Mapper类

        job.setMapOutputKeyClass(Text.class); //K2类型

        job.setMapOutputValueClass(LongWritable.class);//V2类型



        //5、指定自定义分区类(如果有的话)

        //6、指定自定义分组类(如果有的话)

        //7、指定自定义的Reducer类和K3、V3的数据类型

        job.setReducerClass(WordCountReducer.class); //指定Reducer类

        job.setOutputKeyClass(Text.class); //K3类型

        job.setOutputValueClass(LongWritable.class);  //V3类型



        //8、指定输出方式类和结果输出路径

        job.setOutputFormatClass(TextOutputFormat.class);

        //TextOutputFormat.setOutputPath(job, new  Path("hdfs://node1:8020/output/wordcount")); //目标目录不能存在,否则报错

        TextOutputFormat.setOutputPath(job, new  Path("file:///E:\output\wordcount")); //目标目录不能存在,否则报错



        //9、将job提交到yarn集群

        boolean bl = job.waitForCompletion(true); //true表示可以看到任务的执行进度



        //10.退出执行进程

        System.exit(bl?0:1);

    }

}