Chapter 7: Advanced MapReduce
Original post: http://blog.csdn.net/chengyuqiang/article/details/73441493
7.4 Custom Key Types
Hadoop provides a variety of basic Writable types, but in real-world development these basic types may not be enough. When that happens, you define a custom Writable type suited to the situation.
7.4.1 Problem Description
Using the weather data provided by NCDC, find the highest temperature for each month of 2016 and 2017.
7.4.2 Uploading the Data
[root@node1 ~]# hdfs dfs -mkdir -p input
[root@node1 ~]# hdfs dfs -put /root/data/ncdc.txt input
[root@node1 ~]# hdfs dfs -ls input
Found 1 items
-rw-r--r-- 3 root hbase 871353053 2017-06-21 20:32 input/ncdc.txt
7.4.3 Defining a Custom Key
How do we distinguish one record from another? In other words, what should the key type be? Here each record is identified by its year and month, with the temperature carried along for sorting, so we define a Weather class that implements WritableComparable:
package cn.hadron.mr.ncdc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class Weather implements WritableComparable<Weather> {
    private int year;
    private int month;
    private double hot;

    // Writable types need a no-arg constructor so the framework can instantiate them via reflection
    public Weather() {
    }

    public Weather(int year, int month, double hot) {
        this.year = year;
        this.month = month;
        this.hot = hot;
    }

    @Override
    public String toString() {
        return "[year=" + year + ", month=" + month + "]";
    }

    /**
     * Deserialize: read the fields back from input stream in, in the same order they were written
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    /**
     * Serialize: write the object's fields as bytes to output stream out
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    // Defines the ordering between keys; used when this object serves as the map output key
    @Override
    public int compareTo(Weather t) {
        int r1 = Integer.compare(this.year, t.getYear());
        if (r1 == 0) {
            // Same year: compare the months
            int r2 = Integer.compare(this.month, t.getMonth());
            if (r2 == 0) {
                return Double.compare(this.hot, t.getHot());
            } else {
                return r2;
            }
        } else {
            return r1;
        }
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }
}
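Since write and readFields must mirror each other field for field, a quick way to sanity-check the pair is a serialization round trip. A minimal sketch (the class name WeatherRoundTrip is hypothetical, not part of the original example):

package cn.hadron.mr.ncdc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WeatherRoundTrip {
    public static void main(String[] args) throws IOException {
        Weather original = new Weather(2016, 7, 127.2);
        // Serialize to bytes, as the framework does during the shuffle
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));
        // Deserialize into a fresh instance created via the no-arg constructor
        Weather copy = new Weather();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy);                     // [year=2016, month=7]
        System.out.println(original.compareTo(copy)); // 0: the copy is equivalent
    }
}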
7.4.4 Custom Partitioner
package cn.hadron.mr.ncdc;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyPartitioner extends HashPartitioner<Weather, DoubleWritable> {
    // Called once per map output record, so it should execute as fast as possible
    @Override
    public int getPartition(Weather key, DoubleWritable value, int numReduceTasks) {
        // Partition by year
        return key.getYear() % numReduceTasks;
    }
}
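With two reduce tasks, the year alone decides the partition: 2016 % 2 == 0 and 2017 % 2 == 1, so each year's records land in their own output file. A small check (PartitionCheck is a hypothetical helper, assumed to sit in the same package):

package cn.hadron.mr.ncdc;

import org.apache.hadoop.io.DoubleWritable;

public class PartitionCheck {
    public static void main(String[] args) {
        MyPartitioner p = new MyPartitioner();
        // 2016 % 2 == 0 -> part-r-00000
        System.out.println(p.getPartition(new Weather(2016, 1, 119.5), new DoubleWritable(119.5), 2));
        // 2017 % 2 == 1 -> part-r-00001
        System.out.println(p.getPartition(new Weather(2017, 1, 116.1), new DoubleWritable(116.1), 2));
    }
}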
7.4.5 Custom Comparator
package cn.hadron.mr.ncdc;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyComparator extends WritableComparator {
    protected MyComparator() {
        // true: create Weather instances so compare() receives deserialized keys
        super(Weather.class, true);
    }

    @Override
    public int compare(WritableComparable k1, WritableComparable k2) {
        Weather key1 = (Weather) k1;
        Weather key2 = (Weather) k2;
        int r1 = Integer.compare(key1.getYear(), key2.getYear());
        if (r1 == 0) {
            // Same year: compare the months
            return Integer.compare(key1.getMonth(), key2.getMonth());
        } else {
            return r1;
        }
    }
}
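Unlike Weather.compareTo, this comparator ignores the temperature, so two keys from the same year and month compare as equal. Because the job registers it as the sort comparator and configures no separate grouping comparator, it also controls grouping, which is what sends all of a month's readings into a single reduce() call. A quick illustration (ComparatorCheck is hypothetical and must live in the same package, since the constructor is protected):

package cn.hadron.mr.ncdc;

public class ComparatorCheck {
    public static void main(String[] args) {
        Weather a = new Weather(2016, 7, 100.0);
        Weather b = new Weather(2016, 7, 127.2);
        MyComparator c = new MyComparator(); // same-package access to the protected constructor
        System.out.println(c.compare(a, b)); // 0: same year and month -> same reduce group
        System.out.println(a.compareTo(b));  // -1: the full key ordering still sees different temperatures
    }
}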
7.4.6 Driver, Mapper, and Reducer
The RunJob driver configures and submits the job; the Mapper and Reducer are nested static classes:
package cn.hadron.mr.ncdc;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    public static void main(String[] args) {
        // Set the HADOOP_USER_NAME property to root
        System.setProperty("HADOOP_USER_NAME", "root");
        // The Configuration object holds the Hadoop configuration
        Configuration config = new Configuration();
        // Set fs.defaultFS
        config.set("fs.defaultFS", "hdfs://192.168.1.117:8020");
        // Set the yarn.resourcemanager node
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("weather");
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            job.setMapOutputKeyClass(Weather.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MyComparator.class);
            // Only two years of data, so use two reduce tasks
            job.setNumReduceTasks(2);
            FileInputFormat.addInputPath(job, new Path("/user/root/input/ncdc.txt"));
            Path outpath = new Path("/user/root/output");
            // Delete the output directory if it already exists
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            System.out.println(job.waitForCompletion(true));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class WeatherMapper extends Mapper<LongWritable, Text, Weather, DoubleWritable> {
        private static final String MISSING = "9999.9";

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            try {
                // Parse year, month, and temperature from their fixed-width columns
                int year = Integer.parseInt(val.substring(14, 18));
                int month = Integer.parseInt(val.substring(18, 20));
                String hotStr = val.substring(102, 108);
                // Skip records whose temperature field is the missing-value marker
                if (!MISSING.equals(hotStr)) {
                    double hot = Double.parseDouble(hotStr);
                    context.write(new Weather(year, month, hot), new DoubleWritable(hot));
                }
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

    public static class WeatherReducer extends Reducer<Weather, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Weather key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Start from negative infinity so below-zero temperatures are handled correctly
            double maxValue = Double.NEGATIVE_INFINITY;
            for (DoubleWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(new Text(key.toString()), new DoubleWritable(maxValue));
        }
    }
}
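The driver above is tied to one specific cluster through fs.defaultFS and the ResourceManager host. For quick experiments you can instead rely on a default Configuration, which typically falls back to Hadoop's local job runner and the local filesystem. A minimal sketch under that assumption; RunJobLocal and the output path /tmp/weather-output are hypothetical, while /root/data/ncdc.txt is the local copy of the data used earlier:

package cn.hadron.mr.ncdc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJobLocal {
    public static void main(String[] args) throws Exception {
        // Default Configuration: local job runner and local filesystem (no cluster required)
        Configuration config = new Configuration();
        Job job = Job.getInstance(config, "weather-local");
        job.setJarByClass(RunJobLocal.class);
        job.setMapperClass(RunJob.WeatherMapper.class);
        job.setReducerClass(RunJob.WeatherReducer.class);
        job.setMapOutputKeyClass(Weather.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MyComparator.class);
        job.setNumReduceTasks(2);
        FileInputFormat.addInputPath(job, new Path("/root/data/ncdc.txt"));   // local copy of the data
        FileOutputFormat.setOutputPath(job, new Path("/tmp/weather-output")); // must not already exist
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}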
7.4.7 Running the Job
Output from running in Eclipse (the trailing true is the return value of job.waitForCompletion, indicating the job succeeded):
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
true
Check the result files on HDFS:
[root@hds117 data]# hdfs dfs -ls /user/root/output
Found 3 items
-rw-r--r-- 3 root hbase 0 2017-06-27 16:29 /user/root/output/_SUCCESS
-rw-r--r-- 3 root hbase 327 2017-06-27 16:29 /user/root/output/part-r-00000
-rw-r--r-- 3 root hbase 162 2017-06-27 16:29 /user/root/output/part-r-00001
View each partition's output. Partition 0 holds 2016 (2016 % 2 == 0) and partition 1 holds 2017:
[root@hds117 data]# hdfs dfs -cat /user/root/output/part-r-00000
[year=2016, month=1] 119.5
[year=2016, month=2] 118.6
[year=2016, month=3] 122.0
[year=2016, month=4] 120.2
[year=2016, month=5] 126.5
[year=2016, month=6] 129.0
[year=2016, month=7] 127.2
[year=2016, month=8] 127.4
[year=2016, month=9] 124.2
[year=2016, month=10] 121.1
[year=2016, month=11] 114.1
[year=2016, month=12] 126.9
[root@hds117 data]# hdfs dfs -cat /user/root/output/part-r-00001
[year=2017, month=1] 116.1
[year=2017, month=2] 117.3
[year=2017, month=3] 123.8
[year=2017, month=4] 129.6
[year=2017, month=5] 129.2
[year=2017, month=6] 123.6