第7章 MapReduce进阶
7.4 MapReduce 连接
连接操作,也就是常说的join操作,是数据分析时经常用到的操作。 比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。
7.4.1 准备数据
这里准备了Oracle数据库中的经典数据。 dept.txt文件存放部门数据。
代码语言:javascript复制[root@node1 data]# cat dept.txt
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
[root@node1 data]#
emp.txt文件存放雇员数据。
代码语言:javascript复制[root@node1 data]# cat emp.txt
7369,SMITH,CLERK,7902,17-12-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2-81,1250,500,30
7566,JONES,MANAGER,7839,02-4-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5-81,2850,,30
7782,CLARK,MANAGER,7839,09-6-81,2450,,10
7839,KING,PRESIDENT,,17-11-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30
7900,JAMES,CLERK,7698,03-12-81,950,,30
7902,FORD,ANALYST,7566,03-12-81,3000,,20
7934,MILLER,CLERK,7782,23-1-82,1300,,10
上传到HDFS hdfs dfs -mkdir -p input hdfs dfs -put emp.txt input hdfs dfs -put dept.txt input
代码语言:javascript复制[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put emp.txt input
[root@node1 data]# hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -ls input
Found 2 items
-rw-r--r-- 3 root hbase 82 2017-06-23 11:04 input/dept.txt
-rw-r--r-- 3 root hbase 513 2017-06-23 11:04 input/emp.txt
[root@node1 data]#
7.4.2 问题描述
求解每个雇员所在部门,输出格式:雇员名,部门名 比如
代码语言:javascript复制RESEARCH,SMITH
SALES,ALLEN
7.4.3 编程
这个问题与SQL中的连接操作类似,将问题转换未1:N问题。 一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段 具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。
(1)定义Mapper类
代码语言:javascript复制package cn.hadron.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//当前读取文件的路径
String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
String joinKey="";
String joinValue="";
String fileFlag="";
String[] array=value.toString().split(",");
//判定当前行数据来自哪个文件
if(filePath.contains("dept.txt")){
fileFlag="l";//left
joinKey=array[0];//部门编号
joinValue=array[1];//部门名
}else if(filePath.contains("emp.txt")){
fileFlag="r";//right
joinKey=array[array.length-1];//部门编号
joinValue=array[1];//雇员名
}
//输出键值对,并标记来源文件
context.write(new Text(joinKey),new Text(joinValue "," fileFlag));
}
}
(2)定义Reducer类
代码语言:javascript复制package cn.hadron.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
//相同部门的数据,发送到同一个reduce
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
Iterator<Text> it=values.iterator();
String deptName="";
List<String> empNames=new ArrayList<>();
while(it.hasNext()){
//取一行记录
String[] array=it.next().toString().split(",");
//判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息
if("l".equals(array[1])){//只有1条记录的flag=l
deptName=array[0];
}else if("r".equals(array[1])){
empNames.add(array[0]);
}
}
//求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代
for(String en:empNames){
context.write(new Text(deptName), new Text(en));
}
}
}
(3)主方法
代码语言:javascript复制package cn.hadron.mr.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) {
// 设置环境变量HADOOP_USER_NAME,其值是root
System.setProperty("HADOOP_USER_NAME", "root");
// Configuration类包含了Hadoop的配置
Configuration config = new Configuration();
// 设置fs.defaultFS
config.set("fs.defaultFS", "hdfs://192.168.80.131:8020");
// 设置yarn.resourcemanager节点
config.set("yarn.resourcemanager.hostname", "node1");
try {
FileSystem fs = FileSystem.get(config);
Job job = Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("JoinDemo");
// 设置Mapper和Reducer类
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
// 设置reduce方法输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定输入输出路径
FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
Path outpath = new Path("/user/root/output/");
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
// 提交任务,等待执行完成
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("job任务执行成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4.4 运行
run as –> Java Application Eclipse控制台输出信息:
代码语言:javascript复制log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
job任务执行成功
查看结果 hdfs dfs -ls output
代码语言:javascript复制[root@node1 ~]# hdfs dfs -ls output
Found 2 items
-rw-r--r-- 3 root hbase 0 2017-06-23 14:05 output/_SUCCESS
-rw-r--r-- 3 root hbase 168 2017-06-23 14:05 output/part-r-00000
[root@hds117 ~]# hdfs dfs -cat output/part-r-00000
ACCOUNTING MILLER
ACCOUNTING KING
ACCOUNTING CLARK
RESEARCH FORD
RESEARCH JONES
RESEARCH SMITH
SALES JAMES
SALES TURNER
SALES BLAKE
SALES MARTIN
SALES WARD
SALES ALLEN