Hadoop Study Notes
2. Basic Map-Reduce Job Configuration and Principles (Part 2)
We will first write the job with the old API; the next post will use the new API and explain the differences. Environment: the machine that submits the job and runs the development IDE is Windows 7, 4 cores / 8 GB RAM, with IntelliJ IDEA 15. The Hadoop cluster is the Linux cluster already described in the first post.
Because we submit jobs across environments, the source code and configuration need some extra work. First, make sure the Windows machine can resolve the Hadoop cluster's hostnames; if it cannot, edit C:/Windows/System32/drivers/etc/hosts and add the mappings. Then copy the Hadoop installation from the Linux cluster down to the Windows machine; in fact, pulling only the configuration directory is enough, since we just need the configuration files.
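A minimal sketch of such a hosts entry, assuming the host nosql1 and the IP that appears in the job logs later in this post (adjust to your own cluster):

10.202.7.184    nosql1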
Create a new Maven project in IDEA, for example HadoopT, and edit the pom file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hash.test</groupId>
    <artifactId>hadoopT</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <hadoop_version>2.7.2</hadoop_version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
This project needs the common module (which hdfs and client depend on), the hdfs module (to access HDFS), and the client module (to submit jobs). Next we write the word-count class WordCount:
package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class WordCount {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Set the job's main class (the class that contains this main method)
        JobConf jobConf = new JobConf(WordCount.class);
        // Local path of the JAR to ship to the cluster, i.e. the JAR containing this class
        jobConf.set("mapreduce.job.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // User on the remote Hadoop cluster, to avoid permission problems
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        // Job name
        jobConf.setJobName("My Word Count");
        // Output key/value types of each stage
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        // Mapper and Reducer classes
        jobConf.setMapperClass(Map.class);
        jobConf.setReducerClass(Reduce.class);
        // Input and output formats
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        // Input and output paths; a remote HDFS needs the full URI
        FileInputFormat.setInputPaths(jobConf, args[0]);
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        // Delete the output directory first
        deleteDir(jobConf, args[1]);
        // Run the job
        JobClient.runJob(jobConf);
    }
}

class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text key = new Text();

    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String line = text.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            key.set(stringTokenizer.nextToken());
            outputCollector.collect(key, one);
        }
    }
}

class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        int sum = 0;
        while (iterator.hasNext()) {
            // Accumulate the counts for this word
            sum += iterator.next().get();
        }
        outputCollector.collect(text, new IntWritable(sum));
    }
}
The source code is easy to follow when read together with the diagram from the previous post.
A few points deserve emphasis. Because /test/output/ is created anew on every run, the program first clears the output directory at startup, so the job never fails on an already existing directory. And because we submit from the local machine to a remote cluster, we must tell Hadoop which JAR of this project to ship, which is done by setting the mapred.jar (a.k.a. mapreduce.job.jar) property. Here is the run configuration I use for WordCount:
First edit Program Arguments: the program takes the first argument as the input directory and the second as the output directory, and both of them here are on HDFS (an example of these arguments follows the snippet below). Then add "Run Maven Goal 'HadoopT: Package'" to the run configuration's before-launch steps, so that the JAR referenced in the code (the JAR produced by maven package) is always up to date.
jobConf.set("mapred.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
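For reference, the Program Arguments field then holds the two HDFS paths, for example (assumed values that match the directories used later in this post):

hdfs://nosql1:9000/test/input hdfs://nosql1:9000/test/output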
Next, on nosql1, use the hdfs command to put file1 and file2 into the /test/input directory on HDFS. Note that this must be done as the hadoop user.
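A minimal sketch of those shell commands, assuming file1 and file2 sit in the current directory on nosql1:

hdfs dfs -mkdir -p /test/input
hdfs dfs -put file1 file2 /test/input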
A Hadoop user has to be specified, otherwise we have no permission to run the Map-red job. The user is set with System.setProperty, because it may differ from one Hadoop application to another. Since we used the hadoop user on Linux earlier, here we set:
System.setProperty("HADOOP_USER_NAME", "hadoop");
Then we run it in IDEA, and the output is as follows (note that our log level is WARN):
16/08/03 11:10:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hdfs://nosql1:9000/test/output has been deleted sucessfullly.
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/08/03 11:10:46 INFO mapred.FileInputFormat: Total input paths to process : 2
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: number of splits:3
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1470040052262_0010
16/08/03 11:10:47 INFO impl.YarnClientImpl: Submitted application application_1470040052262_0010
16/08/03 11:10:47 INFO mapreduce.Job: The url to track the job: http://nosql1:8088/proxy/application_1470040052262_0010/
16/08/03 11:10:47 INFO mapreduce.Job: Running job: job_1470040052262_0010
16/08/03 11:10:54 INFO mapreduce.Job: Job job_1470040052262_0010 running in uber mode : false
16/08/03 11:10:54 INFO mapreduce.Job: map 0% reduce 0%
16/08/03 11:10:59 INFO mapreduce.Job: map 100% reduce 0%
16/08/03 11:11:05 INFO mapreduce.Job: map 100% reduce 100%
16/08/03 11:11:05 INFO mapreduce.Job: Job job_1470040052262_0010 completed successfully
16/08/03 11:11:05 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=93
FILE: Number of bytes written=476815
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=311
HDFS: Number of bytes written=39
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=3
Launched reduce tasks=1
Data-local map tasks=3
Total time spent by all maps in occupied slots (ms)=9380
Total time spent by all reduces in occupied slots (ms)=3400
Total time spent by all map tasks (ms)=9380
Total time spent by all reduce tasks (ms)=3400
Total vcore-milliseconds taken by all map tasks=9380
Total vcore-milliseconds taken by all reduce tasks=3400
Total megabyte-milliseconds taken by all map tasks=9605120
Total megabyte-milliseconds taken by all reduce tasks=3481600
Map-Reduce Framework
Map input records=5
Map output records=8
Map output bytes=71
Map output materialized bytes=105
Input split bytes=261
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=105
Reduce input records=8
Reduce output records=6
Spilled Records=16
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=114
CPU time spent (ms)=2180
Physical memory (bytes) snapshot=1001734144
Virtual memory (bytes) snapshot=3987656704
Total committed heap usage (bytes)=805306368
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=50
File Output Format Counters
Bytes Written=39
Afterwards we can see the job's output and records in YARN. Let's write a small local program that prints the contents of the output directory:
package com.hash.test.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class TestHDFS {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://nosql1:9000/";
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);
        // Print the contents of the reducer output file under /test/output on HDFS
        InputStream is = fs.open(new Path("/test/output/part-00000"));
        IOUtils.copyBytes(is, System.out, 1024, true);
    }
}
After running it, the output is:
aa 1
apple 3
asd 1
bbb 1
ccccc 1
egg 1
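The same content can also be checked directly from the shell, for example (assuming the same output path as above):

hdfs dfs -cat hdfs://nosql1:9000/test/output/part-00000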
With the new API, the same job looks like this:
package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/8/3
 */
public class WordCount extends Configured implements Tool {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public int run(String[] strings) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        // Use the configuration populated by ToolRunner/GenericOptionsParser
        Job job = Job.getInstance(getConf());
        job.setJar("D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // Job name
        job.setJobName("My Word Count");
        // Output key/value types of each stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Mapper and Reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input and output paths; a remote HDFS needs the full URI
        FileInputFormat.setInputPaths(job, strings[0]);
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));
        // Delete the output directory first
        deleteDir(job.getConfiguration(), strings[1]);
        final boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}

class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}

class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            // Accumulate the counts for this word
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
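Implementing Tool also addresses the "Hadoop command-line option parsing not performed" warning seen in the log above: ToolRunner passes the arguments through GenericOptionsParser first, so generic options are stripped before run() receives the remaining ones. As a sketch only (the -D value is purely illustrative, the paths are the assumed ones from above), the Program Arguments could then look like:

-D mapreduce.job.reduces=2 hdfs://nosql1:9000/test/input hdfs://nosql1:9000/test/output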
All later demos will use this new API.