Table of Contents
- 1. Generating the Data
- 2. Writing the Entity Class
- 3. The Mapper Class
- 4. The Reducer Class
- 5. The Driver Class
- 6. Running the Job
Reference book: 《Hadoop大数据原理与应用》
Related article: MapReduce Programming Practice
1. Generating the Data
Supermarket consumer data, one record per line: id, timestamp, amount spent, member/non-member.
The following Python script generates the synthetic data:
import random
import time

consumer_type = ['会员', '非会员']
vip, novip = 0, 0
vipValue, novipValue = 0, 0

with open("consumer.txt", 'w', encoding='utf-8') as f:
    for i in range(1000):  # 1000 records
        random.seed(time.time() + i)
        id = random.randint(0, 10000)
        t = time.strftime('%Y-%m-%d %H:%M', time.localtime(time.time()))
        value = random.randint(1, 500)
        type = consumer_type[random.randint(0, 1)]
        f.write(str(id) + '\t' + t + '\t' + str(value) + '\t' + type + '\n')
        if type == consumer_type[0]:
            vip += 1
            vipValue += value
        else:
            novip += 1
            novipValue += value

print(consumer_type[0] + ": 人数 " + str(vip) + ", 总金额: " + str(vipValue) + ", 平均金额:" + str(vipValue / vip))
print(consumer_type[1] + ": 人数 " + str(novip) + ", 总金额: " + str(novipValue) + ", 平均金额:" + str(novipValue / novip))
[dnn@master HDFS_example]$ python test.py
会员: 人数 510, 总金额: 128744, 平均金额:252.439215686
非会员: 人数 490, 总金额: 123249, 平均金额:251.528571429
2. Writing the Entity Class
- Consumer.java
package com.michael.mapreduce;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Consumer implements Writable {

    private String id;
    private int money;
    private int vip; // 0 = non-member, 1 = member

    public Consumer() {
    }

    public Consumer(String id, int money, int vip) {
        this.id = id;
        this.money = money;
        this.vip = vip;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVip() {
        return vip;
    }

    public void setVip(int vip) {
        this.vip = vip;
    }

    // Serialize the fields; the order must match readFields().
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeInt(money);
        dataOutput.writeInt(vip);
    }

    // Deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.money = dataInput.readInt();
        this.vip = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.id + "\t" + this.money + "\t" + this.vip;
    }
}
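The Mapper and Reducer below do not actually use this Writable, but a quick round-trip check makes the write()/readFields() contract concrete. This is a minimal standalone sketch of mine (ConsumerRoundTrip is a hypothetical helper, not part of the job), assuming Consumer is on the classpath:

package com.michael.mapreduce;

// Hypothetical check (my addition, not from the book): round-trip a Consumer
// through its own Writable serialization to confirm that write() and
// readFields() mirror each other.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ConsumerRoundTrip {
    public static void main(String[] args) throws IOException {
        Consumer original = new Consumer("42", 300, 1);

        // Serialize into an in-memory byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize the same bytes into a fresh instance.
        Consumer restored = new Consumer();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored); // prints: 42	300	1
    }
}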
3. The Mapper Class
- ConsumerMapper.java
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ConsumerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is: id \t time \t money \t membership
        String line = value.toString();
        String[] fields = line.split("\t");
        String id = fields[0]; // parsed but not used by this job
        int money = Integer.parseInt(fields[2]);
        String vip = fields[3];
        // Emit (membership, money) so the Reducer can average per group.
        context.write(new Text(vip), new IntWritable(money));
    }
}
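Since Mapper.Context is awkward to construct by hand, the quickest sanity test of the parsing logic is a plain-Java check. A minimal sketch (MapLineCheck and the sample record are hypothetical, my addition):

package com.michael.mapreduce;

// Hypothetical quick check (not from the book): verifies the field layout
// that map() assumes, i.e. id \t time \t money \t membership per line.
public class MapLineCheck {
    public static void main(String[] args) {
        String line = "9527\t2021-01-01 10:30\t188\t会员"; // made-up sample record
        String[] fields = line.split("\t");
        // Mirrors the (key, value) pair that map() would emit for this line.
        System.out.println(fields[3] + " -> " + Integer.parseInt(fields[2])); // 会员 -> 188
    }
}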
4. The Reducer Class
- ConsumerReducer.java
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ConsumerReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        long sum = 0;
        for (IntWritable v : values) {
            count++;
            sum += v.get();
        }
        // Integer division: the fractional part of the average is dropped.
        long avg = sum / count;
        context.write(key, new LongWritable(avg));
    }
}
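Note that `sum / count` is integer division: the Python script reported averages of 252.43 and 251.52, while this job outputs 252 and 251. If the decimals matter, one variant (a sketch of mine, not the book's code) emits a DoubleWritable instead; the Driver's setOutputValueClass would have to change to DoubleWritable.class to match:

package com.michael.mapreduce;

// Hypothetical variant of ConsumerReducer (my addition) that keeps the
// decimal part of the average by emitting a DoubleWritable.
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ConsumerAvgDoubleReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        long sum = 0;
        for (IntWritable v : values) {
            count++;
            sum += v.get();
        }
        // Cast before dividing so the division happens in floating point.
        context.write(key, new DoubleWritable((double) sum / count));
    }
}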
5. The Driver Class
- ConsumerDriver.java
package com.michael.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConsumerDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ConsumerDriver.class);
        job.setMapperClass(ConsumerMapper.class);
        job.setReducerClass(ConsumerReducer.class);

        // Map output types differ from the final output types, so set both.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
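One practical note: FileOutputFormat refuses to start the job if the output directory already exists, so rerunning with the same path fails. A small guard (my addition, not in the original code) placed in main() before setOutputPath deletes a stale output directory first; it requires an extra import of org.apache.hadoop.fs.FileSystem:

// Optional guard (hypothetical addition): remove a pre-existing output
// directory so the job can be rerun with the same arguments.
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1]);
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, out);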
6. Running the Job
- Copy the data to HDFS
[dnn@master Desktop]$ hadoop dfs -copyFromLocal /home/dnn/eclipse-workspace/HDFS_example/consumer.txt /InputDataTest
- Export the JAR and run it from the bash command line
hadoop jar /home/dnn/eclipse-workspace/HDFS_example/consumer_avg.jar com.michael.mapreduce.ConsumerDriver /InputDataTest/consumer.txt /OutputTest2
- Results
[dnn@master Desktop]$ hadoop fs -cat /OutputTest2/part-r-00000
会员 252
非会员 251