MapReduce编程规范及示例编写
编程规范
MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为2个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为2个步骤
Map阶段2个步骤
1、设置 InputFormat 类, 读取输入文件内容,对输入文件的每一行,解析成key、value对(K1和V1)。
2、自定义map方法,每一个键值对调用一次map方法,将第一步的K1和V1结果转换成另外的 Key-Value(K2和V2)对, 输出结果。
Shuffle 阶段 4 个步骤
3、 对map阶段输出的k2和v2对进行分区
4、 对不同分区的数据按照相同的Key排序
5、(可选)对数据进行局部聚合, 降低数据的网络拷贝
6、对数据进行分组, 相同Key的Value放入一个集合中,得到K2和[V2]
Reduce 阶段 2 个步骤
7、对map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。 8、对多个map任务的输出进行合并、排序。编写reduce方法,在此方法中将K2和[V2]进行处理,转换成新的key、value(K3和V3)输出,并把reduce的输出保存到文件中。
编程步骤
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
Mapper
- 自定义类继承Mapper类
- 重写自定义类中的map方法,在该方法中将K1和V1转为K2和V2
- 将生成的K2和V2写入上下文中
Reducer
- 自定义类继承Reducer类
- 重写Reducer中的reduce方法,在该方法中将K2和[V2]转为K3和V3
- 将K3和V3写入上下文中
Driver
整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
1、定义类,编写main方法
2、在main方法中指定以下内容:
- 创建建一个job任务对象
- 指定job所在的jar包
- 指定源文件的读取方式类和源文件的读取路径
- 指定自定义的Mapper类和K2、V2类型
- 指定自定义分区类(如果有的话)
- 指定自定义Combiner类(如果有的话)
- 指定自定义分组类(如果有的话)
- 指定自定义的Reducer类和K3、V3的数据类型
- 指定输出方式类和结果输出路径
- 将job提交到yarn集群
WordCount示例编写
需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
第一步:数据准备
1、创建一个新的文件
cd /export/server vim wordcount.txt
2、向其中放入以下内容并保存
hello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoop
3、上传到 HDFS
hadoop fs -mkdir -p /input/wordcount hadoop fs -put wordcount.txt /input/wordcoun
第二步:代码编写
1、导入maven坐标
代码语言:javascript复制<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
2、定义一个mapper类
代码语言:javascript复制//首先要定义四个泛型的类型
//keyin: LongWritable valuein: Text
//keyout: Text valueout:IntWritable
public class WordCountMapper extends Mapper<LongWritable, Text, Text, Writable>{
//map方法的生命周期: 框架每传一行数据就被调用一次
//key : 这一行的起始点在文件中的偏移量
//value: 这一行的内容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行数据转换为string
String line = value.toString();
//将这一行切分出各个单词
String[] words = line.split(" ");
//遍历数组,输出<单词,1>
for(String word:words){
context.write(new Text(word), new LongWritable (1));
}
}
}
3、定义一个reducer类
代码语言:javascript复制public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次
@Override
protected void reduce(Text key, Iterable<LongWritable > values, Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历这一组kv的所有v,累加到count中
for(LongWritable value:values){
count = value.get();
}
context.write(key, new LongWritable (count));
}
}
4、定义一个Driver主类,用来描述job并提交job
代码语言:javascript复制public class WordCountRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建建一个job任务对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "wordcount");
//2、指定job所在的jar包
job.setJarByClass(WordCountRunner.class);
//3、指定源文件的读取方式类和源文件的读取路径
job.setInputFormatClass(TextInputFormat.class); //按照行读取
//TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/wordcount")); //只需要指定源文件所在的目录即可
TextInputFormat.addInputPath(job, new Path("file:///E:\input\wordcount")); //只需要指定源文件所在的目录即可
//4、指定自定义的Mapper类和K2、V2类型
job.setMapperClass(WordCountMapper.class); //指定Mapper类
job.setMapOutputKeyClass(Text.class); //K2类型
job.setMapOutputValueClass(LongWritable.class);//V2类型
//5、指定自定义分区类(如果有的话)
//6、指定自定义分组类(如果有的话)
//7、指定自定义的Reducer类和K3、V3的数据类型
job.setReducerClass(WordCountReducer.class); //指定Reducer类
job.setOutputKeyClass(Text.class); //K3类型
job.setOutputValueClass(LongWritable.class); //V3类型
//8、指定输出方式类和结果输出路径
job.setOutputFormatClass(TextOutputFormat.class);
//TextOutputFormat.setOutputPath(job, new Path("hdfs://node1:8020/output/wordcount")); //目标目录不能存在,否则报错
TextOutputFormat.setOutputPath(job, new Path("file:///E:\output\wordcount")); //目标目录不能存在,否则报错
//9、将job提交到yarn集群
boolean bl = job.waitForCompletion(true); //true表示可以看到任务的执行进度
//10.退出执行进程
System.exit(bl?0:1);
}
}