Implementing an Inner Join Query with MapReduce


1. Test Data

file1 (simulated area table)

1	北京
2	上海
3	广州
4	深圳
5	南京
6	杭州
7	成都
8	重庆
9	厦门
10	武汉

file2 (simulated annual income by area)

1	2017	6435
1	2018	7535
2	2016	6432
2	2017	8532
2	2018	7432
6	2019	8534
6	2017	6434
6	2019	6321
6	2018	4222
7	2019	3424
8	2016	4690
9	2019	4650
10	2019	4443
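
For reference, here is a minimal plain-Java sketch (not part of the MapReduce job) of the inner join these two files describe: each file2 row is joined with the file1 row that has the same id, and ids without a match on either side are dropped. The file names file1 and file2 and the tab-separated layout are assumptions taken from the sample data above.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
 
/** Plain-Java reference for the expected inner-join result (not MapReduce). */
public class LocalJoinSketch {
	public static void main(String[] args) throws IOException {
		// file1: id \t area  -> lookup table keyed by id
		Map<String, String> areas = new HashMap<>();
		for (String line : Files.readAllLines(Paths.get("file1"))) {
			String[] f = line.split("\t");
			areas.put(f[0], f[1]);
		}
		// file2: id \t year \t income -> emit a joined row only when the id exists in file1
		for (String line : Files.readAllLines(Paths.get("file2"))) {
			String[] f = line.split("\t");
			String area = areas.get(f[0]);
			if (area != null) {
				// rows appear in file2 order here; the MapReduce job sorts them by id
				System.out.println(f[0] + "\t" + area + "\t" + f[1] + "\t" + f[2]);
			}
		}
	}
}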

2. Code Implementation

Serialization class BigDataWorker

package com.xtd.hadoop;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
 
import org.apache.hadoop.io.WritableComparable;
/**
 * Annual income record for each area.
 * @author   com
 * @Date	 2019-10-03
 */
public class BigDataWorker implements WritableComparable<BigDataWorker> {
 
	private String workerId = "";	// id
	private String area = "";		// area
	private String date = "";		// year
	private String amount = "";		// income
	public String getWorkerId() {
		return workerId;
	}
	public void setWorkerId(String workerId) {
		this.workerId = workerId;
	}
	public String getArea() {
		return area;
	}
	public void setArea(String area) {
		this.area = area;
	}
	public String getDate() {
		return date;
	}
	public void setDate(String date) {
		this.date = date;
	}
	public String getAmount() {
		return amount;
	}
	public void setAmount(String amount) {
		this.amount = amount;
	}
	
	@Override
	public String toString() {
		return workerId + "\t" + area + "\t" + date + "\t" + amount;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(workerId);
		out.writeUTF(area);
		out.writeUTF(date);
		out.writeUTF(amount);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.workerId = in.readUTF();
		this.area = in.readUTF();
		this.date = in.readUTF();
		this.amount = in.readUTF();
	}
	@Override
	public int compareTo(BigDataWorker o) {
		return this.workerId.compareTo(o.workerId);
	}
}

MapReduce main class InnerJoinMR

package com.xtd.hadoop;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
/**
 * Inner join implementation (reduce-side join).
 * @author   com
 * @Date	 2019-10-03
 */
public class InnerJoinMR extends Configured implements Tool{
	
	public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
		private IntWritable outkey = new IntWritable();
		private Text outval = new Text();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] line = value.toString().split("\t");
			String groupName = "all data";
			context.getCounter(groupName, "all data").increment(1);
			if(line.length == 2) {
				// area table record: id \t area
				outkey.set(Integer.parseInt(line[0]));
				outval.set(line[1]);
				context.write(outkey, outval);
			}else if(line.length == 3) {
				// income table record: id \t year \t income
				outkey.set(Integer.parseInt(line[0]));
				outval.set(line[1] + "\t" + line[2]);
				context.write(outkey, outval);
			}else {
				context.getCounter(groupName, "invalid data").increment(1);
			}
		}
	}
	public static class MyReduce extends Reducer<IntWritable, Text, BigDataWorker, NullWritable> {
		@Override
		protected void reduce(IntWritable text, Iterable<Text> values,
				Context context) throws IOException, InterruptedException {
			BigDataWorker outkey = new BigDataWorker();
			List<String> list1 = new ArrayList<>();	// area table values: area
			List<String> list2 = new ArrayList<>();	// income table values: year \t income
			for (Text value : values) {
				outkey.setWorkerId(text.toString());
				if(!value.toString().contains("\t")) {	// area table record
					list1.add(value.toString());
				}else {									// income table record
					list2.add(value.toString());
				}
			}
			// inner join: emit one joined record per (area, income) pair for this id
			String[] str = null;
			for (String lis1 : list1) {
				outkey.setArea(lis1);
				for (String lis2 : list2) {
					str = lis2.split("\t");
					outkey.setDate(str[0]);
					outkey.setAmount(str[1]);
					context.write(outkey, NullWritable.get());
				}
			}
		}
	}
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = this.getConf();
		FileSystem fs = FileSystem.get(conf);
		Path inpath = new Path(args[0]);
		Path outpath = new Path(args[1]);
		if(fs.exists(outpath)) {
			fs.delete(outpath, true);
			System.out.println("The old file directory has been deleted!");
		}
		Job job = Job.getInstance(conf, "InnerJoinMR");
		job.setJarByClass(InnerJoinMR.class);
		job.setOutputKeyClass(BigDataWorker.class);
		job.setOutputValueClass(NullWritable.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, inpath);
		FileOutputFormat.setOutputPath(job, outpath);
		int result = job.waitForCompletion(true)?0:1;
		return result;
	}
	public static void main(String[] args) {
		String[] path = new String[2];
		path[0] = "C:\\Users\\com\\Desktop\\mr\\mr4";
		path[1] = "C:\\Users\\com\\Desktop\\mr\\mr4\\output";
		try {
			int result = ToolRunner.run(new InnerJoinMR(), path);
			String msg = result==0?"job finish!":"job fail!";
			System.out.println(msg);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

3. Running Results

Note: the input path in path[] points to the desktop. The username differs from machine to machine, so the desktop path differs as well; adjust it for your own environment.

If you want to specify the input and output paths at run time instead, change path to args in the line int result = ToolRunner.run(new InnerJoinMR(), path);
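
A minimal sketch of that change, replacing the existing main method and assuming the input and output paths are passed as the first two command-line arguments:

	public static void main(String[] args) {
		// expect: <input path> <output path>
		if (args.length < 2) {
			System.err.println("Usage: InnerJoinMR <input path> <output path>");
			System.exit(1);
		}
		try {
			int result = ToolRunner.run(new InnerJoinMR(), args);
			System.out.println(result == 0 ? "job finish!" : "job fail!");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}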

The generated result files

Contents of the part-r-00000 file

1	北京	2017	6435
1	北京	2018	7535
2	上海	2016	6432
2	上海	2017	8532
2	上海	2018	7432
6	杭州	2017	6434
6	杭州	2018	4222
6	杭州	2019	6321
6	杭州	2019	8534
7	成都	2019	3424
8	重庆	2016	4690
9	厦门	2019	4650
10	武汉	2019	4443
