1. Data
1.1 Terminology
- Ad impressions: the number of times an ad is viewed, abbreviated PV (page view)
- Ad clicks: the number of times an ad is clicked, usually denoted click
- Ad click-through rate: ad clicks / ad impressions (clicks/views), usually denoted click_ratio. For example, if an ad is shown to users 10,000 times and 100 users click it, then click_ratio = 100/10000 = 1%.
1.2 Log Data Format
Field | Type | Description |
---|---|---|
area_id | string | area code |
user_id | string | user ID |
view_type | integer | view type; 1: impression, 2: click |
date | string | date, in yyyyMMdd format |
Fields are separated by a tab character.
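As a quick illustration of the record layout (a minimal sketch; the sample values are taken from the data uploaded in 1.3):

```java
public class LogLineDemo {
    public static void main(String[] args) {
        // one log record: area_id \t user_id \t view_type \t date
        String line = "11\txiaofang\t2\t20171228";
        String[] fields = line.split("\t");
        // view_type "2" marks a click; "1" marks an impression
        System.out.println(fields[0] + ", " + fields[1] + ", " + fields[2] + ", " + fields[3]);
    }
}
```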
1.3 Uploading the Data
```
[root@node1 ~]# hadoop fs -mkdir -p /ad/pv_click
[root@node1 ~]# hadoop fs -ls /ad
Found 1 items
drwxr-xr-x - root supergroup 0 2018-04-06 10:30 /ad/pv_click
[root@node1 ~]# hadoop fs -put /root/data/ad/ad_data* /ad/pv_click
[root@node1 ~]# hadoop fs -ls /ad/pv_click
Found 5 items
-rw-r--r-- 3 root supergroup 413 2018-04-06 10:31 /ad/pv_click/ad_data_20171224.txt
-rw-r--r-- 3 root supergroup 629 2018-04-06 10:31 /ad/pv_click/ad_data_20171225.txt
-rw-r--r-- 3 root supergroup 353 2018-04-06 10:31 /ad/pv_click/ad_data_20171226.txt
-rw-r--r-- 3 root supergroup 499 2018-04-06 10:31 /ad/pv_click/ad_data_20171227.txt
-rw-r--r-- 3 root supergroup 247 2018-04-06 10:31 /ad/pv_click/ad_data_20171228.txt
[root@node1 ~]#
```
```
[root@node1 ~]# hadoop fs -cat /ad/pv_click/ad_data_20171228.txt
11 xiaoming 1 20171228
11 xiaofang 1 20171228
11 xiaofang 2 20171228
11 xiaoshan 1 20171228
11 xiaoli 1 20171228
12 zhangsan 1 20171228
12 lisi 1 20171228
31 wangwu 1 20171228
31 mazi 1 20171228
12 daming 1 20171228
11 suancai 1 20171228
[root@node1 ~]#
```
2. Requirement 1
2.1 Description
Given a batch of TB- or PB-scale historical ad data, implement the following:
- Granularity: per day
- Metric: impression count (PV)
- Sort by impression count in ascending and descending order
2.2 Analysis
The requirement breaks down into two jobs (a data-flow sketch follows):
- Counting job: compute the impression count per day.
- Sorting job: globally sort by impression count, consuming the counting job's output. Ascending order falls out of the MR shuffle phase, which sorts keys in ascending order by default; descending order requires overriding the key comparator.
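To make the pipeline concrete, here is the data flow with the sample data used in this article: job 1 emits one `date \t pv` line per day; job 2 swaps key and value so the shuffle sorts by pv, then swaps them back.

```
# Job 1 output (date \t pv), ordered by date because date is the key
20171224    16
20171225    25
20171226    15
20171227    20
20171228    10

# Job 2 output with the descending comparator (ordered by pv)
20171225    25
20171227    20
20171224    16
20171226    15
20171228    10
```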
2.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Requirement 1: given a batch of TB- or PB-scale historical ad data, implement:
 * granularity: per day
 * metric: impression count (PV)
 * sort by impression count in ascending and descending order
 */
public class MrPvSortByDayApp{
static class PvMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//input line format: "area_id \t user_id \t view_type \t date", tab-separated
String[] fields = line.split("\t");
//view_type: 1 = impression, 2 = click
int view_type = Integer.parseInt(fields[2]);
//date, yyyyMMdd
String date = fields[3];
int pv = 0;
if(view_type == 1){
pv = 1;
}
context.write(new Text(date),new IntWritable(pv));
}
}
static class PvReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum_pv = 0;
//accumulate the day's impressions
for(IntWritable pv : values){
sum_pv += pv.get();
}
context.write(key,new IntWritable(sum_pv) );
}
}
static class PvSortMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//input line format: "date \t pv"
String[] fields = line.split("\t");
String date = fields[0];
int pv = Integer.parseInt(fields[1]);
//swap key and value so the shuffle sorts by the new key (pv)
context.write(new IntWritable(pv),new Text(date));
//different dates that share the same pv value arrive at the same reduce call
}
}
static class PvSortReducer extends Reducer<IntWritable,Text,Text,IntWritable>{
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//iterate over the dates that share the current pv value
for(Text dt : values){
String date = dt.toString();
//swap key and value back
context.write(new Text(date),key);
}
}
}
//Custom descending comparator: IntWritable.Comparator compares ascending by default; negating its result reverses the order
public static class IntWritableDescComparator extends IntWritable.Comparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
/**
* The two jobs run sequentially; the sort job consumes the counting job's output
*/
public static void main(String[] args) throws Exception {
/**
* Job 1: compute the impression count
*/
String inputPath = args[0];
String outputPath = args[1];
Path tmpPath = new Path("MrPvSortByDayTmp");
Configuration conf = new Configuration();
Job jobPv = Job.getInstance(conf,"MrPvByDay");
//set the class used to locate the job jar
jobPv.setJarByClass(MrPvSortByDayApp.class);
//set the Mapper and Reducer classes used by the job
jobPv.setMapperClass(PvMapper.class);
jobPv.setReducerClass(PvReducer.class);
//Mapper output key/value types
jobPv.setMapOutputKeyClass(Text.class);
jobPv.setMapOutputValueClass(IntWritable.class);
//Reducer output key/value types
jobPv.setOutputKeyClass(Text.class);
jobPv.setOutputValueClass(IntWritable.class);
//job input path
FileInputFormat.setInputPaths(jobPv,new Path(inputPath));
//job output path (temporary; consumed by the sort job)
FileOutputFormat.setOutputPath(jobPv,tmpPath);
boolean jobPvStatus = jobPv.waitForCompletion(true);
/**
* Job 2: sort by impression count
*/
Job jobPvSort = Job.getInstance(conf,"MrPvSortByDay");
//set the class used to locate the job jar
jobPvSort.setJarByClass(MrPvSortByDayApp.class);
//set the Mapper and Reducer classes used by the job
jobPvSort.setMapperClass(PvSortMapper.class);
jobPvSort.setReducerClass(PvSortReducer.class);
//use a single reduce task so the sort is global
jobPvSort.setNumReduceTasks(1);
//register the descending comparator
jobPvSort.setSortComparatorClass(IntWritableDescComparator.class);
//Mapper output key/value types
jobPvSort.setMapOutputKeyClass(IntWritable.class);
jobPvSort.setMapOutputValueClass(Text.class);
//Reducer output key/value types
jobPvSort.setOutputKeyClass(Text.class);
jobPvSort.setOutputValueClass(IntWritable.class);
//job input path (output of job 1)
FileInputFormat.setInputPaths(jobPvSort,tmpPath);
//job output path
FileOutputFormat.setOutputPath(jobPvSort,new Path(outputPath));
//run the second job only after the first completes successfully
if(jobPvStatus){
System.exit(jobPvSort.waitForCompletion(true)?0:1);
}
}
}
```
2.4 Packaging and Running
Compile and package the code into ad.jar, then upload it to the /root/jar directory on node1.
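The build tool is not shown here; assuming a standard Maven project, the packaging and upload steps might look like this (artifact name and paths are illustrative):

```
mvn clean package                       # produces target/ad.jar (artifact name assumed)
scp target/ad.jar root@node1:/root/jar/ # copy the jar to node1
```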
```
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrPvSortByDayApp /ad/pv_click/ /ad/pv_click/output
18/04/06 10:56:04 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 10:56:06 INFO input.FileInputFormat: Total input paths to process : 5
18/04/06 10:56:07 INFO mapreduce.JobSubmitter: number of splits:5
18/04/06 10:56:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523020114171_0001
18/04/06 10:56:12 INFO impl.YarnClientImpl: Submitted application application_1523020114171_0001
18/04/06 10:56:13 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523020114171_0001/
18/04/06 10:56:13 INFO mapreduce.Job: Running job: job_1523020114171_0001
18/04/06 10:56:44 INFO mapreduce.Job: Job job_1523020114171_0001 running in uber mode : false
18/04/06 10:56:44 INFO mapreduce.Job: map 0% reduce 0%
18/04/06 11:04:14 INFO mapreduce.Job: map 100% reduce 0%
18/04/06 11:05:00 INFO mapreduce.Job: map 100% reduce 67%
18/04/06 11:05:03 INFO mapreduce.Job: map 100% reduce 100%
18/04/06 11:05:10 INFO mapreduce.Job: Job job_1523020114171_0001 completed successfully
18/04/06 11:05:10 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1431
FILE: Number of bytes written=728429
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2686
HDFS: Number of bytes written=60
HDFS: Number of read operations=18
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=5
Launched reduce tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=2237681
Total time spent by all reduces in occupied slots (ms)=41648
Total time spent by all map tasks (ms)=2237681
Total time spent by all reduce tasks (ms)=41648
Total vcore-milliseconds taken by all map tasks=2237681
Total vcore-milliseconds taken by all reduce tasks=41648
Total megabyte-milliseconds taken by all map tasks=2291385344
Total megabyte-milliseconds taken by all reduce tasks=42647552
Map-Reduce Framework
Map input records=95
Map output records=95
Map output bytes=1235
Map output materialized bytes=1455
Input split bytes=545
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=1455
Reduce input records=95
Reduce output records=5
Spilled Records=190
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=18555
CPU time spent (ms)=56630
Physical memory (bytes) snapshot=1103552512
Virtual memory (bytes) snapshot=12614860800
Total committed heap usage (bytes)=756371456
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2141
File Output Format Counters
Bytes Written=60
18/04/06 11:05:10 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 11:05:11 INFO input.FileInputFormat: Total input paths to process : 1
18/04/06 11:05:13 INFO mapreduce.JobSubmitter: number of splits:1
18/04/06 11:05:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523020114171_0002
18/04/06 11:05:14 INFO impl.YarnClientImpl: Submitted application application_1523020114171_0002
18/04/06 11:05:14 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523020114171_0002/
18/04/06 11:05:14 INFO mapreduce.Job: Running job: job_1523020114171_0002
18/04/06 11:05:51 INFO mapreduce.Job: Job job_1523020114171_0002 running in uber mode : false
18/04/06 11:05:51 INFO mapreduce.Job: map 0% reduce 0%
18/04/06 11:06:24 INFO mapreduce.Job: map 100% reduce 0%
18/04/06 11:06:40 INFO mapreduce.Job: map 100% reduce 100%
18/04/06 11:06:42 INFO mapreduce.Job: Job job_1523020114171_0002 completed successfully
18/04/06 11:06:42 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=81
FILE: Number of bytes written=242409
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=176
HDFS: Number of bytes written=60
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=28812
Total time spent by all reduces in occupied slots (ms)=10410
Total time spent by all map tasks (ms)=28812
Total time spent by all reduce tasks (ms)=10410
Total vcore-milliseconds taken by all map tasks=28812
Total vcore-milliseconds taken by all reduce tasks=10410
Total megabyte-milliseconds taken by all map tasks=29503488
Total megabyte-milliseconds taken by all reduce tasks=10659840
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=65
Map output materialized bytes=81
Input split bytes=116
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=81
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=405
CPU time spent (ms)=1970
Physical memory (bytes) snapshot=332242944
Virtual memory (bytes) snapshot=4230119424
Total committed heap usage (bytes)=200867840
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=60
File Output Format Counters
Bytes Written=60
[root@node1 ~]#
```
View the output:
```
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output
Found 2 items
-rw-r--r-- 3 root supergroup 0 2018-04-06 11:06 /ad/pv_click/output/_SUCCESS
-rw-r--r-- 3 root supergroup 60 2018-04-06 11:06 /ad/pv_click/output/part-r-00000
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output/part-r-00000
20171225 25
20171227 20
20171224 16
20171226 15
20171228 10
[root@node1 ~]#
```
3. Requirement 2
3.1 Description
Compute statistics over the previous day's ad data:
- Granularity: per day
- Frequency: each day, process the previous day's data
- Metrics: impressions (pv), clicks (click), click-through rate (click_ratio)
- Dimension: area (area_id)
3.2 Analysis
- The metrics are pv, click, and click_ratio; click_ratio can be derived as click/pv within the same Reduce task.
- Since there are multiple metrics, define a custom object that bundles them; it must implement the Writable serialization interface.
- Statistics are keyed by both area and date, so use a composite key to guarantee that records for the same area on the same day reach the same Reduce Task (see the worked example below).
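Using the sample file ad_data_20171228.txt from section 1.3, the composite key groups the records like this (matching the final output shown in 3.4):

```
11-20171228 -> pv=5, click=1 -> click_ratio = 1/5 = 20.00%
12-20171228 -> pv=3, click=0 -> click_ratio = 0.00%
31-20171228 -> pv=2, click=0 -> click_ratio = 0.00%
```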
3.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* A value type that implements the Writable serialization interface
*/
public class AdMetricBean implements Writable{
//clicks
private long click;
//impressions
private long pv;
//deserialization requires a no-arg constructor; once other constructors exist it must be declared explicitly, otherwise deserialization throws an exception
public AdMetricBean(){}
//constructor with parameters
public AdMetricBean(long pv,long click){
this.pv = pv;
this.click = click;
}
/**
* Serialization writes the fields in a fixed order
*/
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(pv);
dataOutput.writeLong(click);
}
/**
* Deserialization must read the fields in the same order they were written
*/
public void readFields(DataInput dataInput) throws IOException {
//pv was written first during serialization, so read pv first
pv = dataInput.readLong();
click = dataInput.readLong();
}
//getters and setters omitted
}
```
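For completeness, the omitted accessors would just be the standard getters and setters along these lines (the Reducer below calls getPv() and getClick()):

```java
public long getPv() { return pv; }
public void setPv(long pv) { this.pv = pv; }
public long getClick() { return click; }
public void setClick(long click) { this.click = click; }
```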
```java
package cn.hadron.ad;
import cn.hadron.ad.AdMetricBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Requirement 2: compute statistics over the previous day's ad data:
* granularity: per day
* frequency: each day, process the previous day's data
* metrics: impressions (pv), clicks (click), click-through rate (click_ratio)
* dimension: area (area_id)
*/
public class MrPvClickByAreaDayApp {
static class PvClickMapper extends Mapper<LongWritable,Text,Text,AdMetricBean>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//input line format: area_id \t user_id \t view_type \t date, tab-separated
String[] fields = line.split("\t");
//area ID
String area_id = fields[0];
//view_type: 1 = impression, 2 = click
int view_type = Integer.parseInt(fields[2]);
//date
String date = fields[3];
int pv = 0;
int click = 0;
if(view_type == 1){
pv = 1;
}else if(view_type == 2){
click = 1;
}
//composite key: records for the same area on the same day go to the same Reduce task
String keyStr = area_id + "-" + date;
context.write(new Text(keyStr),new AdMetricBean(pv,click));
}
}
static class PvClickReducer extends Reducer<Text,AdMetricBean,Text,NullWritable>{
/**
* The reduce method handles one group per call: a key together with all of its values (k: v1, v2, v3)
*/
@Override
protected void reduce(Text key, Iterable<AdMetricBean> values, Context context) throws IOException, InterruptedException {
//counters
long pv = 0;
long click = 0;
//accumulate pv and click over the group
for(AdMetricBean amb : values){
pv += amb.getPv();
click += amb.getClick();
}
double clickRatio = (double)click / pv * 100;
//keep two decimal places
String clickRatioStr = String.format("%.2f", clickRatio) + "%";
String[] keys = key.toString().split("-");
String line = keys[1] + "\t" + keys[0] + "\t" + pv + "\t" + click + "\t" + clickRatioStr;
context.write(new Text(line),NullWritable.get());
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String inputPath = args[0];
String outputPath = args[1];
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//job name
job.setJobName("MrPvClickByAreaDayApp");
//set the class used to locate the job jar
job.setJarByClass(MrPvClickByAreaDayApp.class);
//set the Mapper and Reducer classes used by the job
job.setMapperClass(PvClickMapper.class);
job.setReducerClass(PvClickReducer.class);
//Mapper output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AdMetricBean.class);
//Reducer output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//job input and output paths
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
System.exit(job.waitForCompletion(true)?0:1);
}
}
```
3.4 Packaging and Running
Recompile and package the code into ad.jar, then upload it to the /root/jar directory on node1.
```
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrPvClickByAreaDayApp /ad/pv_click/ad_data_20171228.txt /ad/pv_click/output2
18/04/06 23:32:43 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 23:32:45 INFO input.FileInputFormat: Total input paths to process : 1
18/04/06 23:32:46 INFO mapreduce.JobSubmitter: number of splits:1
18/04/06 23:32:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523071856530_0001
18/04/06 23:32:47 INFO impl.YarnClientImpl: Submitted application application_1523071856530_0001
18/04/06 23:32:47 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523071856530_0001/
18/04/06 23:32:47 INFO mapreduce.Job: Running job: job_1523071856530_0001
18/04/06 23:33:12 INFO mapreduce.Job: Job job_1523071856530_0001 running in uber mode : false
18/04/06 23:33:12 INFO mapreduce.Job: map 0% reduce 0%
18/04/06 23:33:31 INFO mapreduce.Job: map 100% reduce 0%
18/04/06 23:34:08 INFO mapreduce.Job: map 100% reduce 100%
18/04/06 23:34:12 INFO mapreduce.Job: Job job_1523071856530_0001 completed successfully
18/04/06 23:34:12 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=336
FILE: Number of bytes written=242563
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=356
HDFS: Number of bytes written=67
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=11976
Total time spent by all reduces in occupied slots (ms)=34802
Total time spent by all map tasks (ms)=11976
Total time spent by all reduce tasks (ms)=34802
Total vcore-milliseconds taken by all map tasks=11976
Total vcore-milliseconds taken by all reduce tasks=34802
Total megabyte-milliseconds taken by all map tasks=12263424
Total megabyte-milliseconds taken by all reduce tasks=35637248
Map-Reduce Framework
Map input records=11
Map output records=11
Map output bytes=308
Map output materialized bytes=336
Input split bytes=109
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=336
Reduce input records=11
Reduce output records=3
Spilled Records=22
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=355
CPU time spent (ms)=3260
Physical memory (bytes) snapshot=331587584
Virtual memory (bytes) snapshot=4228476928
Total committed heap usage (bytes)=195104768
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=247
File Output Format Counters
Bytes Written=67
[root@node1 ~]#
```
View the results:
```
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output2
Found 2 items
-rw-r--r-- 3 root supergroup 0 2018-04-06 23:34 /ad/pv_click/output2/_SUCCESS
-rw-r--r-- 3 root supergroup 67 2018-04-06 23:34 /ad/pv_click/output2/part-r-00000
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output2/part-r-00000
20171228 11 5 1 20.00%
20171228 12 3 0 0.00%
20171228 31 2 0 0.00%
[root@node1 ~]#
```
4. Requirement 3
4.1 Description
Find the highest score given to a product by users of each gender within each age bracket.
(1) Log format
- Name, name, string
- Age, age, integer
- Gender, gender, string
- Score, core, integer from 0 to 100
(2) Metric
- Metric: highest score
- Dimensions: name, gender, age
(3) Upload the data
```
[root@node1 ~]# hadoop fs -put /root/data/ad/user_core.txt /ad/pv_click
[root@node1 ~]# hadoop fs -cat /ad/pv_click/user_core.txt
xiaoming 18 female 50
xiaoqiang 17 female 90
xiaofang 16 male 80
xiaoqing 13 male 60
zhangsan 25 female 70
lisi 30 female 80
wangwu 28 female 95
zhaosi 35 male 80
guangkun 40 male 60
tangseng 56 male 70
sunwukong 60 male 80
zhubajie 65 female 90
shaseng 55 female 70
[root@node1 ~]#
```
4.2 Analysis
(1) Records for the same age bracket must reach the same Reduce Task:
- the default HashPartitioner cannot guarantee this
- so a custom Partitioner is needed
(2) Within a partition, the shuffle phase groups all values of the same key together, which ensures that each gender's records are handled by a single reduce call.
(3) The default number of Reduce Tasks is 1; it must be adjusted to match the number of partitions (see the partition layout sketched below).
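Concretely, three reduce tasks yield three output files, one per age bracket, each containing one line per gender group (this matches the part-r-0000x files shown in 4.5):

```
partition 0: age <= 20        -> part-r-00000 (groups: female, male)
partition 1: 20 < age <= 50   -> part-r-00001 (groups: female, male)
partition 2: age > 50         -> part-r-00002 (groups: female, male)
```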
4.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* The two type parameters correspond to the Map output key and value types
*/
public class AgePartitioner extends Partitioner<Text, Text> {
/**
* Partition by age bracket so that records in the same bracket go to the same Reduce task
*/
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String nameAgeScore = value.toString();
String[] fields = nameAgeScore.split("\t");
int age = Integer.parseInt(fields[1]);
if(age <=20){
return 0;
}else if(age > 20 && age <=50){
return 1 % numReduceTasks;
}else{
return 2 % numReduceTasks;
}
}
}
```
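As a quick sanity check of the bracket boundaries, a small hypothetical driver could exercise the partitioner locally (this class is not part of the job; it only assumes hadoop-common and AgePartitioner on the classpath):

```java
package cn.hadron.ad;
import org.apache.hadoop.io.Text;

public class AgePartitionerCheck {
    public static void main(String[] args) {
        AgePartitioner p = new AgePartitioner();
        // the value format is "name\tage\tscore", as emitted by the Mapper below
        System.out.println(p.getPartition(new Text("female"), new Text("xiaoming\t18\t50"), 3)); // 0
        System.out.println(p.getPartition(new Text("female"), new Text("zhangsan\t25\t70"), 3)); // 1
        System.out.println(p.getPartition(new Text("male"),   new Text("tangseng\t56\t70"), 3)); // 2
    }
}
```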
```java
package cn.hadron.ad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Find the highest score given to a product by users of each gender within each age bracket.
* Log data format:
* name, string
* age, integer
* gender, string
* score (core), integer from 0 to 100
* Metric: highest score
* Dimensions: name, gender, age
*/
public class MrUserAgeMaxCoreApp {
static class UserAgeMaxCoreMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//one line of input, deserialized to a string
String line = value.toString();
//split on the delimiter; field format: name \t age \t gender \t score
String[] fields = line.split("\t");
String gender = fields[2];
String nameAgeScore = fields[0] + "\t" + fields[1] + "\t" + fields[3];
/**
* Note: the default hash partitioning is not used here; the custom age-bracket
* partitioner guarantees that records for the same age bracket reach the same Reduce Task
*/
context.write(new Text(gender),new Text(nameAgeScore));
}
}
/**
* Each Reduce Task copies its partition of every Map Task's output over the network, then:
* 1) merges all of the copied data files
* 2) merge-sorts the contents so records with the same key are grouped and keys are ordered
* 3) produces one group of values per key; each key's group triggers one reduce call
*/
static class UserAgeMaxCoreReducer extends Reducer<Text,Text,Text,Text>{
/**
* One reduce task handles one age bracket.
* During shuffle the reduce side merge-sorts and groups records by key: one group for male, one for female.
* Partitioning assigns records to tasks; grouping is by key, and each group gets one reduce call.
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int maxCore = 0;
String name = "";
String age = "";
String gender = "";
for(Text val : values){
String nameAgeScore = val.toString();
String[] fields = nameAgeScore.split("t");
int score = Integer.parseInt(fields[2]);
if(score > maxCore){
name = fields[0];
age = fields[1];
gender = key.toString();
maxCore = score;
}
}
//write one line per group
context.write(new Text(name),new Text(age + "\t" + gender + "\t" + maxCore));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String inputPath = args[0];
String outputPath = args[1];
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//job name
job.setJobName("UserAgeMaxCoreApp");
//set the class used to locate the job jar
job.setJarByClass(MrUserAgeMaxCoreApp.class);
//set the Mapper and Reducer classes used by the job
job.setMapperClass(UserAgeMaxCoreMapper.class);
job.setReducerClass(UserAgeMaxCoreReducer.class);
//register the custom partitioner and match the reduce task count to its three partitions
job.setPartitionerClass(AgePartitioner.class);
job.setNumReduceTasks(3);
//Mapper output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//Reducer output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job input and output paths
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
System.exit(job.waitForCompletion(true)?0:1);
}
}
```
4.4 Running
```
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrUserAgeMaxCoreApp /ad/pv_click/user_core.txt /ad/pv_click/output3
18/04/07 08:48:39 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/07 08:48:40 INFO input.FileInputFormat: Total input paths to process : 1
18/04/07 08:48:41 INFO mapreduce.JobSubmitter: number of splits:1
18/04/07 08:48:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523071856530_0003
18/04/07 08:48:44 INFO impl.YarnClientImpl: Submitted application application_1523071856530_0003
18/04/07 08:48:44 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523071856530_0003/
18/04/07 08:48:44 INFO mapreduce.Job: Running job: job_1523071856530_0003
18/04/07 08:49:12 INFO mapreduce.Job: Job job_1523071856530_0003 running in uber mode : false
18/04/07 08:49:12 INFO mapreduce.Job: map 0% reduce 0%
18/04/07 08:49:26 INFO mapreduce.Job: map 100% reduce 0%
18/04/07 08:49:46 INFO mapreduce.Job: map 100% reduce 33%
18/04/07 08:49:47 INFO mapreduce.Job: map 100% reduce 67%
18/04/07 08:50:17 INFO mapreduce.Job: map 100% reduce 100%
18/04/07 08:50:21 INFO mapreduce.Job: Job job_1523071856530_0003 completed successfully
18/04/07 08:50:22 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=311
FILE: Number of bytes written=485005
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=380
HDFS: Number of bytes written=124
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Killed reduce tasks=2
Launched map tasks=1
Launched reduce tasks=4
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=9635
Total time spent by all reduces in occupied slots (ms)=94473
Total time spent by all map tasks (ms)=9635
Total time spent by all reduce tasks (ms)=94473
Total vcore-milliseconds taken by all map tasks=9635
Total vcore-milliseconds taken by all reduce tasks=94473
Total megabyte-milliseconds taken by all map tasks=9866240
Total megabyte-milliseconds taken by all reduce tasks=96740352
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=267
Map output materialized bytes=311
Input split bytes=102
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=311
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=816
CPU time spent (ms)=8670
Physical memory (bytes) snapshot=514211840
Virtual memory (bytes) snapshot=8406781952
Total committed heap usage (bytes)=226758656
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=278
File Output Format Counters
Bytes Written=124
[root@node1 ~]#
```
4.5 Results
```
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output3
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-04-07 08:50 /ad/pv_click/output3/_SUCCESS
-rw-r--r-- 3 root supergroup 43 2018-04-07 08:49 /ad/pv_click/output3/part-r-00000
-rw-r--r-- 3 root supergroup 38 2018-04-07 08:49 /ad/pv_click/output3/part-r-00001
-rw-r--r-- 3 root supergroup 43 2018-04-07 08:50 /ad/pv_click/output3/part-r-00002
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00000
xiaoqiang 17 female 90
xiaofang 16 male 80
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00001
wangwu 28 female 95
zhaosi 35 male 80
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00002
zhubajie 65 female 90
sunwukong 60 male 80
[root@node1 ~]#
```