Hadoop Usage Learning Notes (3)


Hadoop Usage Learning Notes

2. Basic Map-Reduce Job Configuration and Principles (Part 2)

We first write the job with the old API; the new API, and how it differs, comes later. Environment: the machine that submits the job and runs the development IDE is Windows 7, 4 cores / 8 GB RAM, with IntelliJ IDEA 15. The Hadoop cluster is the Linux cluster already described in the first post.

Because the job is submitted across environments, the source code and configuration need some extra care. First, make sure Windows can resolve the host names of the Hadoop cluster; if it cannot, edit C:/Windows/System32/drivers/etc/hosts and add the name-to-IP mappings. Then copy the Hadoop installation from the Linux cluster down to the Windows machine; in fact only the configuration directory is needed, since we only use the configuration files.
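
For example, if the cluster master is nosql1 at 10.202.7.184 (the address that appears in the job log later in this post), the hosts file needs an entry like the following; the host names and IPs are of course specific to your own cluster:

10.202.7.184    nosql1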

Create a new Maven project in IDEA, named HadoopT for example, and edit the pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hash.test</groupId>
    <artifactId>hadoopT</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoop_version>2.7.2</hadoop_version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop_version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop_version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

The project needs the common module (both hdfs and client depend on it), the hdfs module (to access HDFS) and the client module (to submit jobs). Next, write the word count class WordCount:

package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class WordCount {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Set the job class, i.e. the class containing the main method
        JobConf jobConf = new JobConf(WordCount.class);
        // Local path of the JAR to ship to the cluster, i.e. the JAR containing this class
        jobConf.set("mapreduce.job.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // User on the remote Hadoop cluster, to avoid permission problems
        System.setProperty("HADOOP_USER_NAME", "sfdba");
        // Job name
        jobConf.setJobName("My Word Count");
        // Output key/value types of each stage
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);
        // Mapper and Reducer classes
        jobConf.setMapperClass(Map.class);
        jobConf.setReducerClass(Reduce.class);
        // Input and output formats
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);
        // Input and output paths; a remote HDFS needs the full URI
        FileInputFormat.setInputPaths(jobConf, args[0]);
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
        // Clear the output directory first
        deleteDir(jobConf, args[1]);
        // Run the job
        JobClient.runJob(jobConf);
    }
}

class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text key = new Text();

    public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        String line = text.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            key.set(stringTokenizer.nextToken());
            outputCollector.collect(key, one);
        }
    }
}

class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
        int sum = 0;
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        outputCollector.collect(text, new IntWritable(sum));
    }
}

The source code should be easy to follow alongside the diagram from the previous post.

A few points deserve emphasis. Because /test/output/ is created anew on every run, the program clears the output directory at startup, so repeated executions do not fail. And because we submit from a local machine to the remote cluster, we must tell the job which JAR this project builds, which is done by setting the mapred.jar property (the code above uses the newer, equivalent name mapreduce.job.jar). Here is how I set up the WordCount run configuration in IDEA:

First, the Program Arguments: the program takes its first argument as the input directory and the second as the output directory, both of which live on HDFS here (an example of these arguments is given right after the snippet below). Then add "Run Maven Goal 'HadoopT: Package'" to the Before launch steps of the run configuration, so the JAR referenced in the code is always the latest build; the path set below is exactly where maven package writes the artifact:

jobConf.set("mapred.jar", "D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
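
As mentioned above, with the paths used in this post the two Program Arguments would look roughly like this (the host name, port and directories are those of my cluster; adjust them to yours):

hdfs://nosql1:9000/test/input hdfs://nosql1:9000/test/output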

Next, on nosql1, use the hdfs command to upload file1 and file2 to /test/input on HDFS. Note that this must be done as the hadoop user.
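
A minimal sketch of that upload, assuming file1 and file2 sit in the current directory on nosql1 and that you are logged in as the hadoop user:

hdfs dfs -mkdir -p /test/input
hdfs dfs -put file1 file2 /test/input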

A Hadoop user has to be specified, otherwise we have no permission to run the Map-Reduce job. It is set with System.setProperty because it may differ from one Hadoop application to another (it should be set before any FileSystem or Job object is created, since the client reads it when initializing the current user). Since we used the hadoop user on Linux earlier, here we set:

System.setProperty("HADOOP_USER_NAME", "hadoop");

Then we run it from IDEA; the output is as follows (note: our log level is WARN):

16/08/03 11:10:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hdfs://nosql1:9000/test/output has been deleted sucessfullly.
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 INFO client.RMProxy: Connecting to ResourceManager at nosql1/10.202.7.184:8032
16/08/03 11:10:45 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/08/03 11:10:46 INFO mapred.FileInputFormat: Total input paths to process : 2
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: number of splits:3
16/08/03 11:10:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1470040052262_0010
16/08/03 11:10:47 INFO impl.YarnClientImpl: Submitted application application_1470040052262_0010
16/08/03 11:10:47 INFO mapreduce.Job: The url to track the job: http://nosql1:8088/proxy/application_1470040052262_0010/
16/08/03 11:10:47 INFO mapreduce.Job: Running job: job_1470040052262_0010
16/08/03 11:10:54 INFO mapreduce.Job: Job job_1470040052262_0010 running in uber mode : false
16/08/03 11:10:54 INFO mapreduce.Job:  map 0% reduce 0%
16/08/03 11:10:59 INFO mapreduce.Job:  map 100% reduce 0%
16/08/03 11:11:05 INFO mapreduce.Job:  map 100% reduce 100%
16/08/03 11:11:05 INFO mapreduce.Job: Job job_1470040052262_0010 completed successfully
16/08/03 11:11:05 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=93
        FILE: Number of bytes written=476815
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=311
        HDFS: Number of bytes written=39
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=9380
        Total time spent by all reduces in occupied slots (ms)=3400
        Total time spent by all map tasks (ms)=9380
        Total time spent by all reduce tasks (ms)=3400
        Total vcore-milliseconds taken by all map tasks=9380
        Total vcore-milliseconds taken by all reduce tasks=3400
        Total megabyte-milliseconds taken by all map tasks=9605120
        Total megabyte-milliseconds taken by all reduce tasks=3481600
    Map-Reduce Framework
        Map input records=5
        Map output records=8
        Map output bytes=71
        Map output materialized bytes=105
        Input split bytes=261
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=105
        Reduce input records=8
        Reduce output records=6
        Spilled Records=16
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=114
        CPU time spent (ms)=2180
        Physical memory (bytes) snapshot=1001734144
        Virtual memory (bytes) snapshot=3987656704
        Total committed heap usage (bytes)=805306368
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=50
    File Output Format Counters 
        Bytes Written=39
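
These counters match what the client logged: the two input files were split into three pieces, so three map tasks ran (all data-local) plus a single reducer; the reducer received 8 records grouped under 6 distinct keys, which corresponds to the 8 words in the input and the 6 lines of output shown further below.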

Afterwards, the job's output and history can be inspected through YARN (the tracking URL is printed in the log above). We also write a small local program that prints the contents of the output directory:

package com.hash.test.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/7/20
 */
public class TestHDFS {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://nosql1:9000/";
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);

        // Print the contents of the result file under /test/output on HDFS
        InputStream is = fs.open(new Path("/test/output/part-00000"));
        IOUtils.copyBytes(is, System.out, 1024, true);
    }
}

After running it, the output is:

aa  1
apple   3
asd 1
bbb 1
ccccc   1
egg 1

The same job written with the new API looks like this:

package com.hash.test.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @author Hash Zhang
 * @version 1.0.0
 * @date 2016/8/3
 */
public class WordCount extends Configured implements Tool {
    private static void deleteDir(Configuration conf, String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path targetPath = new Path(dirPath);
        // Delete the directory if it already exists
        if (fs.exists(targetPath)) {
            boolean delResult = fs.delete(targetPath, true);
            if (delResult) {
                System.out.println(targetPath + " has been deleted sucessfullly.");
            } else {
                System.out.println(targetPath + " deletion failed.");
            }
        }
    }

    public int run(String[] strings) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "sfdba");

        Job job = Job.getInstance();
        // Local path of the JAR to ship to the cluster
        job.setJar("D:\\Users\\862911\\hadoopT\\target\\hadoopT-1.0-SNAPSHOT.jar");
        // Job name
        job.setJobName("My Word Count");
        // Output key/value types of each stage
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Mapper and Reducer classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Input and output paths; a remote HDFS needs the full URI
        FileInputFormat.setInputPaths(job, strings[0]);
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));
        // Clear the output directory first
        deleteDir(job.getConfiguration(), strings[1]);

        final boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}

class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(line);
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}

class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
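
Compared with the old-API version: everything now comes from the org.apache.hadoop.mapreduce package instead of org.apache.hadoop.mapred; Mapper and Reducer are classes rather than interfaces, and both write results through a Context object instead of an OutputCollector; the job is configured via Job and driven by job.waitForCompletion() instead of JobConf plus JobClient.runJob(); and implementing Tool and launching through ToolRunner takes care of parsing the generic Hadoop command-line options, which also removes the JobResourceUploader warning seen in the log above.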

All later demonstrations will use the new API.
