MapReduce 编程实践

2021-09-06 10:21:45 浏览数 (1)

文章目录

    • 1. MapReduce 作业流程
    • 2. 实践
      • 2.1 启动 hadoop
      • 2.2 创建 java 项目
      • 2.3 MapReduce shell
      • 2.4 MapReduce Web UI
    • 3. MapReduce 编程实践:统计对象中的某些属性

参考书:《Hadoop大数据原理与应用》

1. MapReduce 作业流程

2. 实践

2.1 启动 hadoop

代码语言:javascript复制
start-dfs.sh
start-yarn.sh
mr-jobhistory-daemon.sh start historyserver
# 第三条可以用下面的命令,上面的显示过期了,以后弃用
mapred --daemon start historyserver

2.2 创建 java 项目

  • WordCountMapper.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	//self define map method 自定义 map 方法
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
	{
		String line = value.toString();
		String[] words = line.split(" ");
		for(String word : words)
			context.write(new Text(word), new IntWritable(1));
			// context.write() give to next stage: shuffle
	}
}
  • WordCountReducer.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	// 自定义 reduce 方法
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws
		IOException, InterruptedException
	{
		int sum = 0;
		for(IntWritable value : values)
			sum  = value.get();
		context.write(key, new IntWritable(sum));
	}
}
  • WordCountDriver.java,dirver 类设置本次 job
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
	// args 参数 输入输出文件路径
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = new Configuration();
		// map compress, 开启 map 阶段的压缩
		conf.setBoolean("mapreduce.map.output.compress", true);
		// compress type,指定压缩类型
		conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
		
		Job job = Job.getInstance(conf, "word count diy:");
		job.setJarByClass(WordCountDriver.class);
		job.setMapperClass(WordCountMapper.class);
		
		// 自定义 Combine
		job.setCombinerClass(WordCountReducer.class);
		job.setReducerClass(WordCountReducer.class);
		
		// 指定 map 输出数据的类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 指定 reduce 输出数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 设置输入文件路径
		FileInputFormat.setInputPaths(job,  new Path(args[0]));
		// 设置输出文件路径
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 开启 reduce 阶段的解压缩
		FileOutputFormat.setCompressOutput(job, true);
		// 指定解压缩类型(跟上面压缩类型一致)
		FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
		
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
  • 导出 wordcount_diy.jar
  • 提交hadoop执行
代码语言:javascript复制
hadoop jar /home/dnn/eclipse-workspace/HDFS_example/wordcount_diy.jar com.michael.mapreduce.WordCountDriver /InputDataTest /OutputDataTest1
  • 查看结果
代码语言:javascript复制
hdfs dfs -cat /OutputDataTesdfs dfs -cat /OutputDataTest1/part-r-00000.bz2

显示乱码,需要下载然后解压,再查看

  • 下载
代码语言:javascript复制
hdfs dfs -get /OutputDataTest1/part-r-00000.bz2 /home/dnn/eclipse-workspace/HDFS_example/part-r-00000.bz2
  • 查看
代码语言:javascript复制
bzcat /home/dnn/eclipse-workspace/HDFS_example/part-r-00000.bz2

2.3 MapReduce shell

查看作业状态

代码语言:javascript复制
mapred job -status job_1615849408082_0001
代码语言:javascript复制
[dnn@master Desktop]$ mapred job -status job_1615849408082_0001
WARNING: HADOOP_MAPRED_PID_DIR has been replaced by HADOOP_PID_DIR. Using value of HADOOP_MAPRED_PID_DIR.
2021-03-26 04:25:14,881 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at master/192.168.253.130:8032
2021-03-26 04:25:15,939 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

Job: job_1615849408082_0001
Job File: hdfs://192.168.253.130:9000/tmp/hadoop-yarn/staging/history/done/2021/03/24/000000/job_1615849408082_0001_conf.xml
Job Tracking URL : http://master:19888/jobhistory/job/job_1615849408082_0001
Uber job : false
Number of maps: 3
Number of reduces: 1
map() completion: 1.0
reduce() completion: 1.0
Job state: SUCCEEDED
retired: false
reason for failure:  
Counters: 54
	File System Counters
		FILE: Number of bytes read=6640
		FILE: Number of bytes written=1072644
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=25631
		HDFS: Number of bytes written=4967
		HDFS: Number of read operations=14
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=3
		Launched reduce tasks=1
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=43801
		Total time spent by all reduces in occupied slots (ms)=5037
		Total time spent by all map tasks (ms)=43801
		Total time spent by all reduce tasks (ms)=5037
		Total vcore-milliseconds taken by all map tasks=43801
		Total vcore-milliseconds taken by all reduce tasks=5037
		Total megabyte-milliseconds taken by all map tasks=44852224
		Total megabyte-milliseconds taken by all reduce tasks=5157888
	Map-Reduce Framework
		Map input records=667
		Map output records=3833
		Map output bytes=40605
		Map output materialized bytes=8455
		Input split bytes=358
		Combine input records=3833
		Combine output records=1264
		Reduce input groups=913
		Reduce shuffle bytes=8455
		Reduce input records=1264
		Reduce output records=913
		Spilled Records=2528
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=818
		CPU time spent (ms)=3140
		Physical memory (bytes) snapshot=599461888
		Virtual memory (bytes) snapshot=10950950912
		Total committed heap usage (bytes)=385351680
		Peak Map Physical memory (bytes)=167784448
		Peak Map Virtual memory (bytes)=2735529984
		Peak Reduce Physical memory (bytes)=96972800
		Peak Reduce Virtual memory (bytes)=2744360960
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=25273
	File Output Format Counters 
		Bytes Written=4967

2.4 MapReduce Web UI

代码语言:javascript复制
http://192.168.253.130:19888/jobhistory

3. MapReduce 编程实践:统计对象中的某些属性

MapReduce 编程实践:统计对象中的某些属性

0 人点赞