MapReduce 编程实践:统计对象中的某些属性

2021-09-06 10:22:39 浏览数 (1)

文章目录

    • 1. 生成数据
    • 2. 编写实体类
    • 3. Mapper类
    • 4. Reducer类
    • 5. Driver类
    • 6. 运行

参考书:《Hadoop大数据原理与应用》

相关文章:MapReduce 编程实践

1. 生成数据

超市消费者 数据: id, 时间,消费金额,会员/非会员

使用 Python 生成虚拟数据

代码语言:javascript复制
import random
import time
consumer_type = ['会员', '非会员']
vip, novip = 0, 0
vipValue, novipValue = 0, 0
with open("consumer.txt",'w',encoding='utf-8') as f:
	for i in range(1000): # 1000条数据
		random.seed(time.time() i)
		id = random.randint(0, 10000)
		t = time.strftime('%Y-%m-%d %H:%M',time.localtime(time.time()))
		value = random.randint(1, 500)
		type = consumer_type[random.randint(0, 1)]
		f.write(str(id) 't' t 't' str(value) 't' type 'n')

		if type == consumer_type[0]:
			vip  = 1
			vipValue  = value
		else:
			novip  = 1
			novipValue  = value
print(consumer_type[0]   ": 人数 "   str(vip)   ", 总金额: "   str(vipValue)  ", 平均金额:" str(vipValue/vip))
print(consumer_type[1]   ": 人数 "   str(novip)   ", 总金额: "   str(novipValue)  ", 平均金额:" str(novipValue/novip))
代码语言:javascript复制
[dnn@master HDFS_example]$ python test.py
会员: 人数 510, 总金额: 128744, 平均金额:252.439215686
非会员: 人数 490, 总金额: 123249, 平均金额:251.528571429

2. 编写实体类

  • Consumer.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;

public class Consumer implements Writable{
	private String id;
	private int money;
	private int vip; // 0 no, 1 yes
	public Consumer() {
		
	}
	public Consumer(String id, int money, int vip) {
		this.id = id;
		this.money = money;
		this.vip = vip;
	}
	public int getMoney() {
		return money;
	}
	public void setMoney() {
		this.money = money;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public int getVip() {
		return vip;
	}
	public void setVip(int vip) {
		this.vip = vip;
	}
	
	public void write(DataOutput dataOutput) throws IOException{
		dataOutput.writeUTF(id);
		dataOutput.writeInt(money);
		dataOutput.writeInt(vip);
	}
	
	public void readFields(DataInput dataInput) throws IOException{
		this.id = dataInput.readUTF();
		this.money = dataInput.readInt();
		this.vip = dataInput.readInt();
	}
	
	@Override
	public String toString() {
		return this.id   "t"   this.money   "t"   this.vip;
	}
}

3. Mapper类

  • ConsumerMapper.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class ConsumerMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
		String line = value.toString();
		String[] fields = line.split("t");
		String id = fields[0];
		int money = Integer.parseInt(fields[2]);
		String vip = fields[3];
		
		context.write(new Text(vip), new IntWritable(money));
	}
}

4. Reducer类

  • ConsumerReducer.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class ConsumerReducer extends Reducer<Text, IntWritable, Text, LongWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
		int count = 0;
		long sum = 0;
		for(IntWritable v : values) {
			count  ;
			sum  = v.get();
		}
		long avg = sum/count;
		context.write(key, new LongWritable(avg));
	}
}

5. Driver类

  • ConsumerDriver.java
代码语言:javascript复制
package com.michael.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConsumerDriver {
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ConsumerDriver.class);
		job.setMapperClass(ConsumerMapper.class);
		job.setReducerClass(ConsumerReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		FileInputFormat.setInputPaths(job,  new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

6. 运行

  • 复制数据到 hdfs
代码语言:javascript复制
[dnn@master Desktop]$ hadoop dfs -copyFromLocal /home/dnn/eclipse-workspace/HDFS_example/consumer.txt /InputDataTest
  • 导出 jar 在 bash 命令行运行
代码语言:javascript复制
hadoop jar /home/dnn/eclipse-workspace/HDFS_example/consumer_avg.jar com.michael.mapreduce.ConsumerDriver /InputDataTest/consumer.txt /OutputTest2
  • 运行结果
代码语言:javascript复制
[dnn@master Desktop]$ hadoop fs -cat /OutputTest2/part-r-00000
会员	252
非会员	251

0 人点赞