基本概念
Map:分布计算 Reduce:汇总计算
代码语言:javascript复制这里要与Spark中的map和reduce算子做区分。Spark的map()和reduce()与Hadoop MapReduce中的同名函数没有直接对应关系。
split阶段将文件逻辑拆分,为了分布式计算做准备,每一个MapTask生成一个临时文件,多个临时文件会进行合并,用来传递给ReduceTask,然后ReduceTask对临时文件进行计算。本Demo基于Hadoop3.1.4实验。
Map阶段
以人名wordcount为例: maptask输入 k-v 代表 偏移量-行数据。 maptask输出 k-v 代表 人名-1。 maptask输出临时文件结果示例:
代码语言:javascript复制张三 1
张三 1
李四 1
王五 1
赵六 1
李四 1
李四 1
Reduce阶段
此时会经历一个网络传输,Map阶段最后生成的临时文件会在这里合并,合并临时文件是将上面进行分组和聚合,生成一个新文件; 按k分组,这里并不是真实的数组,而是迭代器:
代码语言:javascript复制张三 [1,1]
李四 [1,1,1]
王五 [1]
赵六 [1]
reduce输入 k-v :人名-[1,1,1,1,1,1,…] reduce输出 k-v:人名-sum or count
代码语言:javascript复制张三 2
李四 3
王五 1
赵六 1
这样就能解决内存小不够计算的问题
代码实现
mapTask和reduceTask阶段
代码语言:javascript复制/*
继承Mapper类,只要输入的是文字,泛型中有固定的数据类型:
k-LongWritable == L
v-Text == String
不固定的是输出的 k-v,在该示例中:
人名-k-Text
1-v-IntWritable == Integer
*/
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {//输入输入、输出输出
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Mapper类根据每个逻辑切片中的数据行数,调用对应次数的map方法,源码中是while循环
String s = value.toString();
String[] arr = s.split(" "); // 将人名按照空格分隔并写入数组arr
for (String name : arr) {
// 遍历迭代器,利用context工具类,将人名写入k,1写入v
context.write(new Text(name),new IntWritable(1));
}
}
}
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//map的输出是什么,reduce的输入就是什么,泛型可以直接复制
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
//对合并后的文件进行遍历,需要一个累加器
for (IntWritable value : values) {
int i = value.get(); //每一个value表示其中的一个1,循环通过get()方法转为int类型
sum = i;
}
context.write(key, new IntWritable(sum));
}
}
// 输出
// 张三 12
// 李四 6
// 王五 6
job启动入口:
代码语言:javascript复制// 1. 初始化配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:8020");
//2. 创建job
Job job = Job.getInstance(conf);
job.setJarByClass(CountCleanJob.class);
//3. 设置输入格式化工具和输出格式化
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//4. 设置输入路径和输出路径
// TextInputFormat读取hdfs中的文件,根据文件大小和个数对其split逻辑切片
// MapReduce根据切片多少,启动对应的MapTask任务
TextInputFormat.addInputPath(job, new Path("/wordcount.txt"));
// 将输出文件发送到hdfs
TextOutputFormat.setOutputPath(job, new Path("/Count_Out"));
//5. 设置mapper和reducer
job.setMapperClass(CountCleanMapper.class);
job.setReducerClass(CountCleanReducer.class);
// 6. 设置mapper的kv类型和reducer的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 7. 启动job
boolean b = job.waitForCompletion(true);
System.out.println(b);//true
```