MapReduce Application: Ad Data Analysis


1. Data

1.1 Terminology
  • Ad impressions: the number of times an ad is viewed, abbreviated PV (page views)
  • Ad clicks: the number of times an ad is clicked, commonly denoted click
  • Click-through rate: clicks divided by impressions (clicks/views), commonly denoted click_ratio. For example, if an ad is shown to users 10,000 times and 100 users click it, then click_ratio = 100/10000 = 1%
1.2 Log Data Format

| Field     | Type    | Description                           |
|-----------|---------|---------------------------------------|
| area_id   | string  | area code                             |
| user_id   | string  | user ID                               |
| view_type | integer | view type: 1 = impression, 2 = click  |
| date      | string  | date, in yyyyMMdd format              |

Fields are separated by tab characters.
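
To make the format concrete, here is a minimal standalone sketch (the record is taken from the sample file below; the class itself is just an illustration, not part of the jobs) of how one tab-separated line splits into the four fields:

```java
public class LogLineDemo {
    public static void main(String[] args) {
        //a sample record: area_id, user_id, view_type, date, separated by tabs
        String line = "11\txiaoming\t1\t20171228";
        String[] fields = line.split("\t");
        System.out.println("area_id=" + fields[0] + ", user_id=" + fields[1]
                + ", view_type=" + fields[2] + ", date=" + fields[3]);
    }
}
```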

1.3 Uploading the Data
```shell
[root@node1 ~]# hadoop fs -mkdir -p /ad/pv_click
[root@node1 ~]# hadoop fs -ls /ad
Found 1 items
drwxr-xr-x   - root supergroup          0 2018-04-06 10:30 /ad/pv_click        
[root@node1 ~]# hadoop fs -put /root/data/ad/ad_data* /ad/pv_click
[root@node1 ~]# hadoop fs -ls /ad/pv_click
Found 5 items
-rw-r--r--   3 root supergroup        413 2018-04-06 10:31 /ad/pv_click/ad_data_20171224.txt
-rw-r--r--   3 root supergroup        629 2018-04-06 10:31 /ad/pv_click/ad_data_20171225.txt
-rw-r--r--   3 root supergroup        353 2018-04-06 10:31 /ad/pv_click/ad_data_20171226.txt
-rw-r--r--   3 root supergroup        499 2018-04-06 10:31 /ad/pv_click/ad_data_20171227.txt
-rw-r--r--   3 root supergroup        247 2018-04-06 10:31 /ad/pv_click/ad_data_20171228.txt
[root@node1 ~]#
```
```shell
[root@node1 ~]# hadoop fs -cat /ad/pv_click/ad_data_20171228.txt
11  xiaoming    1   20171228
11  xiaofang    1   20171228
11  xiaofang    2   20171228
11  xiaoshan    1   20171228
11  xiaoli  1   20171228
12  zhangsan    1   20171228
12  lisi    1   20171228
31  wangwu  1   20171228
31  mazi    1   20171228
12  daming  1   20171228
11  suancai 1   20171228
[root@node1 ~]#
```

2. Requirement 1

2.1 Requirement

Given a batch of TB- or PB-scale historical ad data, implement the following:

  • Granularity: per day
  • Metric: impressions (PV)
  • Sort the results by impressions, both ascending and descending
2.2 Analysis

The task splits into two jobs:

  • Aggregation job: compute daily impressions
  • Sort job: globally sort by impressions, consuming the aggregation job's output; ascending order falls out of the MR shuffle phase, which sorts keys ascending by default, while descending order requires overriding the key comparator

2.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Requirement 1: given TB- or PB-scale historical ad data,
 * aggregate impressions (PV) per day and
 * sort the results by impressions, ascending and descending.
 */
public class MrPvSortByDayApp{
    static class PvMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //each input line has the format "area_id\tuser_id\tview_type\tdate", tab-separated
            String[] fields = line.split("\t");
            //view_type: 1 = impression, 2 = click
            int view_type = Integer.parseInt(fields[2]);
            //date
            String date = fields[3];
            int pv = 0;
            if(view_type == 1){
                pv = 1;
            }
            context.write(new Text(date),new IntWritable(pv));
        }
    }
    static class PvReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum_pv = 0;
            //sum up the day's impressions
            for(IntWritable pv : values){
                sum_pv += pv.get();
            }
            context.write(key,new IntWritable(sum_pv) );
        }
    }
    static class PvSortMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //each input line has the format "date\tpv"
            String[] fields = line.split("\t");
            String date = fields[0];
            int pv = Integer.parseInt(fields[1]);
            //swap key and value so records are sorted by the new key (pv)
            context.write(new IntWritable(pv),new Text(date));
            //dates that share the same pv value arrive at the same reduce() call
        }
    }
    static class PvSortReducer extends Reducer<IntWritable,Text,Text,IntWritable>{
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //iterate over the dates that share the current pv value
            for(Text dt : values){
                String date = dt.toString();
                //swap key and value back
                context.write(new Text(date),key);
            }
        }
    }

    //Override the raw comparator of IntWritable; negating the default (ascending) result yields descending order
    public static class IntWritableDescComparator extends
            IntWritable.Comparator {
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    /**
    *  Run the two jobs serially
    */
    public static void main(String[] args) throws Exception {
        /**
        * Job 1: compute daily impressions
        */
        String inputPath = args[0];
        String outputPath = args[1];
        Path tmpPath = new Path("MrPvSortByDayTmp");
        Configuration conf = new Configuration();
        Job jobPv = Job.getInstance(conf,"MrPvByDay");
        //set the job's main class
        jobPv.setJarByClass(MrPvSortByDayApp.class);
        //set the Mapper and Reducer classes used by the job
        jobPv.setMapperClass(PvMapper.class);
        jobPv.setReducerClass(PvReducer.class);
        //set the Mapper output types
        jobPv.setMapOutputKeyClass(Text.class);
        jobPv.setMapOutputValueClass(IntWritable.class);
        //set the Reducer output types
        jobPv.setOutputKeyClass(Text.class);
        jobPv.setOutputValueClass(IntWritable.class);
        //input path
        FileInputFormat.setInputPaths(jobPv,new Path(inputPath));
        //output path (intermediate result consumed by the sort job)
        FileOutputFormat.setOutputPath(jobPv,tmpPath);
        boolean jobPvStatus = jobPv.waitForCompletion(true);
        /**
        * Job 2: sort by impressions
        */
        Job jobPvSort = Job.getInstance(conf,"MrPvSortByDay");
        //set the job's main class
        jobPvSort.setJarByClass(MrPvSortByDayApp.class);
        //set the Mapper and Reducer classes used by the job
        jobPvSort.setMapperClass(PvSortMapper.class);
        jobPvSort.setReducerClass(PvSortReducer.class);
        //use a single reduce task so the output is globally sorted
        jobPvSort.setNumReduceTasks(1);
        //plug in the descending comparator
        jobPvSort.setSortComparatorClass(IntWritableDescComparator.class);
        //set the Mapper output types
        jobPvSort.setMapOutputKeyClass(IntWritable.class);
        jobPvSort.setMapOutputValueClass(Text.class);
        //set the Reducer output types
        jobPvSort.setOutputKeyClass(Text.class);
        jobPvSort.setOutputValueClass(IntWritable.class);
        //input path: the first job's output
        FileInputFormat.setInputPaths(jobPvSort,tmpPath);
        //output path
        FileOutputFormat.setOutputPath(jobPvSort,new Path(outputPath));
        //run the second job only after the first one has completed successfully
        if(jobPvStatus){
            System.exit(jobPvSort.waitForCompletion(true)?0:1);
        }
    }
}
```
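The negated comparator can also be exercised outside MapReduce. Below is a minimal standalone sketch (the helper and class names are ours, not part of the job; it assumes the demo sits in the same package, cn.hadron.ad) that serializes two IntWritable values and compares their raw bytes the way the shuffle would:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;

public class DescComparatorDemo {
    //serialize an IntWritable into raw bytes, as the shuffle sees them
    static byte[] toBytes(int v) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new IntWritable(v).write(new DataOutputStream(buf));
        return buf.toByteArray();
    }
    public static void main(String[] args) throws Exception {
        byte[] a = toBytes(10);
        byte[] b = toBytes(25);
        MrPvSortByDayApp.IntWritableDescComparator cmp =
                new MrPvSortByDayApp.IntWritableDescComparator();
        //ascending order would give a negative result here; the negation makes it
        //positive, so 10 now sorts after 25, i.e. descending
        System.out.println(cmp.compare(a, 0, a.length, b, 0, b.length) > 0); //true
    }
}
```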
2.4 Packaging and Running

Compile and package the code as ad.jar, then upload it to the /root/jar directory on node1.

```shell
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrPvSortByDayApp /ad/pv_click/ /ad/pv_click/output
18/04/06 10:56:04 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 10:56:06 INFO input.FileInputFormat: Total input paths to process : 5
18/04/06 10:56:07 INFO mapreduce.JobSubmitter: number of splits:5
18/04/06 10:56:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523020114171_0001
18/04/06 10:56:12 INFO impl.YarnClientImpl: Submitted application application_1523020114171_0001
18/04/06 10:56:13 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523020114171_0001/
18/04/06 10:56:13 INFO mapreduce.Job: Running job: job_1523020114171_0001
18/04/06 10:56:44 INFO mapreduce.Job: Job job_1523020114171_0001 running in uber mode : false
18/04/06 10:56:44 INFO mapreduce.Job:  map 0% reduce 0%
18/04/06 11:04:14 INFO mapreduce.Job:  map 100% reduce 0%
18/04/06 11:05:00 INFO mapreduce.Job:  map 100% reduce 67%
18/04/06 11:05:03 INFO mapreduce.Job:  map 100% reduce 100%
18/04/06 11:05:10 INFO mapreduce.Job: Job job_1523020114171_0001 completed successfully
18/04/06 11:05:10 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1431
        FILE: Number of bytes written=728429
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2686
        HDFS: Number of bytes written=60
        HDFS: Number of read operations=18
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=5
        Launched reduce tasks=1
        Data-local map tasks=5
        Total time spent by all maps in occupied slots (ms)=2237681
        Total time spent by all reduces in occupied slots (ms)=41648
        Total time spent by all map tasks (ms)=2237681
        Total time spent by all reduce tasks (ms)=41648
        Total vcore-milliseconds taken by all map tasks=2237681
        Total vcore-milliseconds taken by all reduce tasks=41648
        Total megabyte-milliseconds taken by all map tasks=2291385344
        Total megabyte-milliseconds taken by all reduce tasks=42647552
    Map-Reduce Framework
        Map input records=95
        Map output records=95
        Map output bytes=1235
        Map output materialized bytes=1455
        Input split bytes=545
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=1455
        Reduce input records=95
        Reduce output records=5
        Spilled Records=190
        Shuffled Maps =5
        Failed Shuffles=0
        Merged Map outputs=5
        GC time elapsed (ms)=18555
        CPU time spent (ms)=56630
        Physical memory (bytes) snapshot=1103552512
        Virtual memory (bytes) snapshot=12614860800
        Total committed heap usage (bytes)=756371456
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=2141
    File Output Format Counters 
        Bytes Written=60
18/04/06 11:05:10 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 11:05:11 INFO input.FileInputFormat: Total input paths to process : 1
18/04/06 11:05:13 INFO mapreduce.JobSubmitter: number of splits:1
18/04/06 11:05:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523020114171_0002
18/04/06 11:05:14 INFO impl.YarnClientImpl: Submitted application application_1523020114171_0002
18/04/06 11:05:14 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523020114171_0002/
18/04/06 11:05:14 INFO mapreduce.Job: Running job: job_1523020114171_0002
18/04/06 11:05:51 INFO mapreduce.Job: Job job_1523020114171_0002 running in uber mode : false
18/04/06 11:05:51 INFO mapreduce.Job:  map 0% reduce 0%
18/04/06 11:06:24 INFO mapreduce.Job:  map 100% reduce 0%
18/04/06 11:06:40 INFO mapreduce.Job:  map 100% reduce 100%
18/04/06 11:06:42 INFO mapreduce.Job: Job job_1523020114171_0002 completed successfully
18/04/06 11:06:42 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=81
        FILE: Number of bytes written=242409
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=176
        HDFS: Number of bytes written=60
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=28812
        Total time spent by all reduces in occupied slots (ms)=10410
        Total time spent by all map tasks (ms)=28812
        Total time spent by all reduce tasks (ms)=10410
        Total vcore-milliseconds taken by all map tasks=28812
        Total vcore-milliseconds taken by all reduce tasks=10410
        Total megabyte-milliseconds taken by all map tasks=29503488
        Total megabyte-milliseconds taken by all reduce tasks=10659840
    Map-Reduce Framework
        Map input records=5
        Map output records=5
        Map output bytes=65
        Map output materialized bytes=81
        Input split bytes=116
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=81
        Reduce input records=5
        Reduce output records=5
        Spilled Records=10
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=405
        CPU time spent (ms)=1970
        Physical memory (bytes) snapshot=332242944
        Virtual memory (bytes) snapshot=4230119424
        Total committed heap usage (bytes)=200867840
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=60
    File Output Format Counters 
        Bytes Written=60
[root@node1 ~]#
```

View the output:

```shell
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output
Found 2 items
-rw-r--r--   3 root supergroup          0 2018-04-06 11:06 /ad/pv_click/output/_SUCCESS
-rw-r--r--   3 root supergroup         60 2018-04-06 11:06 /ad/pv_click/output/part-r-00000
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output/part-r-00000
20171225    25
20171227    20
20171224    16
20171226    15
20171228    10
[root@node1 ~]#
```

3. Requirement 2

3.1 Requirement

Produce statistics over the previous day's ad data:

  • Granularity: per day
  • Frequency: each day, process the previous day's data
  • Metrics: impressions (pv), clicks (click), click-through rate (click_ratio)
  • Dimension: area (area_id)
3.2 Analysis
  • The metrics are pv, click, and click_ratio; click_ratio can be derived as click/pv inside a single Reduce task
  • With multiple metrics, a custom value object is needed to carry them, and it must implement the Writable serialization interface
  • Statistics are keyed by both area and date, so a composite key ensures that all records for the same area and day reach the same Reduce Task
3.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom value type implementing the Writable serialization interface
 */

public class AdMetricBean implements Writable{
    //clicks
    private long click;
    //impressions
    private long pv;

    //deserialization requires a no-arg constructor; if other constructors exist, define it explicitly, otherwise deserialization throws an exception
    public AdMetricBean(){}

    //constructor with parameters
    public AdMetricBean(long pv,long click){
        this.pv = pv;
        this.click = click;
    }

    /**
     * Serialization writes the fields out in a fixed order
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(pv);
        dataOutput.writeLong(click);
    }
    /**
     * Deserialization must read the fields in the same order they were written
     */
    public void readFields(DataInput dataInput) throws IOException {
        //pv was written first, so it is read first
        pv = dataInput.readLong();
        click = dataInput.readLong();
    }
    //getters and setters (used by the reducer)
    public long getPv() { return pv; }
    public void setPv(long pv) { this.pv = pv; }
    public long getClick() { return click; }
    public void setClick(long click) { this.click = click; }
}
```
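As a quick sanity check of the Writable contract, a round trip through an in-memory byte stream (a standalone sketch of ours, not part of the job; it assumes the same package, cn.hadron.ad) should restore exactly what write emitted:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class AdMetricBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        //serialize a bean into an in-memory buffer
        AdMetricBean out = new AdMetricBean(100, 7);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));
        //deserialize into a fresh instance and compare the fields
        AdMetricBean in = new AdMetricBean();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(in.getPv() == 100 && in.getClick() == 7); //expect: true
    }
}
```
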
```java
package cn.hadron.ad;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Requirement 2: produce statistics over the previous day's ad data
 * Granularity: per day
 * Frequency: each day, process the previous day's data
 * Metrics: impressions pv, clicks click, click-through rate click_ratio
 * Dimension: area area_id
 */
public class MrPvClickByAreaDayApp {
    static class PvClickMapper extends Mapper<LongWritable,Text,Text,AdMetricBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //each input line has the format "area_id\tuser_id\tview_type\tdate", tab-separated
            String[] fields = line.split("\t");
            //area ID
            String area_id = fields[0];
            //view_type: 1 = impression, 2 = click
            int view_type = Integer.parseInt(fields[2]);
            //date
            String date = fields[3];
            int pv = 0;
            int click = 0;
            if(view_type == 1){
                pv = 1;
            }else if(view_type == 2){
                click = 1;
            }
            //composite key: records for the same area and date go to the same reducer
            String keyStr = area_id + "-" + date;
            context.write(new Text(keyStr),new AdMetricBean(pv,click));
        }
    }
    static class PvClickReducer extends Reducer<Text,AdMetricBean,Text,NullWritable>{
        /**
         * reduce() is called once per group: one key together with all of its values (k: v1, v2, v3, ...)
         */
        @Override
        protected void reduce(Text key, Iterable<AdMetricBean> values, Context context) throws IOException, InterruptedException {
            //accumulators
            long pv = 0;
            long click = 0;
            //sum the impressions and clicks for this area/date group
            for(AdMetricBean amb : values){
                pv += amb.getPv();
                click += amb.getClick();
            }
            //pv is assumed non-zero here: every click in this dataset follows an impression
            double clickRatio = (double)click / pv * 100;
            //keep two decimal places
            String clickRatioStr = String.format("%.2f", clickRatio) + "%";
            String[] keys = key.toString().split("-");
            String line = keys[1] + "\t" + keys[0] + "\t" + pv + "\t" + click + "\t" + clickRatioStr;

            context.write(new Text(line),NullWritable.get());

        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args[0];
        String outputPath = args[1];
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //set the job name
        job.setJobName("MrPvClickByAreaDayApp");
        //set the main class
        job.setJarByClass(MrPvClickByAreaDayApp.class);
        //set the Mapper and Reducer classes used by the job
        job.setMapperClass(PvClickMapper.class);
        job.setReducerClass(PvClickReducer.class);
        //set the Mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AdMetricBean.class);
        //set the Reducer output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //set the job's input and output paths
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
```
3.4 Packaging and Running

Recompile and repackage the code as ad.jar, then upload it to the /root/jar directory on node1.

```shell
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrPvClickByAreaDayApp /ad/pv_click/ad_data_20171228.txt /ad/pv_click/output2
18/04/06 23:32:43 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/06 23:32:45 INFO input.FileInputFormat: Total input paths to process : 1
18/04/06 23:32:46 INFO mapreduce.JobSubmitter: number of splits:1
18/04/06 23:32:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523071856530_0001
18/04/06 23:32:47 INFO impl.YarnClientImpl: Submitted application application_1523071856530_0001
18/04/06 23:32:47 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523071856530_0001/
18/04/06 23:32:47 INFO mapreduce.Job: Running job: job_1523071856530_0001
18/04/06 23:33:12 INFO mapreduce.Job: Job job_1523071856530_0001 running in uber mode : false
18/04/06 23:33:12 INFO mapreduce.Job:  map 0% reduce 0%
18/04/06 23:33:31 INFO mapreduce.Job:  map 100% reduce 0%
18/04/06 23:34:08 INFO mapreduce.Job:  map 100% reduce 100%
18/04/06 23:34:12 INFO mapreduce.Job: Job job_1523071856530_0001 completed successfully
18/04/06 23:34:12 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=336
        FILE: Number of bytes written=242563
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=356
        HDFS: Number of bytes written=67
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=11976
        Total time spent by all reduces in occupied slots (ms)=34802
        Total time spent by all map tasks (ms)=11976
        Total time spent by all reduce tasks (ms)=34802
        Total vcore-milliseconds taken by all map tasks=11976
        Total vcore-milliseconds taken by all reduce tasks=34802
        Total megabyte-milliseconds taken by all map tasks=12263424
        Total megabyte-milliseconds taken by all reduce tasks=35637248
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=308
        Map output materialized bytes=336
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=336
        Reduce input records=11
        Reduce output records=3
        Spilled Records=22
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=355
        CPU time spent (ms)=3260
        Physical memory (bytes) snapshot=331587584
        Virtual memory (bytes) snapshot=4228476928
        Total committed heap usage (bytes)=195104768
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=247
    File Output Format Counters 
        Bytes Written=67
[root@node1 ~]#
```

View the results:

```shell
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output2
Found 2 items
-rw-r--r--   3 root supergroup          0 2018-04-06 23:34 /ad/pv_click/output2/_SUCCESS
-rw-r--r--   3 root supergroup         67 2018-04-06 23:34 /ad/pv_click/output2/part-r-00000
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output2/part-r-00000
20171228    11  5   1   20.00%
20171228    12  3   0   0.00%
20171228    31  2   0   0.00%
[root@node1 ~]#
```
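
The 20.00% in the first row checks out by hand: area 11 on 20171228 has 5 impressions and 1 click in the sample file. A standalone sketch (our own demo class, not part of the job) applying the reducer's formula:

```java
public class ClickRatioDemo {
    public static void main(String[] args) {
        long pv = 5, click = 1; //area 11 on 20171228 in the sample data
        double clickRatio = (double) click / pv * 100;
        //same formatting as the reducer: two decimal places plus a percent sign
        System.out.println(String.format("%.2f", clickRatio) + "%"); //prints 20.00%
    }
}
```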

4. Requirement 3

4.1 Requirement

Find the highest product rating given by users of each gender within each age range.

(1) Log format

  • Name, name, string
  • Age, age, integer
  • Gender, gender, string
  • Rating, core, integer from 0 to 100

(2) Metrics

  • Metric: highest rating
  • Dimensions: name, gender, age

(3) Upload the data

```shell
[root@node1 ~]# hadoop fs -put /root/data/ad/user_core.txt /ad/pv_click
[root@node1 ~]# hadoop fs -cat /ad/pv_click/user_core.txt
xiaoming    18  female  50
xiaoqiang   17  female  90
xiaofang    16  male    80
xiaoqing    13  male    60
zhangsan    25  female  70
lisi    30  female  80
wangwu  28  female  95
zhaosi  35  male    80
guangkun    40  male    60
tangseng    56  male    70
sunwukong   60  male    80
zhubajie    65  female  90
shaseng 55  female  70
[root@node1 ~]#
```
4.2 Analysis

(1) Records in the same age range must reach the same Reduce Task

  • The default HashPartitioner cannot guarantee this
  • A custom Partitioner is needed

(2) Within a task, the shuffle groups records sharing the same key into one key with a list of values, so all records of one gender are handled by a single reduce() call

(3) The default number of Reduce Tasks is 1; it must be raised to match the number of partitions

4.3 Implementation
```java
package cn.hadron.ad;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * The two type parameters match the Map output key and value types
 */
public class AgePartitioner extends Partitioner<Text, Text> {
    /**
     * Partition by age range so records in the same range reach the same reducer
     */
    public int getPartition(Text key, Text value, int numReduceTasks) {
        String nameAgeScore = value.toString();
        String[] fields = nameAgeScore.split("\t");
        int age = Integer.parseInt(fields[1]);
        if(age <=20){
            return 0;
        }else if(age > 20 && age <=50){
            return 1 % numReduceTasks;
        }else{
            return 2 % numReduceTasks;
        }
    }
}
```
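A quick standalone check (our own demo class, not part of the job; it assumes the same package, cn.hadron.ad, and the sample values mirror user_core.txt) shows where records land; the `% numReduceTasks` keeps the index valid even if the job were configured with fewer reducers than age ranges:

```java
import org.apache.hadoop.io.Text;

public class AgePartitionerDemo {
    public static void main(String[] args) {
        AgePartitioner p = new AgePartitioner();
        //the value carries "name\tage\tscore", exactly as the mapper emits it
        System.out.println(p.getPartition(new Text("female"), new Text("xiaoming\t18\t50"), 3)); //0
        System.out.println(p.getPartition(new Text("female"), new Text("zhangsan\t25\t70"), 3)); //1
        System.out.println(p.getPartition(new Text("male"),   new Text("tangseng\t56\t70"), 3)); //2
    }
}
```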
```java
package cn.hadron.ad;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Find the highest product rating given by users of each gender within each age range
 * Log format:
 *   name: string
 *   age: integer
 *   gender: string
 *   core (rating): integer from 0 to 100
 * Metric: highest rating
 * Dimensions: name, gender, age
 */
public class MrUserAgeMaxCoreApp {
    static class UserAgeMaxCoreMapper extends Mapper<LongWritable,Text,Text,Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //convert the input value to a string
            String line = value.toString();
            //split the tab-separated line; field order is name age gender score
            String[] fields = line.split("\t");
            String gender = fields[2];
            String nameAgeScore = fields[0] + "\t" + fields[1] + "\t" + fields[3];
            /**
             * Note: the default hash partitioning is not used here; the custom age-range
             * partitioner guarantees records in the same age range reach the same Reduce Task
             */
            context.write(new Text(gender),new Text(nameAgeScore));
        }
    }

    /**
    * Each Reduce Task copies its partition of every Map Task's output over the network, then:
    * 1) merges all of the copied files
    * 2) merge-sorts their contents so records with the same key form one group and keys are ordered
    * 3) produces one key with its group of values per group; each group triggers one reduce() call
    */
    static class UserAgeMaxCoreReducer extends Reducer<Text,Text,Text,Text>{

        /**
         * One reduce task handles one age range.
         * During the shuffle the reduce side merge-sorts and groups records by key,
         * yielding one group per gender here (male and female);
         * partitioning decides the task, grouping is by key, and reduce() runs once per group.
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int maxCore = 0;
            String name = "";
            String age = "";
            String gender = "";
            for(Text val : values){
                String nameAgeScore = val.toString();
                String[] fields = nameAgeScore.split("\t");
                int score = Integer.parseInt(fields[2]);
                if(score > maxCore){
                    name = fields[0];
                    age = fields[1];
                    gender = key.toString();
                    maxCore = score;
                }
            }
            //write one output line per group
            context.write(new Text(name),new Text(age + "\t" + gender + "\t" + maxCore));

        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args[0];
        String outputPath = args[1];
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //set the job name
        job.setJobName("UserAgeMaxCoreApp");
        //set the main class
        job.setJarByClass(MrUserAgeMaxCoreApp.class);
        //set the Mapper and Reducer classes used by the job
        job.setMapperClass(UserAgeMaxCoreMapper.class);
        job.setReducerClass(UserAgeMaxCoreReducer.class);
        //plug in the custom partitioner and match the number of reduce tasks to the three age ranges
        job.setPartitionerClass(AgePartitioner.class);
        job.setNumReduceTasks(3);
        //set the Mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //set the Reducer output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //set the job's input and output paths
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
```
4.4 Running
```shell
[root@node1 ~]# hadoop jar /root/jar/ad.jar cn.hadron.ad.MrUserAgeMaxCoreApp /ad/pv_click/user_core.txt /ad/pv_click/output3
18/04/07 08:48:39 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/04/07 08:48:40 INFO input.FileInputFormat: Total input paths to process : 1
18/04/07 08:48:41 INFO mapreduce.JobSubmitter: number of splits:1
18/04/07 08:48:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523071856530_0003
18/04/07 08:48:44 INFO impl.YarnClientImpl: Submitted application application_1523071856530_0003
18/04/07 08:48:44 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1523071856530_0003/
18/04/07 08:48:44 INFO mapreduce.Job: Running job: job_1523071856530_0003
18/04/07 08:49:12 INFO mapreduce.Job: Job job_1523071856530_0003 running in uber mode : false
18/04/07 08:49:12 INFO mapreduce.Job:  map 0% reduce 0%
18/04/07 08:49:26 INFO mapreduce.Job:  map 100% reduce 0%
18/04/07 08:49:46 INFO mapreduce.Job:  map 100% reduce 33%
18/04/07 08:49:47 INFO mapreduce.Job:  map 100% reduce 67%
18/04/07 08:50:17 INFO mapreduce.Job:  map 100% reduce 100%
18/04/07 08:50:21 INFO mapreduce.Job: Job job_1523071856530_0003 completed successfully
18/04/07 08:50:22 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=311
        FILE: Number of bytes written=485005
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=380
        HDFS: Number of bytes written=124
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters 
        Killed reduce tasks=2
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=9635
        Total time spent by all reduces in occupied slots (ms)=94473
        Total time spent by all map tasks (ms)=9635
        Total time spent by all reduce tasks (ms)=94473
        Total vcore-milliseconds taken by all map tasks=9635
        Total vcore-milliseconds taken by all reduce tasks=94473
        Total megabyte-milliseconds taken by all map tasks=9866240
        Total megabyte-milliseconds taken by all reduce tasks=96740352
    Map-Reduce Framework
        Map input records=13
        Map output records=13
        Map output bytes=267
        Map output materialized bytes=311
        Input split bytes=102
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=311
        Reduce input records=13
        Reduce output records=6
        Spilled Records=26
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=816
        CPU time spent (ms)=8670
        Physical memory (bytes) snapshot=514211840
        Virtual memory (bytes) snapshot=8406781952
        Total committed heap usage (bytes)=226758656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=278
    File Output Format Counters 
        Bytes Written=124
[root@node1 ~]#
```
4.5 Viewing the Results
```shell
[root@node1 ~]# hadoop fs -ls /ad/pv_click/output3
Found 4 items
-rw-r--r--   3 root supergroup          0 2018-04-07 08:50 /ad/pv_click/output3/_SUCCESS
-rw-r--r--   3 root supergroup         43 2018-04-07 08:49 /ad/pv_click/output3/part-r-00000
-rw-r--r--   3 root supergroup         38 2018-04-07 08:49 /ad/pv_click/output3/part-r-00001
-rw-r--r--   3 root supergroup         43 2018-04-07 08:50 /ad/pv_click/output3/part-r-00002
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00000
xiaoqiang   17  female  90
xiaofang    16  male    80
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00001
wangwu  28  female  95
zhaosi  35  male    80
[root@node1 ~]# hadoop fs -cat /ad/pv_click/output3/part-r-00002
zhubajie    65  female  90
sunwukong   60  male    80
[root@node1 ~]#
```
