MapReduce-WorldCount编程思路

2024-07-25 15:48:33 浏览数 (2)

基本概念

Map:分布计算 Reduce:汇总计算

代码语言:javascript复制
这里要与Spark中的map和reduce算子做区分。Spark的map()和reduce()与Hadoop MapReduce中的同名函数没有直接对应关系。

split阶段将文件逻辑拆分,为了分布式计算做准备,每一个MapTask生成一个临时文件,多个临时文件会进行合并,用来传递给ReduceTask,然后ReduceTask对临时文件进行计算。本Demo基于Hadoop3.1.4实验。

Map阶段

以人名wordcount为例: maptask输入 k-v 代表 偏移量-行数据。 maptask输出 k-v 代表 人名-1。 maptask输出临时文件结果示例:

代码语言:javascript复制
张三 1
张三 1
李四 1
王五 1
赵六 1
李四 1
李四 1
Reduce阶段

此时会经历一个网络传输,Map阶段最后生成的临时文件会在这里合并,合并临时文件是将上面进行分组和聚合,生成一个新文件; 按k分组,这里并不是真实的数组,而是迭代器:

代码语言:javascript复制
张三 [1,1]
李四 [1,1,1]
王五 [1]
赵六 [1]

reduce输入 k-v :人名-[1,1,1,1,1,1,…] reduce输出 k-v:人名-sum or count

代码语言:javascript复制
张三 2
李四 3
王五 1
赵六 1

这样就能解决内存小不够计算的问题

代码实现
mapTask和reduceTask阶段
代码语言:javascript复制
/*
	继承Mapper类,只要输入的是文字,泛型中有固定的数据类型:
	k-LongWritable		== L
	v-Text				== String
    不固定的是输出的 k-v,在该示例中:
	人名-k-Text
	1-v-IntWritable	 	== Integer
*/
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {//输入输入、输出输出

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      //Mapper类根据每个逻辑切片中的数据行数,调用对应次数的map方法,源码中是while循环
        String s = value.toString();
        String[] arr = s.split(" ");	// 将人名按照空格分隔并写入数组arr
        for (String name : arr) {
            // 遍历迭代器,利用context工具类,将人名写入k,1写入v
            context.write(new Text(name),new IntWritable(1));
        }
    }
}

static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//map的输出是什么,reduce的输入就是什么,泛型可以直接复制
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //对合并后的文件进行遍历,需要一个累加器
        for (IntWritable value : values) {
            int i = value.get();	//每一个value表示其中的一个1,循环通过get()方法转为int类型
            sum  = i;
        }
        context.write(key, new IntWritable(sum));
    }
}
// 输出
// 张三	12
// 李四	6
// 王五	6
job启动入口:
代码语言:javascript复制
// 1. 初始化配置
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop10:8020");
        //2. 创建job
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountCleanJob.class);

        //3. 设置输入格式化工具和输出格式化
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //4. 设置输入路径和输出路径
    	// TextInputFormat读取hdfs中的文件,根据文件大小和个数对其split逻辑切片
		// MapReduce根据切片多少,启动对应的MapTask任务
        TextInputFormat.addInputPath(job, new Path("/wordcount.txt"));
    	// 将输出文件发送到hdfs
        TextOutputFormat.setOutputPath(job, new Path("/Count_Out"));

        //5. 设置mapper和reducer
        job.setMapperClass(CountCleanMapper.class);
        job.setReducerClass(CountCleanReducer.class);

        // 6. 设置mapper的kv类型和reducer的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 7. 启动job
        boolean b = job.waitForCompletion(true);
        System.out.println(b);//true
        ```

0 人点赞