Hadoop Basics Tutorial - Chapter 6 Introduction to MapReduce (6.5 Temperature Statistics)

2022-05-06 18:45:51

Chapter 6 Introduction to MapReduce

6.5 Temperature Statistics

6.5.1 Problem Description

Hadoop: The Definitive Guide, 3rd Edition contains a classic example: temperature statistics. The author, Tom White, presents the program and explains how it works, but assumes that readers already know how to set up and deploy the basic environment for running MapReduce programs, so that part is only touched on in passing. For beginners this is a serious pitfall: if the program cannot be made to run, a Hadoop newcomer's interest and resolve quickly wear away.

For the Java project directory structure used in this section, see Section 6.4.

Following the description in Hadoop: The Definitive Guide, 3rd Edition, here is a brief restatement of the problem. The data comes from the US National Climatic Data Center (NCDC). Each raw record is a fixed-width line of text; the columns that are not used here are simply ignored.

These lines are fed to the map method as key-value pairs: the key is the offset of the line within the file, and the value is the line itself.

To compute the temperature statistics, we need to extract the year and the air temperature from every line. The four-digit year occupies characters 15 through 18 of the line. The temperature is stored as an integer scaled by a factor of 10 (tenths of a degree); it occupies characters 87 through 91, with character 87 holding the sign.
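To make this concrete, here is a minimal parsing sketch (illustrative only; the class and method names are made up and do not appear in the book or in the project code) that pulls the two fields out of one record line using exactly the character positions described above, which are the same positions the Mapper classes later in this section use:

public class RecordParseSketch {
    // four-digit year: characters 15-18 of the line
    static String parseYear(String line) {
        return line.substring(15, 19);
    }
    // temperature: characters 87-91, where character 87 may hold the sign
    static int parseTemperature(String line) {
        if (line.charAt(87) == ' ') {
            return Integer.parseInt(line.substring(88, 92)); // no sign character
        }
        return Integer.parseInt(line.substring(87, 92));     // signed value such as "-0011"
    }
}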

Update 2017-6-19: download the data from ftp://ftp.ncdc.noaa.gov/pub/data/gsod/

For convenience of testing, only the data for 2016 and 2017 was downloaded.

(1) Unpack the yearly archives

[root@nb0 data]# tar -xvf gsod_2016.tar
[root@nb0 data]# tar -xvf gsod_2017.tar
[root@nb0 data]# ls
........
151130-99999-2016.op.gz  416200-99999-2016.op.gz  689230-99999-2016.op.gz  723284-93839-2017.op.gz  871210-99999-2016.op.gz
151130-99999-2017.op.gz  416200-99999-2017.op.gz  689230-99999-2017.op.gz  723290-03935-2016.op.gz  871210-99999-2017.op.gz
151170-99999-2016.op.gz  416240-99999-2016.op.gz  689250-99999-2016.op.gz  723290-03935-2017.op.gz  871271-99999-2017.op.gz
151170-99999-2017.op.gz  416240-99999-2017.op.gz  689250-99999-2017.op.gz  723300-03975-2016.op.gz  871290-99999-2016.op.gz
151180-99999-2016.op.gz  416360-99999-2016.op.gz  689260-99999-2016.op.gz  723300-03975-2017.op.gz  871290-99999-2017.op.gz

(2) Merge the files. Use the zcat command to decompress the data files and concatenate them into a single file, ncdc.txt.

[root@nb0 data]# zcat *.gz > ncdc.txt
[root@nb0 data]# ll |grep ncdc
-rw-r--r-- 1 root root 874976505 Jun 19 21:10 ncdc.txt

(3) Inspect the ncdc.txt file

[root@nb0 data]# head -12 ncdc.txt 
STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
007026 99999  20160622    94.7  7    66.7  7  9999.9  0  9999.9  0    6.2  4    0.0  7  999.9  999.9   100.4*   87.8*  0.00I 999.9  000000
007026 99999  20160623    88.3 24    69.7 24  9999.9  0  9999.9  0    6.2 24    0.0 24  999.9  999.9    98.6*   78.8*  0.00I 999.9  000000
007026 99999  20160624    80.5 24    69.3 24  9999.9  0  9999.9  0    5.8 22    0.0 24  999.9  999.9    93.2*   69.8* 99.99  999.9  010000
007026 99999  20160625    81.4 24    71.8 24  9999.9  0  9999.9  0    5.9 23    0.0 24  999.9  999.9    89.6*   73.4*  0.00I 999.9  000000
007026 99999  20160626    80.5 24    63.4 24  9999.9  0  9999.9  0    6.2 22    0.0 24  999.9  999.9    91.4*   69.8*  0.00I 999.9  000000
007026 99999  20160627    80.6 24    64.2 24  9999.9  0  9999.9  0    6.0 23    0.0 24  999.9  999.9    93.2*   68.0*  0.00I 999.9  000000
007026 99999  20160628    77.4 24    70.8 24  9999.9  0  9999.9  0    5.1 17    0.0 24  999.9  999.9    87.8*   71.6* 99.99  999.9  010000
007026 99999  20160629    74.3 16    71.9 16  9999.9  0  9999.9  0    2.7 13    0.0 16  999.9  999.9    86.0*   69.8*  0.00I 999.9  000000
STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
007026 99999  20170210    46.9 11    19.4 11  9999.9  0  9999.9  0    6.2 11    3.6 11    8.0   13.0    53.6*   35.6*  0.00I 999.9  000000
007026 99999  20170211    54.4 24    35.0 24  9999.9  0  9999.9  0    6.2 24    4.7 24   12.0   21.0    77.0*   42.8*  0.00I 999.9  000000

(4) Data format description: ftp://ftp.ncdc.noaa.gov/pub/data/gsod/readme.txt

| Field | Position | Type | Description |
| --- | --- | --- | --- |
| STN--- | 1-6 | Int. | Station number. |
| WBAN | 8-12 | Int. | WBAN number where applicable; historically the "Weather Bureau Air Force Navy" number, hence the acronym WBAN. |
| YEAR | 15-18 | Int. | The year. |
| MODA | 19-22 | Int. | The month and day. |
| TEMP | 25-30 | Real | Mean temperature for the day in degrees Fahrenheit. Missing = 9999.9 |
| Count | 32-33 | Int. | Number of observations used in calculating the mean temperature. |
| DEWP | 36-41 | Real | Mean dew point for the day in degrees Fahrenheit. Missing = 9999.9 |
| Count | 43-44 | Int. | Number of observations used in calculating the mean dew point. |
| SLP | 47-52 | Real | Mean sea level pressure for the day in millibars. Missing = 9999.9 |
| Count | 54-55 | Int. | Number of observations used in calculating the mean sea level pressure. |
| STP | 58-63 | Real | Mean station pressure for the day. Missing = 9999.9 |
| Count | 65-66 | Int. | Number of observations used in calculating the mean station pressure. |
| VISIB | 69-73 | Real | Mean visibility for the day. Missing = 999.9 |
| Count | 75-76 | Int. | Number of observations used in calculating the mean visibility. |
| WDSP | 79-83 | Real | Mean wind speed for the day. Missing = 999.9 |
| Count | 85-86 | Int. | Number of observations used in calculating the mean wind speed. |
| MXSPD | 89-93 | Real | Maximum sustained wind speed reported for the day. Missing = 999.9 |
| GUST | 96-100 | Real | Maximum wind gust reported for the day. Missing = 999.9 |
| MAX | 103-108 | Real | Maximum temperature reported during the day in degrees Fahrenheit (reporting practice varies by country and region). Missing = 9999.9 |
| Flag | 109-109 | Char | Blank indicates the maximum temperature was not taken from the hourly data; * indicates it was derived from the hourly data. |
| MIN | 111-116 | Real | Minimum temperature reported during the day in degrees Fahrenheit. Missing = 9999.9 |
| Flag | 117-117 | Char | Blank indicates the minimum temperature was not taken from the hourly data; * indicates it was derived from the hourly data. |
| PRCP | 119-123 | Real | Total precipitation (rain and/or melted snow) reported during the day; .00 indicates no measurable precipitation. Missing = 99.99 |
| SNDP | 126-130 | Real | Snow depth. Missing = 999.9 |
| FRSHTT | 133-138 | Int. | Indicator flags. |
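Note that the column positions in the readme are 1-based, while Java's String.substring uses 0-based indices. As a minimal sketch (illustrative only; the class and method names are made up, and the example programs below follow the raw NCDC record layout from the book rather than this GSOD layout), the YEAR and TEMP fields of a GSOD line such as those shown above could be read like this:

public class GsodParseSketch {
    // YEAR occupies readme columns 15-18, i.e. substring indices 14 to 17
    static String year(String line) {
        return line.substring(14, 18);
    }
    // TEMP occupies readme columns 25-30, i.e. substring indices 24 to 29; 9999.9 means missing
    static double meanTemp(String line) {
        return Double.parseDouble(line.substring(24, 30).trim());
    }
}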

(5) Remove the header rows. Use the sed command to delete every line that matches STN.

[root@nb0 data]# sed -i '/STN/d' ncdc.txt

Check the result:

[root@nb0 data]# head -12 ncdc.txt
007026 99999  20160622    94.7  7    66.7  7  9999.9  0  9999.9  0    6.2  4    0.0  7  999.9  999.9   100.4*   87.8*  0.00I 999.9  000000
007026 99999  20160623    88.3 24    69.7 24  9999.9  0  9999.9  0    6.2 24    0.0 24  999.9  999.9    98.6*   78.8*  0.00I 999.9  000000
007026 99999  20160624    80.5 24    69.3 24  9999.9  0  9999.9  0    5.8 22    0.0 24  999.9  999.9    93.2*   69.8* 99.99  999.9  010000
007026 99999  20160625    81.4 24    71.8 24  9999.9  0  9999.9  0    5.9 23    0.0 24  999.9  999.9    89.6*   73.4*  0.00I 999.9  000000
007026 99999  20160626    80.5 24    63.4 24  9999.9  0  9999.9  0    6.2 22    0.0 24  999.9  999.9    91.4*   69.8*  0.00I 999.9  000000
007026 99999  20160627    80.6 24    64.2 24  9999.9  0  9999.9  0    6.0 23    0.0 24  999.9  999.9    93.2*   68.0*  0.00I 999.9  000000
007026 99999  20160628    77.4 24    70.8 24  9999.9  0  9999.9  0    5.1 17    0.0 24  999.9  999.9    87.8*   71.6* 99.99  999.9  010000
007026 99999  20160629    74.3 16    71.9 16  9999.9  0  9999.9  0    2.7 13    0.0 16  999.9  999.9    86.0*   69.8*  0.00I 999.9  000000
007026 99999  20170210    46.9 11    19.4 11  9999.9  0  9999.9  0    6.2 11    3.6 11    8.0   13.0    53.6*   35.6*  0.00I 999.9  000000
007026 99999  20170211    54.4 24    35.0 24  9999.9  0  9999.9  0    6.2 24    4.7 24   12.0   21.0    77.0*   42.8*  0.00I 999.9  000000
007026 99999  20170212    70.3 24    52.9 24  9999.9  0  9999.9  0    6.2 24    5.2 24   15.0   21.0    84.2*   62.6*  0.00I 999.9  000000
007026 99999  20170213    61.1 22    35.4 22  9999.9  0  9999.9  0    6.2 22    8.9 22   15.0   24.1    75.2*   50.0*  0.00I 999.9  000000

6.5.2 Preparing the Data

(1) Download the data file to one of the nodes

[root@node1 ~]# ls
anaconda-ks.cfg  cite75_99.txt  hadoop-2.7.3.tar.gz  jdk-8u112-linux-x64.tar.gz  rs.txt  word1.txt  wordcount.jar

(2) Upload the data file to HDFS

[root@node1 ~]# hdfs dfs -mkdir -p temperature/input
[root@node1 ~]#  hdfs dfs -put temperature.txt temperature/input
[root@node1 ~]# hdfs dfs -ls temperature/input
Found 1 items
-rw-r--r--   3 root supergroup   46337829 2017-06-01 09:31 temperature/input/temperature.txt
[root@node1 ~]# 

6.5.3 Computing the Maximum Temperature

package cn.hadron.mrDemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperature extends Configured implements Tool {
    //Mapper
    public static class MaxTemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
        // the value 9999 indicates a missing reading
        private static final int MISSING = 9999;
        @Override 
        public void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if(line.charAt(87) == ' ') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            // accept only readings whose quality code is 0, 1, 4, 5 or 9
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            } 
        }
    }
    //Reducer
    public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {  
                maxValue = Math.max(maxValue, value.get());  
            }  
            context.write(key, new IntWritable(maxValue));  
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // load the configuration
        conf.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
        Job job = Job.getInstance(conf, "MaxTemperature");
        job.setJarByClass(MaxTemperature.class);
        Path in = new Path(args[0]);  // input path
        Path out = new Path(args[1]); // output path
        FileSystem hdfs = out.getFileSystem(conf);
        if (hdfs.isDirectory(out)) {  // delete the output path if it already exists
            hdfs.delete(out, true);
        }
        FileInputFormat.setInputPaths(job, in);   // set the input path
        FileOutputFormat.setOutputPath(job, out); // set the output path
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1; // wait for the job to finish and report its status
    }
    public static void main(String[] args){
        System.setProperty("HADOOP_USER_NAME", "root");
        try {
            // program arguments: input path and output path
            String[] args0 = {"/user/root/temperature/input", "/user/root/temperature/output/"};
            // local run: pass the third parameter as an array; here it is set to args0
            // cluster run: pass the third parameter from the command line; set it to args instead
            // this example runs locally, so args0 is used
            int res = ToolRunner.run(new Configuration(), new MaxTemperature(), args0);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }       
    }
}

Run it directly in Eclipse.

Output of the statistics:

[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
1971    400
1972    411
1973    430

6.5.4 Minimum Temperature

package cn.hadron.mrDemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Reducer;
public class MinTemperature extends Configured implements Tool {
    //Mapper
    public static class MinTemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
        private static final int MISSING = 9999;
        @Override 
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if(line.charAt(87) == ' ') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            // accept only readings whose quality code is 0, 1, 4, 5 or 9
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }
    //Reducer
    public static class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int minValue = Integer.MAX_VALUE;
            for(IntWritable value : values) {
                minValue = Math.min(minValue, value.get());
            }
            context.write(key, new IntWritable(minValue));
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // load the configuration
        conf.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
        Job job = Job.getInstance(conf, "Min temperature");
        job.setJarByClass(MinTemperature.class);
        Path in = new Path(args[0]);  // input path
        Path out = new Path(args[1]); // output path
        FileSystem hdfs = out.getFileSystem(conf);
        if (hdfs.isDirectory(out)) {  // delete the output path if it already exists
            hdfs.delete(out, true);
        }
        FileInputFormat.setInputPaths(job, in);   // set the input path
        FileOutputFormat.setOutputPath(job, out); // set the output path

        job.setMapperClass(MinTemperatureMapper.class);
        job.setReducerClass(MinTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1; // wait for the job to finish and report its status
    }
    public static void main(String[] args){
        System.setProperty("HADOOP_USER_NAME", "root");
        try {
            // program arguments: input path and output path
            String[] args0 = {"/user/root/temperature/input", "/user/root/temperature/output/"};
            // local run: pass the third parameter as an array; here it is set to args0
            // cluster run: pass the third parameter from the command line; set it to args instead
            // this example runs locally, so args0 is used
            int res = ToolRunner.run(new Configuration(), new MinTemperature(), args0);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }       
    }
}

Output of the statistics:
[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
1971    -461
1972    -267
1973    -390

6.5.5 Average Temperature

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperature extends Configured implements Tool{
    //Mapper
    public static class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final int MISSING = 9999;
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if(line.charAt(87) == ' ') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature =  Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            // accept only readings whose quality code is 0, 1, 4, 5 or 9 (same filter as the max/min examples)
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new Text(String.valueOf(airTemperature)));
                //System.out.println("Mapper output <" + year + "," + airTemperature + ">");
            }
        }
    }
    // For averages, the mean of the partial means is not the mean of the whole, so the reducer
    // cannot simply be reused as a combiner. The workaround is to have the combiner emit the
    // partial sum and count instead of a partial average, and let the reducer divide the total
    // sum by the total count when it writes the final result (a worked example follows this listing).
    //Combiner
    public static class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println("....AvgTemperatureCombiner.... ");
            double sumValue = 0;
            long numValue = 0;
            for(Text value : values) {
                sumValue += Double.parseDouble(value.toString());
                numValue++;
            }
            context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));
            System.out.println("Combiner output <" + key + "," + sumValue + "," + numValue + ">");
        }
    }
    //java.lang.NoSuchMethodException: temperature.AvgTemperature$AvgTemperatureReducer.<init>()
    // Mapper and Reducer inner classes must be declared static, otherwise Hadoop cannot instantiate them
    //Reducer
    public static class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            System.out.println("....AvgTemperatureReducer.... ");
            double sumValue = 0;
            long numValue = 0;
            int avgValue = 0;
            // each value is a "sum,count" pair emitted by the combiner
            // (this assumes the combiner has run; raw map output without a count would need extra handling)
            for(Text value : values) {
                String[] valueAll = value.toString().split(",");
                sumValue += Double.parseDouble(valueAll[0]);
                numValue += Integer.parseInt(valueAll[1]);
            }
            avgValue = (int) (sumValue / numValue);
            context.write(key, new IntWritable(avgValue));
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // load the configuration
        conf.set("fs.defaultFS", "hdfs://192.168.11.81:9000");
        Job job = Job.getInstance(conf, "Avg Temperature");
        job.setJarByClass(AvgTemperature.class);
        Path in = new Path(args[0]);  // input path
        Path out = new Path(args[1]); // output path
        FileSystem hdfs = out.getFileSystem(conf);
        if (hdfs.isDirectory(out)) {  // delete the output path if it already exists
            hdfs.delete(out, true);
        }
        FileInputFormat.setInputPaths(job, in);   // set the input path
        FileOutputFormat.setOutputPath(job, out); // set the output path
        job.setMapperClass(AvgTemperatureMapper.class);
        job.setCombinerClass(AvgTemperatureCombiner.class);
        job.setReducerClass(AvgTemperatureReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1; // wait for the job to finish and report its status
    }
    public static void main(String[] args){
        System.setProperty("HADOOP_USER_NAME", "root");
        try {
            String[] args0 ={"/user/root/temperature/input","/user/root/temperature/output/"};
            int res = ToolRunner.run(new Configuration(), new AvgTemperature(), args0);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }       
    }
}
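
To see why the combiner has to carry sums and counts rather than averages, take a small made-up example. Suppose one combiner sees the readings 10 and 20 for some year, so it emits "30.0,2", and another combiner sees the single reading 30, so it emits "30.0,1". Averaging the two partial averages would give (15 + 30) / 2 = 22.5, which is wrong; the reducer above instead adds the sums and the counts and computes 60.0 / 3 = 20, the true average.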

Output of the statistics:
[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
17/03/04 04:13:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1971    69
1972    147
1973    176
