Hadoop基础教程-第7章 MapReduce进阶(7.5 MapReduce 连接)

2022-05-06 18:48:38 浏览数 (1)

第7章 MapReduce进阶

7.4 MapReduce 连接

连接操作,也就是常说的join操作,是数据分析时经常用到的操作。 比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。

7.4.1 准备数据

这里准备了Oracle数据库中的经典数据。 dept.txt文件存放部门数据。

代码语言:javascript复制
[root@node1 data]# cat dept.txt 
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
[root@node1 data]# 

emp.txt文件存放雇员数据。

代码语言:javascript复制
[root@node1 data]# cat emp.txt 
7369,SMITH,CLERK,7902,17-12-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2-81,1250,500,30
7566,JONES,MANAGER,7839,02-4-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5-81,2850,,30
7782,CLARK,MANAGER,7839,09-6-81,2450,,10
7839,KING,PRESIDENT,,17-11-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30
7900,JAMES,CLERK,7698,03-12-81,950,,30
7902,FORD,ANALYST,7566,03-12-81,3000,,20
7934,MILLER,CLERK,7782,23-1-82,1300,,10

上传到HDFS hdfs dfs -mkdir -p input hdfs dfs -put emp.txt input hdfs dfs -put dept.txt input

代码语言:javascript复制
[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put emp.txt input
[root@node1 data]# hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -ls input
Found 2 items
-rw-r--r--   3 root hbase         82 2017-06-23 11:04 input/dept.txt
-rw-r--r--   3 root hbase        513 2017-06-23 11:04 input/emp.txt
[root@node1 data]# 

7.4.2 问题描述

求解每个雇员所在部门,输出格式:雇员名,部门名 比如

代码语言:javascript复制
RESEARCH,SMITH
SALES,ALLEN

7.4.3 编程

这个问题与SQL中的连接操作类似,将问题转换未1:N问题。 一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段 具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。

(1)定义Mapper类

代码语言:javascript复制
package cn.hadron.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //当前读取文件的路径
        String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
        String joinKey="";
        String joinValue="";
        String fileFlag="";
        String[] array=value.toString().split(",");
        //判定当前行数据来自哪个文件
        if(filePath.contains("dept.txt")){
            fileFlag="l";//left
            joinKey=array[0];//部门编号
            joinValue=array[1];//部门名
        }else if(filePath.contains("emp.txt")){
            fileFlag="r";//right
            joinKey=array[array.length-1];//部门编号
            joinValue=array[1];//雇员名
        }
        //输出键值对,并标记来源文件
        context.write(new Text(joinKey),new Text(joinValue "," fileFlag));
    }
}

(2)定义Reducer类

代码语言:javascript复制
package cn.hadron.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text>{
    //相同部门的数据,发送到同一个reduce
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {
        Iterator<Text> it=values.iterator();
        String deptName="";
        List<String> empNames=new ArrayList<>();
        while(it.hasNext()){
            //取一行记录
            String[] array=it.next().toString().split(",");
            //判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息
            if("l".equals(array[1])){//只有1条记录的flag=l
                deptName=array[0];
            }else if("r".equals(array[1])){
                empNames.add(array[0]);
            }
        }
        //求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代
        for(String en:empNames){
            context.write(new Text(deptName), new Text(en));
        }

    }
}

(3)主方法

代码语言:javascript复制
package cn.hadron.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
    public static void main(String[] args) {
        // 设置环境变量HADOOP_USER_NAME,其值是root
        System.setProperty("HADOOP_USER_NAME", "root");
        // Configuration类包含了Hadoop的配置
        Configuration config = new Configuration();
        // 设置fs.defaultFS
        config.set("fs.defaultFS", "hdfs://192.168.80.131:8020");
        // 设置yarn.resourcemanager节点
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("JoinDemo");
            // 设置Mapper和Reducer类
            job.setMapperClass(JoinMapper.class);
            job.setReducerClass(JoinReducer.class);
            // 设置reduce方法输出key和value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // 指定输入输出路径 
            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            Path outpath = new Path("/user/root/output/");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            // 提交任务,等待执行完成
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job任务执行成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

7.4.4 运行

run as –> Java Application Eclipse控制台输出信息:

代码语言:javascript复制
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
job任务执行成功

查看结果 hdfs dfs -ls output

代码语言:javascript复制
[root@node1 ~]# hdfs dfs -ls output
Found 2 items
-rw-r--r--   3 root hbase          0 2017-06-23 14:05 output/_SUCCESS
-rw-r--r--   3 root hbase        168 2017-06-23 14:05 output/part-r-00000
[root@hds117 ~]# hdfs dfs -cat output/part-r-00000
ACCOUNTING  MILLER
ACCOUNTING  KING
ACCOUNTING  CLARK
RESEARCH    FORD
RESEARCH    JONES
RESEARCH    SMITH
SALES   JAMES
SALES   TURNER
SALES   BLAKE
SALES   MARTIN
SALES   WARD
SALES   ALLEN

0 人点赞