Requirement
Count the number of occurrences of each word in the following txt file (the file lives under /Users/lizhengi/test/input/):
hadoop take spring
spark hadoop hdfs
mapreduce take tomcat
tomcat
kafka kafka flume
flume
hive
Implementation
1. Create a new Maven project; the pom.xml dependencies are as follows
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lizhengi</groupId>
    <artifactId>Hadoop-API</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
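A note on these dependencies: hadoop-client already pulls in hadoop-common and the HDFS client libraries transitively, so for this example it would usually suffice on its own; the explicit hadoop-common and hadoop-hdfs entries mainly pin the versions. Resolving junit through the RELEASE meta-version is also discouraged in current Maven; pinning a concrete version (e.g. 4.13.2) is safer.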
2. Under src/main/resources, create a new file named "log4j.properties" with the following contents
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
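One subtlety: this file is written in Log4j 1.x syntax (org.apache.log4j.*), while the pom declares log4j-core 2.8.2, which is Log4j 2. The configuration still takes effect because the Hadoop 3.2.1 client libraries themselves depend on Log4j 1.2 (via slf4j-log4j12), and that is the logger that actually reads log4j.properties.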
3. Write the Mapper class: WcMapper
package com.lizhengi.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author lizhengi
 * @create 2020-07-20
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Take the incoming line and convert it to a String
        String line = value.toString();
        // 2 Split the line on the space delimiter into an array of words
        String[] words = line.split(" ");
        // 3 For each word in the array, emit a <word, 1> pair
        for (String word : words) {
            // Use the MR context to send the mapper-stage output downstream;
            // it becomes the input of the reduce stage
            k.set(word);
            context.write(k, v);
        }
    }
}
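To make the map step concrete, here is a small standalone sketch (hypothetical; plain Java, no Hadoop runtime needed, and WcMapperDemo is not part of the job) that replays WcMapper's logic on the first line of the input file:

package com.lizhengi.wordcount;

// Hypothetical standalone demo: prints the <word, 1> pairs that WcMapper
// would emit for one input line via context.write(k, v)
public class WcMapperDemo {
    public static void main(String[] args) {
        String line = "hadoop take spring"; // first line of the input file
        for (String word : line.split(" ")) {
            System.out.println("<" + word + ", 1>");
        }
    }
}

Running it prints <hadoop, 1>, <take, 1>, <spring, 1>; the shuffle phase later groups such pairs by key before they reach the reducer.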
4. Write the Reducer class: WcReducer
package com.lizhengi.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author lizhengi
 * @create 2020-07-20
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Define a counter
        sum = 0;
        // 2 Iterate over the grouped values; accumulating each 1 yields the word's total count
        for (IntWritable count : values) {
            sum += count.get();
        }
        // 3 Write out the final result
        v.set(sum);
        context.write(key, v);
    }
}
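Likewise, a hypothetical standalone sketch of the reduce step: after the shuffle, all values for one key arrive together as an iterable, and the reducer simply accumulates them.

package com.lizhengi.wordcount;

import java.util.Arrays;
import java.util.List;

// Hypothetical standalone demo: simulates reduce("hadoop", [1, 1])
public class WcReducerDemo {
    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 1); // the grouped 1s for key "hadoop"
        int sum = 0;
        for (int count : values) {
            sum += count; // same accumulation as WcReducer
        }
        System.out.println("hadoop\t" + sum); // prints: hadoop 2 (tab-separated)
    }
}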
5. Write the Driver class: WcDriver
package com.lizhengi.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author lizhengi
 * @create 2020-07-20
 */
public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and wrap it in a job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Set the jar load path
        job.setJarByClass(WcDriver.class);
        // 3 Set the map and reduce classes
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);
        // 4 Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, "/Users/lizhengi/test/input");
        FileOutputFormat.setOutputPath(job, new Path("/Users/lizhengi/test/output"));
        // 7 Submit
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
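Two common refinements, shown here only as a sketch (the argument-based paths and the combiner are additions, not part of the original driver): read the input/output paths from the command line instead of hard-coding them, and reuse WcReducer as a combiner so map output is pre-aggregated before the shuffle, which is safe here because integer addition is associative and commutative.

// Hypothetical variant of step 6 in WcDriver.main, plus a combiner;
// assumes the job is launched with two arguments: <input path> <output path>
job.setCombinerClass(WcReducer.class); // pre-aggregate <word, 1> pairs on the map side
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));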
Results
[root@carlota1] ls /Users/lizhengi/test/output/
# two new files appear: the _SUCCESS marker and the reducer output part-r-00000
_SUCCESS part-r-00000
[root@carlota1 output] cat part-r-00000
flume 2
hadoop 2
hdfs 1
hive 1
kafka 2
mapreduce 1
spark 1
spring 1
take 2
tomcat 2
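Note that the keys in part-r-00000 come out in sorted order; that is not a property of the input data but of MapReduce itself, which sorts keys during the shuffle before handing them to the reducer.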