MapReduce Applications in Hadoop (1)


MapReduce Application 1

1. Create an empty Maven project in IDEA and add the dependency below; pick the version that matches the Hadoop release your project targets.

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
        <!-- scope is set to provided so that hadoop-client is not bundled into the exported jar and does not inflate its size. -->
        <scope>provided</scope>
    </dependency>
</dependencies>
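
The version above should match the Hadoop release your cluster is actually running; a mismatch between the client library and the cluster is a common source of odd errors. If you are not sure which release that is, a quick check on any cluster node is:

hadoop version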

2. Create a class WordCountMapper

package com.xmaven;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Counts how many times each word occurs.
 * The input here is read in automatically by the MapReduce framework.
 * Type parameter 1: KEYIN - by default, the byte offset at which the line of text being read starts; a Long, represented in Hadoop by its own serializable type LongWritable. Think of it as the read cursor - how far into the file we are.
 * Type parameter 2: VALUEIN - by default, the content of the line that was read; Hadoop's serializable type is Text, i.e. one line as a string.
 * Type parameter 3: KEYOUT - the key emitted after the user-defined logic has run; here it is the word itself, written out as Text.
 * Type parameter 4: VALUEOUT - the value emitted by the user-defined logic; here it is the word count, written out as LongWritable.
 * @author Sanji
 *
 */

public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {

    //Override the map method
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Split the line that was read in on spaces
        String[] words = value.toString().split(" ");
        //Iterate over the split-out words
        for (String word : words) {
            //Wrap each word as a <KEY,VALUE> pair and emit it
            Text k2 = new Text(word);
            LongWritable v2 = new LongWritable(1L);
            context.write(k2,v2);
        }

    }
}

3. Create a class WordCountReduce

package com.xmaven;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduces (totals up) the word counts.
 * Type parameter 1: KEYIN - Text, the word itself, e.g. hello
 * Type parameter 2: VALUEIN - LongWritable, a single occurrence of that word
 * Type parameter 3: KEYOUT - Text, the word itself, e.g. hello
 * Type parameter 4: VALUEOUT - LongWritable, the total number of occurrences of that word
 * @author Sanji
 *
 */
public class WordCountReduce extends Reducer<Text, LongWritable,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //Create a sum variable that accumulates the total for this key
        long sum = 0L;
        //Iterate over the values belonging to the same key and add them up
        for (LongWritable value : values) {
            sum += value.get();
        }
        LongWritable v2 = new LongWritable(sum);  //total number of times this word occurred
        //Write out the result
        context.write(key,v2);
    }
}

4. Create the entry-point class WordCount

package com.xmaven;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {

    /**
     * Assembles and runs the job.
     * @param args [0] input path, [1] output path
     */
    public static void main(String[] args) {
        //Configuration parameters the job needs
        Configuration conf = new Configuration();
        try {
            //Guard against the input and output paths not being supplied
            if (args.length!=2){
                System.exit(100);
            }
            //Create a job
            Job job = Job.getInstance(conf);
            //Note: this line is required, otherwise the WordCount class cannot be found on the cluster
            job.setJarByClass(WordCount.class);
            //Class containing the map logic
            job.setMapperClass(WordCountMapper.class);
            //Class containing the reduce logic
            job.setReducerClass(WordCountReduce.class);

            //Output types of the Mapper
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //Final output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            //Input path (a file or a directory), passed in when the job is launched
            FileInputFormat.setInputPaths(job,new Path(args[0]));  //first argument when running this class, e.g. /wordcount/input
            //Output path (must be a directory that does not exist yet)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));  //second argument when running this class, e.g. /wordcount/output

            //Submit the job and wait for it to finish
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

5. Package the project with Maven (run the package or install lifecycle phase from the Maven panel in IDEA)
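
If you prefer the command line to IDEA's Maven panel, the sketch below is equivalent; run it from the project root. It assumes the artifactId is wordcount and the version is 1.0-SNAPSHOT, matching the jar name used in step 8:

mvn clean package
# the jar ends up under target/, e.g. target/wordcount-1.0-SNAPSHOT.jar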

6. Upload the jar to the master node server (e.g. via XFTP)
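
XFTP is just one way to get the jar onto the server; any transfer tool works. For example, with scp (node1 is the master hostname that appears in the logs below; adjust the user, host, and path to your environment):

scp wordcount-1.0-SNAPSHOT.jar root@node1:~/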

7. Prepare a word data file and upload it to the HDFS file system

vim word.txt

Add the following content:

hello world
hello hadoop
the world is beautiful

Upload the file to HDFS:

hdfs dfs -put word.txt /
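
An optional sanity check before submitting the job, to confirm the file landed where expected:

hdfs dfs -ls /
hdfs dfs -cat /word.txt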

8. Submit the job

hadoop jar wordcount-1.0-SNAPSHOT.jar com.xmaven.WordCount hdfs://xx.xx.xx.xx:9000/word.txt hdfs://xx.xx.xx.xx:9000/out
Command breakdown:

hadoop jar : run a jar with Hadoop

wordcount-1.0-SNAPSHOT.jar : the project jar we exported earlier

com.xmaven.WordCount : the fully qualified name of the entry class (package name plus class name; if the class has no package, the class name alone is enough)

hdfs://xx.xx.xx.xx:9000/word.txt : the input file

hdfs://xx.xx.xx.xx:9000/out : the directory the results are written to; note that this directory must not already exist
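
A practical note on that last point: if /out is left over from an earlier run, submission will fail because the output directory already exists, so delete it first (this removes the old results):

hdfs dfs -rm -r /out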

A successful run looks like this:

2020-08-16 22:49:47,331 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
[root@node1 ~]# hadoop jar wordcount-1.0-SNAPSHOT.jar com.xmaven.WordCount hdfs://xx.xx.xx.xx:9000/word.txt hdfs://xx.xx.xx.xx:9000/out
2020-08-16 22:53:01,385 INFO client.RMProxy: Connecting to ResourceManager at node1/xx.xx.xx.xx:8032
2020-08-16 22:53:01,919 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2020-08-16 22:53:01,946 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1597570448090_0001
2020-08-16 22:53:02,088 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-08-16 22:53:02,255 INFO input.FileInputFormat: Total input files to process : 1
2020-08-16 22:53:02,297 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-08-16 22:53:02,321 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-08-16 22:53:02,357 INFO mapreduce.JobSubmitter: number of splits:1
2020-08-16 22:53:02,611 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-08-16 22:53:02,634 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1597570448090_0001
2020-08-16 22:53:02,634 INFO mapreduce.JobSubmitter: Executing with tokens: []
2020-08-16 22:53:02,882 INFO conf.Configuration: resource-types.xml not found
2020-08-16 22:53:02,882 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2020-08-16 22:53:03,365 INFO impl.YarnClientImpl: Submitted application application_1597570448090_0001
2020-08-16 22:53:03,429 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1597570448090_0001/
2020-08-16 22:53:03,430 INFO mapreduce.Job: Running job: job_1597570448090_0001
2020-08-16 22:53:11,599 INFO mapreduce.Job: Job job_1597570448090_0001 running in uber mode : false
2020-08-16 22:53:11,601 INFO mapreduce.Job:  map 0% reduce 0%
2020-08-16 22:53:17,674 INFO mapreduce.Job:  map 100% reduce 0%
2020-08-16 22:53:21,704 INFO mapreduce.Job:  map 100% reduce 100%
2020-08-16 22:53:21,711 INFO mapreduce.Job: Job job_1597570448090_0001 completed successfully
2020-08-16 22:53:21,809 INFO mapreduce.Job: Counters: 53
    File System Counters
        FILE: Number of bytes read=134
        FILE: Number of bytes written=434231
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=146
        HDFS: Number of bytes written=48
        HDFS: Number of read operations=8
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=3481
        Total time spent by all reduces in occupied slots (ms)=2363
        Total time spent by all map tasks (ms)=3481
        Total time spent by all reduce tasks (ms)=2363
        Total vcore-milliseconds taken by all map tasks=3481
        Total vcore-milliseconds taken by all reduce tasks=2363
        Total megabyte-milliseconds taken by all map tasks=3564544
        Total megabyte-milliseconds taken by all reduce tasks=2419712
    Map-Reduce Framework
        Map input records=3
        Map output records=8
        Map output bytes=112
        Map output materialized bytes=134
        Input split bytes=98
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=134
        Reduce input records=8
        Reduce output records=6
        Spilled Records=16
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=101
        CPU time spent (ms)=1110
        Physical memory (bytes) snapshot=483147776
        Virtual memory (bytes) snapshot=5168349184
        Total committed heap usage (bytes)=312999936
        Peak Map Physical memory (bytes)=293695488
        Peak Map Virtual memory (bytes)=2580942848
        Peak Reduce Physical memory (bytes)=189452288
        Peak Reduce Virtual memory (bytes)=2587406336
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=48
    File Output Format Counters 
        Bytes Written=48

9. View the output

hdfs dfs -ls /out

The result looks like this:

[root@node1 ~]# hdfs dfs -ls /out
Found 2 items
-rw-r--r--   2 root supergroup          0 2020-08-16 22:53 /out/_SUCCESS
-rw-r--r--   2 root supergroup         48 2020-08-16 22:53 /out/part-r-00000
hdfs dfs -cat /out/part-r-00000

The result looks like this:

[root@node1 ~]# hdfs dfs -cat /out/part-r-00000
2020-08-16 22:59:00,255 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
beautiful    1
hadoop    1
hello    2
is    1
the    1
world    2
