Chapter 6 Getting Started with MapReduce
6.5 Temperature Statistics
6.5.1 Problem Description
The book Hadoop: The Definitive Guide, 3rd Edition, contains a classic example: temperature statistics. The author, Tom White, presents the program and explains the principle, but assumes readers already know how to set up and deploy the basic environment for MapReduce programs, so he passes over that part lightly. For beginners this is a major pitfall: when the program will not even run, a Hadoop beginner's interest and resolve quickly wear away.
For the directory structure of the Java project used in this section, see Section 6.4.
Following the description in Hadoop: The Definitive Guide, 3rd Edition, here is a brief restatement of the problem. The data comes from the US National Climatic Data Center; sample data is shown below.
The ellipses stand for unused columns (ignored for now). These lines are passed to the map method as key-value pairs: the key is the line's offset within the file, and the value is the line itself.
To make the temperature statistics convenient, we need to extract the year and the temperature from each line (shown in bold in the figure above). The 4-digit year occupies characters 15 through 18 of a line; the temperature, scaled up by a factor of 100, occupies characters 87 through 91, with character 87 holding the sign.
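To make the character positions concrete, here is a minimal standalone sketch of the extraction (it mirrors the parsing logic used later in the mapper). The class name and the synthetic sample line are made up for illustration and are not part of the book's program:

```java
// A minimal sketch of the extraction described above: the 4-digit year
// sits at characters 15-18 and the signed, 100x-scaled temperature at
// characters 87-91 (character 87 is the sign).
public class ExtractDemo {

    // Characters 15-18 hold the year (the substring end index is exclusive).
    public static String year(String line) {
        return line.substring(15, 19);
    }

    // Character 87 holds the sign; skip it when it is blank.
    public static int temperature(String line) {
        if (line.charAt(87) == ' ') {
            return Integer.parseInt(line.substring(88, 92));
        }
        return Integer.parseInt(line.substring(87, 92));
    }

    public static void main(String[] args) {
        // Build a synthetic line; real records carry many more fields.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) sb.append('0');
        sb.replace(15, 19, "1950");   // the year
        sb.replace(87, 92, "-0011");  // -11, i.e. the temperature scaled by 100
        String line = sb.toString();
        System.out.println(year(line) + " " + temperature(line)); // prints "1950 -11"
    }
}
```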
Updated 2017-6-19. Download the data: ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
For convenience of testing, only the 2016 and 2017 data were downloaded.
(1) Extract the yearly archives
[root@nb0 data]# tar -xvf gsod_2016.tar
[root@nb0 data]# tar -xvf gsod_2017.tar
[root@nb0 data]# ls
........
151130-99999-2016.op.gz 416200-99999-2016.op.gz 689230-99999-2016.op.gz 723284-93839-2017.op.gz 871210-99999-2016.op.gz
151130-99999-2017.op.gz 416200-99999-2017.op.gz 689230-99999-2017.op.gz 723290-03935-2016.op.gz 871210-99999-2017.op.gz
151170-99999-2016.op.gz 416240-99999-2016.op.gz 689250-99999-2016.op.gz 723290-03935-2017.op.gz 871271-99999-2017.op.gz
151170-99999-2017.op.gz 416240-99999-2017.op.gz 689250-99999-2017.op.gz 723300-03975-2016.op.gz 871290-99999-2016.op.gz
151180-99999-2016.op.gz 416360-99999-2016.op.gz 689260-99999-2016.op.gz 723300-03975-2017.op.gz 871290-99999-2017.op.gz
(2) Merge the files. Use the zcat command to decompress these data files and merge them into a single file, ncdc.txt.
[root@nb0 data]# zcat *.gz > ncdc.txt
[root@nb0 data]# ll |grep ncdc
-rw-r--r-- 1 root root 874976505 Jun 19 21:10 ncdc.txt
(3) View the ncdc.txt file
[root@nb0 data]# head -12 ncdc.txt
STN--- WBAN YEARMODA TEMP DEWP SLP STP VISIB WDSP MXSPD GUST MAX MIN PRCP SNDP FRSHTT
007026 99999 20160622 94.7 7 66.7 7 9999.9 0 9999.9 0 6.2 4 0.0 7 999.9 999.9 100.4* 87.8* 0.00I 999.9 000000
007026 99999 20160623 88.3 24 69.7 24 9999.9 0 9999.9 0 6.2 24 0.0 24 999.9 999.9 98.6* 78.8* 0.00I 999.9 000000
007026 99999 20160624 80.5 24 69.3 24 9999.9 0 9999.9 0 5.8 22 0.0 24 999.9 999.9 93.2* 69.8* 99.99 999.9 010000
007026 99999 20160625 81.4 24 71.8 24 9999.9 0 9999.9 0 5.9 23 0.0 24 999.9 999.9 89.6* 73.4* 0.00I 999.9 000000
007026 99999 20160626 80.5 24 63.4 24 9999.9 0 9999.9 0 6.2 22 0.0 24 999.9 999.9 91.4* 69.8* 0.00I 999.9 000000
007026 99999 20160627 80.6 24 64.2 24 9999.9 0 9999.9 0 6.0 23 0.0 24 999.9 999.9 93.2* 68.0* 0.00I 999.9 000000
007026 99999 20160628 77.4 24 70.8 24 9999.9 0 9999.9 0 5.1 17 0.0 24 999.9 999.9 87.8* 71.6* 99.99 999.9 010000
007026 99999 20160629 74.3 16 71.9 16 9999.9 0 9999.9 0 2.7 13 0.0 16 999.9 999.9 86.0* 69.8* 0.00I 999.9 000000
STN--- WBAN YEARMODA TEMP DEWP SLP STP VISIB WDSP MXSPD GUST MAX MIN PRCP SNDP FRSHTT
007026 99999 20170210 46.9 11 19.4 11 9999.9 0 9999.9 0 6.2 11 3.6 11 8.0 13.0 53.6* 35.6* 0.00I 999.9 000000
007026 99999 20170211 54.4 24 35.0 24 9999.9 0 9999.9 0 6.2 24 4.7 24 12.0 21.0 77.0* 42.8* 0.00I 999.9 000000
(4) Data format description: ftp://ftp.ncdc.noaa.gov/pub/data/gsod/readme.txt
Field | Position | Type | Description |
---|---|---|---|
STN--- | 1-6 | Int. | Weather station number |
WBAN | 8-12 | Int. | WBAN number where applicable; historically the "Weather Bureau Air Force Navy" number, hence the acronym WBAN. |
YEAR | 15-18 | Int. | Year. |
MODA | 19-22 | Int. | Month and day. |
TEMP | 25-30 | Real | Mean temperature for the day in degrees Fahrenheit; missing = 9999.9 |
Count | 32-33 | Int. | Number of observations used in calculating the mean temperature. |
DEWP | 36-41 | Real | Mean dew point for the day in degrees Fahrenheit; missing = 9999.9 |
Count | 43-44 | Int. | Number of observations used in calculating the mean dew point. |
SLP | 47-52 | Real | Mean sea-level pressure for the day in millibars; missing = 9999.9 |
Count | 54-55 | Int. | Number of observations used in calculating the mean sea-level pressure. |
STP | 58-63 | Real | Mean station pressure for the day; missing = 9999.9 |
Count | 65-66 | Int. | Number of observations used in calculating the mean station pressure. |
VISIB | 69-73 | Real | Mean visibility for the day; missing = 999.9 |
Count | 75-76 | Int. | Number of observations used in calculating the mean visibility. |
WDSP | 79-83 | Real | Mean wind speed for the day; missing = 999.9 |
Count | 85-86 | Int. | Number of observations used in calculating the mean wind speed. |
MXSPD | 89-93 | Real | Maximum sustained wind speed reported for the day; missing = 999.9 |
GUST | 96-100 | Real | Maximum wind gust reported for the day; missing = 999.9 |
MAX | 103-108 | Real | Maximum temperature reported during the day in degrees Fahrenheit (time of reporting varies by country and region); missing = 9999.9 |
Flag | 109-109 | Char | Blank indicates the maximum temperature was not taken from the "hourly" data; * indicates it was derived from the hourly data. |
MIN | 111-116 | Real | Minimum temperature reported during the day in degrees Fahrenheit; missing = 9999.9 |
Flag | 117-117 | Char | Blank indicates the minimum temperature was not taken from the "hourly" data; * indicates it was derived from the hourly data. |
PRCP | 119-123 | Real | Total precipitation (rain and/or melted snow) reported during the day; .00 indicates no measurable precipitation; missing = 99.99 |
SNDP | 126-130 | Real | Snow depth; missing = 999.9 |
FRSHTT | 133-138 | Int. | Indicators |
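The positions in the table are 1-based, inclusive column ranges; Java's substring is 0-based with an exclusive end, so columns a through b map to substring(a - 1, b). A minimal sketch that reads a couple of fields from a synthetic line (the class name and sample values are made up for illustration):

```java
// Minimal sketch: read a few GSOD fields by the 1-based column
// positions in the table above. Columns a..b map to substring(a - 1, b).
public class GsodFieldDemo {

    // Columns 15-18: the 4-digit year.
    public static String year(String line) {
        return line.substring(14, 18);
    }

    // Columns 25-30: mean temperature in Fahrenheit, right-aligned.
    public static double temp(String line) {
        return Double.parseDouble(line.substring(24, 30).trim());
    }

    public static void main(String[] args) {
        // Build a synthetic 140-column line for illustration only.
        char[] cols = new char[140];
        java.util.Arrays.fill(cols, ' ');
        "007026".getChars(0, 6, cols, 0);     // STN---, columns 1-6
        "20160622".getChars(0, 8, cols, 14);  // YEARMODA, columns 15-22
        "  94.7".getChars(0, 6, cols, 24);    // TEMP, columns 25-30
        String line = new String(cols);
        System.out.println(year(line) + " " + temp(line)); // prints "2016 94.7"
    }
}
```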
(5) Delete the header lines. Use the sed command to delete the lines matching STN.
[root@nb0 data]# sed -i '/STN/d' ncdc.txt
View the result:
[root@nb0 data]# head -12 ncdc.txt
007026 99999 20160622 94.7 7 66.7 7 9999.9 0 9999.9 0 6.2 4 0.0 7 999.9 999.9 100.4* 87.8* 0.00I 999.9 000000
007026 99999 20160623 88.3 24 69.7 24 9999.9 0 9999.9 0 6.2 24 0.0 24 999.9 999.9 98.6* 78.8* 0.00I 999.9 000000
007026 99999 20160624 80.5 24 69.3 24 9999.9 0 9999.9 0 5.8 22 0.0 24 999.9 999.9 93.2* 69.8* 99.99 999.9 010000
007026 99999 20160625 81.4 24 71.8 24 9999.9 0 9999.9 0 5.9 23 0.0 24 999.9 999.9 89.6* 73.4* 0.00I 999.9 000000
007026 99999 20160626 80.5 24 63.4 24 9999.9 0 9999.9 0 6.2 22 0.0 24 999.9 999.9 91.4* 69.8* 0.00I 999.9 000000
007026 99999 20160627 80.6 24 64.2 24 9999.9 0 9999.9 0 6.0 23 0.0 24 999.9 999.9 93.2* 68.0* 0.00I 999.9 000000
007026 99999 20160628 77.4 24 70.8 24 9999.9 0 9999.9 0 5.1 17 0.0 24 999.9 999.9 87.8* 71.6* 99.99 999.9 010000
007026 99999 20160629 74.3 16 71.9 16 9999.9 0 9999.9 0 2.7 13 0.0 16 999.9 999.9 86.0* 69.8* 0.00I 999.9 000000
007026 99999 20170210 46.9 11 19.4 11 9999.9 0 9999.9 0 6.2 11 3.6 11 8.0 13.0 53.6* 35.6* 0.00I 999.9 000000
007026 99999 20170211 54.4 24 35.0 24 9999.9 0 9999.9 0 6.2 24 4.7 24 12.0 21.0 77.0* 42.8* 0.00I 999.9 000000
007026 99999 20170212 70.3 24 52.9 24 9999.9 0 9999.9 0 6.2 24 5.2 24 15.0 21.0 84.2* 62.6* 0.00I 999.9 000000
007026 99999 20170213 61.1 22 35.4 22 9999.9 0 9999.9 0 6.2 22 8.9 22 15.0 24.1 75.2* 50.0* 0.00I 999.9 000000
6.5.2 Preparing the Data
(1) Download the data file to one of the nodes
[root@node1 ~]# ls
anaconda-ks.cfg cite75_99.txt hadoop-2.7.3.tar.gz jdk-8u112-linux-x64.tar.gz rs.txt word1.txt wordcount.jar
(2) Upload the data file to HDFS
[root@node1 ~]# hdfs dfs -mkdir -p temperature/input
[root@node1 ~]# hdfs dfs -put temperature.txt temperature/input
[root@node1 ~]# hdfs dfs -ls temperature/input
Found 1 items
-rw-r--r-- 3 root supergroup 46337829 2017-06-01 09:31 temperature/input/temperature.txt
[root@node1 ~]#
6.5.3 Finding the Maximum Temperature
package cn.hadron.mrDemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperature extends Configured implements Tool {
//Mapper
public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//9999 indicates a missing value
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == ' ') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
//Reducer
public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf(); //read the configuration
conf.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
Job job = Job.getInstance(conf, "MaxTemperature");
job.setJarByClass(MaxTemperature.class);
Path in = new Path(args[0]);//input path
Path out = new Path(args[1]);//output path
FileSystem hdfs = out.getFileSystem(conf);
if (hdfs.isDirectory(out)) {//delete the output path if it already exists
hdfs.delete(out, true);
}
FileInputFormat.setInputPaths(job, in);//file input
FileOutputFormat.setOutputPath(job, out);//file output
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//wait for the job to finish, then exit
}
public static void main(String[] args){
System.setProperty("HADOOP_USER_NAME", "root");
try {
//program arguments: input path, output path
String[] args0 ={"/user/root/temperature/input","/user/root/temperature/output/"};
//local run: the third argument can be passed in as an array; here it is set to args0
//cluster run: the third argument can be passed from the command line; here it would be set to args
//this example runs locally, so the arguments are args0
int res = ToolRunner.run(new Configuration(), new MaxTemperature(), args0);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Run it directly in Eclipse.
Output of the statistics:
[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
1971 400
1972 411
1973 430
6.5.4 Minimum Temperature
package cn.hadron.mrDemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Reducer;
public class MinTemperature extends Configured implements Tool {
//Mapper
public static class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == ' ') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
//Reducer
public static class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for(IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
context.write(key, new IntWritable(minValue));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf(); //read the configuration
conf.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
Job job = Job.getInstance(conf, "Min temperature");
job.setJarByClass(MinTemperature.class);
Path in = new Path(args[0]);//input path
Path out = new Path(args[1]);//output path
FileSystem hdfs = out.getFileSystem(conf);
if (hdfs.isDirectory(out)) {//delete the output path if it already exists
hdfs.delete(out, true);
}
FileInputFormat.setInputPaths(job, in);//file input
FileOutputFormat.setOutputPath(job, out);//file output
job.setMapperClass(MinTemperatureMapper.class);
job.setReducerClass(MinTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//wait for the job to finish, then exit
}
public static void main(String[] args){
System.setProperty("HADOOP_USER_NAME", "root");
try {
//program arguments: input path, output path
String[] args0 ={"/user/root/temperature/input","/user/root/temperature/output/"};
//local run: the third argument can be passed in as an array; here it is set to args0
//cluster run: the third argument can be passed from the command line; here it would be set to args
//this example runs locally, so the arguments are args0
int res = ToolRunner.run(new Configuration(), new MinTemperature(), args0);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
1971 -461
1972 -267
1973 -390
6.5.5 Average Temperature
package cn.hadron.mrDemo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperature extends Configured implements Tool{
//Mapper
public static class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == ' ') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
//System.out.println("Mapper output <" + year + "," + airTemperature + ">");
}
}
}
//For averages, the average of the partial averages is no longer the overall average, so a combiner cannot be used directly.
//A workaround still lets us use a combiner for the average: instead of storing a final average in the combiner's
//key-value pairs, store the sum and count of all values, and divide the sum by the count only when the reducer emits its output.
//Combiner
public static class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.println("....AvgTemperatureCombiner.... ");
double sumValue = 0;
long numValue = 0;
for(Text value : values) {
sumValue += Double.parseDouble(value.toString());
numValue++;
}
context.write(key, new Text(sumValue + "," + numValue));
System.out.println("Combiner output <" + key + "," + sumValue + "," + numValue + ">");
}
}
//java.lang.NoSuchMethodException: temperature.AvgTemperature$AvgTemperatureReducer.<init>()
//Mapper and Reducer inner classes must be static
//Reducer
public static class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.println("....AvgTemperatureReducer.... ");
double sumValue = 0;
long numValue = 0;
int avgValue = 0;
for(Text value : values) {
String[] valueAll = value.toString().split(",");
sumValue += Double.parseDouble(valueAll[0]);
numValue += Integer.parseInt(valueAll[1]);
}
avgValue = (int)(sumValue/numValue);
context.write(key, new IntWritable(avgValue));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf(); //read the configuration
conf.set("fs.defaultFS", "hdfs://192.168.11.81:9000");
Job job = Job.getInstance(conf, "Avg Temperature");
job.setJarByClass(AvgTemperature.class);
Path in = new Path(args[0]);//input path
Path out = new Path(args[1]);//output path
FileSystem hdfs = out.getFileSystem(conf);
if (hdfs.isDirectory(out)) {//delete the output path if it already exists
hdfs.delete(out, true);
}
FileInputFormat.setInputPaths(job, in);//file input
FileOutputFormat.setOutputPath(job, out);//file output
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//wait for the job to finish, then exit
}
public static void main(String[] args){
System.setProperty("HADOOP_USER_NAME", "root");
try {
String[] args0 ={"/user/root/temperature/input","/user/root/temperature/output/"};
int res = ToolRunner.run(new Configuration(), new AvgTemperature(), args0);
System.exit(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
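The comment above AvgTemperatureCombiner warns that the average of partial averages is not the overall average, which is why the combiner emits (sum, count) pairs. A minimal sketch of the difference, with made-up numbers (the class and method names are for illustration only):

```java
// Why the combiner must emit (sum, count) rather than a partial average:
// averaging partial averages weights every partition equally, no matter
// how many values each partition holds.
public class AvgCombinerDemo {

    // Wrong: average the partial averages.
    public static double avgOfAvgs(double[] sums, long[] counts) {
        double acc = 0;
        for (int i = 0; i < sums.length; i++) acc += sums[i] / counts[i];
        return acc / sums.length;
    }

    // Right: carry (sum, count) pairs and divide once at the end.
    public static double avgOfPairs(double[] sums, long[] counts) {
        double sum = 0;
        long count = 0;
        for (int i = 0; i < sums.length; i++) { sum += sums[i]; count += counts[i]; }
        return sum / count;
    }

    public static void main(String[] args) {
        // Two imaginary combiner outputs: (sum=30, count=2) and (sum=30, count=1),
        // i.e. the value groups {10, 20} and {30}.
        double[] sums = {30, 30};
        long[] counts = {2, 1};
        System.out.println(avgOfAvgs(sums, counts));  // prints 22.5
        System.out.println(avgOfPairs(sums, counts)); // prints 20.0
    }
}
```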
[root@node1 ~]# hdfs dfs -cat /user/root/temperature/output/part-r-00000
17/03/04 04:13:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1971 69
1972 147
1973 176