Table of Contents
- Prepare the data
- Create a Maven project; the pom file is as follows
- Define a map class
- Define a reduce class
- Define a driver class to describe the job and submit it
Prepare the data
Prepare a file named wordcount.txt with the following content:
word count count hadoop hadoop hadoop spark spark spark hive storm flume kafka redis hbase storm flume kafka redis storm flume hbase storm flume storm flume kafka redis redis redis hbase spark spark spark aaa
Create a Maven project; the pom file is as follows
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.czxy</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Define a map class
package com.czxy.test01;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    // First type parameter:  LongWritable, the map input key (fixed type: the byte offset of the line)
    // Second type parameter: Text, the map input value (fixed type: one line of the input file)
    // Third type parameter:  Text, the map output key (varies with the job's needs)
    // Fourth type parameter: LongWritable, the map output value (varies with the job's needs)
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key is the offset of the first character of this line, i.e. the number of
        // characters (spaces and newlines included) that precede it in the file
        // value is one line of the input
        // 1. Convert the Text value to a String
        String data = value.toString();
        // 2. Split the line on spaces
        String[] strings = data.split(" ");
        // 3. Emit each word once with a count of 1
        for (String string : strings) {
            // context is the job context, used to emit output pairs
            context.write(new Text(string), new LongWritable(1));
        }
    }
}
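If you want to verify the mapper in isolation before wiring the whole job together, MRUnit works well. This is a minimal sketch and assumes you add the org.apache.mrunit:mrunit test dependency (e.g. version 1.1.0 with the hadoop2 classifier), which is not in the pom above; the test class name is hypothetical. JUnit is already declared in the pom.

package com.czxy.test01;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountMapTest {
    @Test
    public void mapEmitsOnePairPerWord() throws Exception {
        // Feed the mapper one line and assert it emits (word, 1) once per word, in order
        MapDriver.newMapDriver(new WordCountMap())
                .withInput(new LongWritable(0), new Text("hadoop spark hadoop"))
                .withOutput(new Text("hadoop"), new LongWritable(1))
                .withOutput(new Text("spark"), new LongWritable(1))
                .withOutput(new Text("hadoop"), new LongWritable(1))
                .runTest();
    }
}

runTest() fails the test if the emitted pairs differ from the expected ones in value or order.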
Define a reduce class
package com.czxy.test01;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // First type parameter:  Text, the reduce input key (must match the map output key type)
    // Second type parameter: LongWritable, the reduce input value (must match the map output value type)
    // Third type parameter:  Text, the reduce output key
    // Fourth type parameter: LongWritable, the reduce output value
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // key is a distinct word
        // values is the list of 1s for that word (one 1 per occurrence)
        long sum = 0;
        // Sum up: iterate over values and accumulate
        for (LongWritable value : values) {
            sum += value.get();
        }
        // Emit the word and its total count
        context.write(key, new LongWritable(sum));
    }
}
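The reducer can be checked the same way, again assuming the MRUnit test dependency mentioned above; WordCountReduceTest is a hypothetical name.

package com.czxy.test01;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountReduceTest {
    @Test
    public void reduceSumsTheOnes() throws Exception {
        // Three 1s for "spark" should reduce to a single (spark, 3) pair
        ReduceDriver.newReduceDriver(new WordCountReduce())
                .withInput(new Text("spark"),
                        Arrays.asList(new LongWritable(1), new LongWritable(1), new LongWritable(1)))
                .withOutput(new Text("spark"), new LongWritable(3))
                .runTest();
    }
}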
Define a driver class to describe the job and submit it
package com.czxy.test01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {
    /**
     * Wires our own map and reduce classes into the framework and describes the job.
     */
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Create a Job instance
        Job job = Job.getInstance(new Configuration(), "wordCount12");
        // 2. Set the input format class
        job.setInputFormatClass(TextInputFormat.class);
        // Set the input path (backslashes must be escaped in Java string literals)
        TextInputFormat.addInputPath(job, new Path("D:\\d12-19\\idea\\work\\wordcount.txt"));
        // 3. Set the map class
        job.setMapperClass(WordCountMap.class);
        // Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. Set the reduce class
        job.setReducerClass(WordCountReduce.class);
        // Set the reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 5. Set the output format class
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path (it must not exist yet)
        TextOutputFormat.setOutputPath(job, new Path("E:/wordCount/output"));
        // 6. Submit the job and wait for completion (return an exit code)
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Run the Tool and exit with its status code
        int run = ToolRunner.run(new WordCountDriver(), args);
        System.exit(run);
    }
}
Note: the output of map is a list of (key, value) pairs, while the input of reduce is a key together with the list of values grouped under it. For example, if map emits (spark,1), (hive,1), (spark,1), then after the shuffle the reduce side receives (hive,[1]) and (spark,[1,1]).
Error reminders: 1. Make sure the code is typed exactly as shown; if you do not get the expected output, either the code has a typo or the environment is misconfigured. 2. If you get the following error:
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory E:/wordCount/output already exists
Don't panic: it just means the output directory from a previous run still exists. Delete the output folder and run the job again.
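Alternatively, you can have the driver clean up a stale output directory before submitting the job. Below is a minimal sketch using the standard Hadoop FileSystem API; the helper class and method names (OutputDirs, deleteIfExists) are hypothetical, not part of the code above.

package com.czxy.test01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputDirs {
    // Delete the output directory if it already exists, so reruns do not fail
    // with FileAlreadyExistsException.
    public static void deleteIfExists(Configuration conf, Path output) throws IOException {
        FileSystem fs = output.getFileSystem(conf); // resolves the local or HDFS file system from the path
        if (fs.exists(output)) {
            fs.delete(output, true); // true = delete recursively
        }
    }
}

In WordCountDriver.run() you would call OutputDirs.deleteIfExists(job.getConfiguration(), new Path("E:/wordCount/output")) just before job.waitForCompletion(true).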