MapReduce练习
写在之前
已经安装好 Hadoop,并且能够上传文件到 HDFS。Hadoop 版本:2.7.2
项目依赖
代码语言:javascript复制<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.7.2</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
统计
需求:有以下数据,对该数据统计每个单词的出现次数
代码语言:javascript复制hello word
hello page
123456 789
生如夏花 死如秋叶
mapper
代码语言:javascript复制public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value,Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] datas = line.split(" ");
for (String data : datas) {
context.write(new Text(data),new LongWritable(1));
}
}
}
reduce
代码语言:javascript复制public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws IOException, InterruptedException {
String data = key.toString();
long count = 0;
for (LongWritable value : values) {
count = value.get();
}
context.write(new Text(data),new LongWritable(count));
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
//wordCount
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//设置输入输出类型
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
数据去重
需求:有以下数据,对该数据进行去重处理
代码语言:javascript复制192.168.234.21
192.168.234.22
192.168.234.21
192.168.234.21
192.168.234.23
192.168.234.21
192.168.234.21
192.168.234.21
192.168.234.25
192.168.234.21
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.29
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.25
192.168.234.25
192.168.234.21
192.168.234.22
192.168.234.21
mapper
代码语言:javascript复制public class DistinctMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable,Text,Text,NullWritable>.Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
reduce
代码语言:javascript复制public class DistinctReduce extends Reducer<Text,NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text,NullWritable,Text,NullWritable>.Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(DistinctMapper.class);
job.setReducerClass(DistinctReduce.class);
job.setMapOutputValueClass(NullWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
求平均值
需求:有以下数据,求出他们的平均值
代码语言:javascript复制tom 69
tom 84
tom 68
jary 89
jary 90
jary 81
jary 35
rose 23
rose 100
rose 230
mapper
代码语言:javascript复制public class AvgMapper extends Mapper<LongWritable, Text,Text, FloatWritable> {
@Override
protected void map(LongWritable key, Text value,Mapper<LongWritable, Text,Text,FloatWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] datas = line.split(" ");
String name = datas[0];
int score = Integer.parseInt(datas[1]);
context.write(new Text(name),new FloatWritable(score));
}
}
reduce
代码语言:javascript复制public class AvgReduce extends Reducer<Text, FloatWritable,Text,FloatWritable> {
@Override
protected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
Integer i = 0;
Float sums = 0f;
for (FloatWritable value : values) {
sums =value.get();
i =1;
}
sums=sums/i;
context.write(key,new FloatWritable(sums));
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(AvgMapper.class);
job.setReducerClass(AvgReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
求最大值
需求:假设我们需要处理一批有关天气的数据,其格式如下:
按照ASCII码存储,每行一条记录。每行共22个字符(包含符号在内)
第9、10、11、12字符为年份,第19、20、21、22字符代表温度,求每年的最高温度
代码语言:javascript复制2329999919500515070000
9909999919500515120022
9909999919500515180011
9509999919490324120111
6509999919490324180078
9909999919370515070001
9909999919370515120002
9909999919450515180001
6509999919450324120002
8509999919450324180078
mapper
代码语言:javascript复制public class MaxMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String year = str.substring(8,12);
int temp = Integer.parseInt(str.substring(18,22));
context.write(new Text(year),new IntWritable(temp));
}
}
reduce
代码语言:javascript复制public class MaxReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max = 0;
for (IntWritable temp : values) {
if (max<temp.get()){
max = temp.get();
}
}
context.write(key,new IntWritable(max));
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(MaxMapper.class);
job.setReducerClass(MaxReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
统计流量
需求:有以下数据统计出每个用户的流量使用情况
代码语言:javascript复制12321445 zs bj 343
12321312 ww sh 234
12321445 zs bj 343
12321312 ww cd 234
12345678 zs cd 156
新建FlowPojo类
代码语言:javascript复制public class FlowPojo implements Writable {
private String name;
private String phone;
private String addr;
private long flow;
public FlowPojo() {
}
public FlowPojo(String name, String phone, String addr, long flow) {
this.name = name;
this.phone = phone;
this.addr = addr;
this.flow = flow;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public long getFlow() {
return flow;
}
public void setFlow(long flow) {
this.flow = flow;
}
@Override
public String toString() {
return "["
"name:'" name '''
", phone:'" phone '''
", addr:'" addr '''
", flow:" flow
']';
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeUTF(phone);
dataOutput.writeUTF(addr);
dataOutput.writeLong(flow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.phone = dataInput.readUTF();
this.addr = dataInput.readUTF();
this.flow = dataInput.readLong();
}
}
mapper
代码语言:javascript复制public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowPojo> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
String[] datas = str.split(" ");
String phone = datas[0];
String name = datas[1];
String addr = datas[2];
Long flow = Long.parseLong(datas[3]);
FlowPojo flowPojo = new FlowPojo(name, phone, addr, flow);
context.write(new Text(phone),flowPojo);
}
}
reduce
代码语言:javascript复制public class FlowReduce extends Reducer<Text, FlowPojo,Text,FlowPojo> {
@Override
protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
FlowPojo flowPojo = new FlowPojo();
long flow = 0l;
for (FlowPojo value : values) {
flow =value.getFlow();
//判断名字是否赋值
if (value.getName().isEmpty()){
flowPojo.setName(value.getName());
flowPojo.setPhone(value.getPhone());
flowPojo.setAddr(value.getAddr());
}
}
flowPojo.setFlow(flow);
System.out.println(flow);
context.write(key,flowPojo);
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowMapper.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
分区案例
需求:有以下数据,统计每个月的利润,按月分区
mapper
代码语言:javascript复制public class ProfitMapper extends Mapper<LongWritable, Text,Text, FloatWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
String month = datas[0];
float profit = Float.parseFloat(datas[1]);
context.write(new Text(month),new FloatWritable(profit));
}
}
reduce
代码语言:javascript复制public class ProfitReduce extends Reducer<Text, FloatWritable,Text,FloatWritable> {
@Override
protected void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
float count= 0f;
for (FloatWritable value : values) {
count =value.get();
}
context.write(key,new FloatWritable(count));
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setNumReduceTasks(3);
job.setMapperClass(ProfitMapper.class);
job.setReducerClass(ProfitReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
mapper
代码语言:javascript复制public class FlowsMapper extends Mapper<LongWritable, Text,Text, FlowPojo> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
FlowPojo flowPojo = new FlowPojo();
flowPojo.setPhone(datas[0]);
flowPojo.setAddr(datas[1]);
flowPojo.setName(datas[2]);
flowPojo.setFlow(Long.parseLong(datas[3]));
context.write(new Text(flowPojo.getName()),flowPojo);
}
}
reduce
代码语言:javascript复制public class FlowsReduce extends Reducer<Text, FlowPojo, Text,FlowPojo> {
@Override
protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
FlowPojo flowPojo = new FlowPojo();
for (FlowPojo value : values) {
flowPojo.setFlow(flowPojo.getFlow() value.getFlow());
flowPojo.setName(value.getName());
flowPojo.setAddr(value.getAddr());
flowPojo.setPhone(value.getPhone());
}
context.write(key,flowPojo);
}
}
自定义分区
需求:有以下数据,按地区进行分区,做流量统计
代码语言:javascript复制13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987
FlowPartition
代码语言:javascript复制public class FlowPartition extends Partitioner<Text, FlowPojo> {
@Override
public int getPartition(Text text, FlowPojo flowPojo, int i) {
if (flowPojo.getAddr().equals("bj")){
return 0;
}else if (flowPojo.getAddr().equals("sh")){
return 1;
}else if (flowPojo.getAddr().equals("sz")){
return 2;
}else {
return 3;
}
}
}
mapper
代码语言:javascript复制public class FlowsMapper extends Mapper<LongWritable, Text,Text, FlowPojo> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
FlowPojo flowPojo = new FlowPojo();
flowPojo.setPhone(datas[0]);
flowPojo.setAddr(datas[1]);
flowPojo.setName(datas[2]);
flowPojo.setFlow(Long.parseLong(datas[3]));
context.write(new Text(flowPojo.getName()),flowPojo);
}
}
reduce
代码语言:javascript复制public class FlowsReduce extends Reducer<Text, FlowPojo, Text,FlowPojo> {
@Override
protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
FlowPojo flowPojo = new FlowPojo();
for (FlowPojo value : values) {
flowPojo.setFlow(flowPojo.getFlow() value.getFlow());
flowPojo.setName(value.getName());
flowPojo.setAddr(value.getAddr());
flowPojo.setPhone(value.getPhone());
}
context.write(key,flowPojo);
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setPartitionerClass(FlowPartition.class);
job.setNumReduceTasks(4);
job.setMapperClass(FlowsMapper.class);
job.setReducerClass(FlowsReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowPojo.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowPojo.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
分区求和
需求:有三个成绩文件 chinese.txt、english.txt、math.txt,每个文件记录每个人三个月的成绩,计算每个人三个月内每科的总成绩
chinese.txt
代码语言:javascript复制1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27
english.txt
代码语言:javascript复制1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86
math.txt
代码语言:javascript复制1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96
新建StudentPojo
代码语言:javascript复制public class StudentPojo implements Writable {
private String name;
private int math;
private int chinese;
private int english;
public StudentPojo() {
}
public StudentPojo(String name, int math, int chinese, int english) {
this.name = name;
this.math = math;
this.chinese = chinese;
this.english = english;
}
@Override
public String toString() {
return "["
"name:'" name '''
", math:" math
", chinese:" chinese
", english:" english
']';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getMath() {
return math;
}
public void setMath(int math) {
this.math = math;
}
public int getChinese() {
return chinese;
}
public void setChinese(int chinese) {
this.chinese = chinese;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeInt(math);
dataOutput.writeInt(chinese);
dataOutput.writeInt(english);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.math = dataInput.readInt();
this.chinese = dataInput.readInt();
this.english = dataInput.readInt();
}
}
mapper
代码语言:javascript复制public class ScoreMapper extends Mapper<LongWritable, Text,Text, StudentPojo> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String course = fileSplit.getPath().getName();//获取文件名
StudentPojo studentPojo = new StudentPojo();
studentPojo.setName(datas[1]);
if ("math.txt".equals(course)){
studentPojo.setMath(Integer.parseInt(datas[2]));
}
if ("chinese.txt".equals(course)){
studentPojo.setChinese(Integer.parseInt(datas[2]));
}
if ("english.txt".equals(course)){
studentPojo.setEnglish(Integer.parseInt(datas[2]));
}
context.write(new Text(datas[1]),studentPojo);
}
}
reduce
代码语言:javascript复制public class ScoreReduce extends Reducer<Text, StudentPojo,Text,StudentPojo> {
@Override
protected void reduce(Text key, Iterable<StudentPojo> values, Context context) throws IOException, InterruptedException {
StudentPojo studentPojo = new StudentPojo();
studentPojo.setName(key.toString());
for (StudentPojo value : values) {
studentPojo.setMath(studentPojo.getMath() value.getMath());
studentPojo.setChinese(studentPojo.getChinese() value.getChinese());
studentPojo.setEnglish(studentPojo.getEnglish() value.getEnglish());
}
context.write(key,studentPojo);
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(StudentPojo.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(StudentPojo.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
排序
需求:有以下数据,根据热度对电影进行降序排序
代码语言:javascript复制惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 94
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56
夺路而逃 69
神奇动物在哪里 57
驴得水 68
我不是潘金莲 56
你的名字 77
大闹天竺 96
捉迷藏 78
凶手还未睡 23
魔发精灵 68
勇士之门 35
罗曼蒂克消亡史 67
小明和他的小伙伴们 36
MoviePojo
代码语言:javascript复制public class MoviePojo implements WritableComparable<MoviePojo> {
private String name;
private int hot;
public MoviePojo() {
}
public MoviePojo(String name, int hot) {
this.name = name;
this.hot = hot;
}
@Override
public String toString() {
return "name:'" name '''
", hot:" hot ;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getHot() {
return hot;
}
public void setHot(int hot) {
this.hot = hot;
}
@Override
public int compareTo(MoviePojo o) {
return o.getHot()-this.hot;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.name);
dataOutput.writeInt(this.hot);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.hot = dataInput.readInt();
}
}
mapper
代码语言:javascript复制public class MovieMapper extends Mapper<LongWritable, Text,MoviePojo, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
MoviePojo moviePojo = new MoviePojo();
moviePojo.setName(datas[0]);
moviePojo.setHot(Integer.parseInt(datas[1]));
context.write(moviePojo,NullWritable.get());
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setMapperClass(MovieMapper.class);
job.setOutputKeyClass(MoviePojo.class);
job.setOutputValueClass(NullWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}
全排序统计
需求:有这样一组数字,要求利用3个reduce来处理,并且生成的三个结果文件,是整体有序的。
将小于100的数装入第一个分区,100到999的数装入第二个分区,大于等于1000的数装入第三个分区
代码语言:javascript复制82 239 231
23 22 213
123 232 124
213 3434 232
4546 565 123
231 231
2334 231
1123 5656 657
12313 4324 213
123 2 232 32
343 123 4535
12321 3442 453
1233 342 453
1231 322 452
232 343 455
3123 3434 3242
TotalsortPartition
代码语言:javascript复制public class TotalsortPartition extends Partitioner<LongWritable, IntWritable> {
@Override
public int getPartition(LongWritable longWritable, IntWritable intWritable, int i) {
Long val = longWritable.get();
if (val<100){
return 0;
}else if(val>=100 && val<1000){
return 1;
}else {
return 2;
}
}
}
mapper
代码语言:javascript复制public class TotalsortMapper extends Mapper<LongWritable, Text,LongWritable, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
for (String data : datas) {
context.write(new LongWritable(Long.valueOf(data)),new IntWritable(1));
}
}
}
reduce
代码语言:javascript复制public class TotalsortReduce extends Reducer<LongWritable, IntWritable,LongWritable,IntWritable> {
@Override
protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count =value.get();
}
context.write(key,new IntWritable(count));
}
}
App
代码语言:javascript复制public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置启动类
job.setJarByClass(App.class);
job.setPartitionerClass(TotalsortPartition.class);
job.setNumReduceTasks(3);
job.setMapperClass(TotalsortMapper.class);
job.setReducerClass(TotalsortReduce.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(IntWritable.class);
//设置读取输入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.19.4:9000/count"));
FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.19.4:9000/count/result"));
job.waitForCompletion(true);
}