Java编写Hadoop第一个MapReduce任务

2020-06-01 10:45:47 浏览数 (1)

前言

因为在做网站案例的时候,想学习如何处理海量数据,所以想接触大数据相关的知识,之前对于大数据的了解,仅仅是停留在知道Hadoop,Hive,HBase,Spark的层面上,所以如何学习确实对我造成了困扰,所幸我所在公司部门存在大数据开发,于是不断的请教大佬,大佬说第一步是学一点Linux和Java,然后Hadoop,再然后......。再然后就先不说了,对于Linux和Java我这里可以直接跨过了,然后就是学Hadoop。这周利用散碎的时间,学会了Hadoop的安装使用,使用Java写一个Hadoop任务。安装这里我就不说了,大家可以去网上搜索,或者来我的网站文章模块看我如何安装(Mac): 网址:www.study-java.cn来看一下(需要打开微信小程序:每天学Java进行扫码登录)

引入依赖
代码语言:javascript复制
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>

我在网上看很多人都引用了hadoop-core,但是由于我引用报错,所以我去除了,大家根据自己需要决定是否补上这个依赖

操作文件

引入依赖之后,我们使用Java可以简单的操作Hadoop的文件系统了。比如增加,删除某些文件/文件夹。操作Hadoop文件系统是我们去了解Hadoop必不可少的一步,下面的代码替代了我们在命令行敲写Hadoop命令的过程,但是安装Hadoop后,我们还是需要去数据Hadoop的常用命令,不然下面代码的作用可能有点不好理解。

代码语言:javascript复制
    public static void main(String[] args) {
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(new URI("hdfs://localhost:9000/"), new Configuration(), "chenlong");
            //是否存在
            if (fileSystem.exists(new Path("/test2"))) {
                //强制删除
                fileSystem.delete(new Path("/test2"), true);
            }
            //创建
            fileSystem.mkdirs(new Path("/test2"));
            //将本地文件放入到工作目录下,是否删除本地,是否覆盖已经存在的数据,本地路径,hadoop路径
            fileSystem.copyFromLocalFile(false, false, new Path("/Users/chenlong/Documents/xcx/dream/web-project/email-project/src/main/resources/view/log/log.json"), new Path("/test2"));
            //fileSystem.copyFromLocalFile();
            fileSystem.close();
        } catch (Exception e) {
            e.printStackTrace();
            //Not implemented by the DistributedFileSystem FileSystem implementation -- 引入了hadoop-core 的问题

        }
    }
}
编写MapReduce任务

对于Hadoop文件系统比较熟悉后,那么就来写一个任务去提交,我这里编写的任务是为了统计Json文件中不同URL的访问数量,Json格式如下:

代码语言:javascript复制
 {
    "url":"/view/screen",
    "desc":"请求项目案例->接口数据",
    "name": "每天学Java",
    "email":"4****6@qq.com",
    "date":"1573721558826"
  }

编写Hadoop任务第一步是编写自己的Mapper和Reduce。

代码语言:javascript复制
//Mapper
public static class ApiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
}
//Reduce
 public static class ApiReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
     
 }

上面<LongWritable, Text, Text, IntWritable>含义是<输入键、输入值、输出键、输出值> LongWritable可以理解为Long,Text可以理解为String。什么叫输入键和输入值呢?笔者这样理解的,我们在提交文件到Hadoop任务的时候,Hadoop会读取文件(默认一行一行的读),然后将文件读取为K-V的形式,然后调用mapper的map方法(每读取一行,就分配到一个Mapper上,由于Hadoop大多数都是集群,所有一个文件会有很有mapper参与进来) 而输出键和输出值就好理解了,这个操作是我们将输入值和输出值经过一些逻辑衍生而成的。

需要注意的时候Reduce的输入键和输入值需要是Mapper的输出键、输出值。

完整Mapper和Reduce。这里我重写了run方法,因为默认按行读取,这样会将我的Json打散,所以我一次读取然后分发到map方法中,其实这里可以优化,就是判断是否组装成json,如果是就分发,不是继续组装,这样就不用读取完文件后在调用map方法。

代码语言:javascript复制
     //    输入键、输入值、输出键、输出值
public static class ApiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        //重写Run方法,修改每读取一行就调用Mapper的格式
        @Override
        public void run(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String json = "";
            try {
                while (context.nextKeyValue()) {
                    json  = context.getCurrentValue();
                }
                Text text = new Text();
                text.set(json);
                map(context.getCurrentKey(), text, context);
            } finally {
                this.cleanup(context);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            JSONArray jsonArray = JSON.parseArray(value.toString());
            System.out.println(jsonArray.size());
            for (Object o : jsonArray) {
                JSONObject jsonObject = (JSONObject) o;
                word.set(jsonObject.getString("url"));
                context.write(word, one);
            }
        }
    }

    //使用Mapper的输出作为输入,所以键要取mappere的输出
    public static class ApiReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum  = val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

写完Mapper和Reduce就是写启动函数了:

代码语言:javascript复制
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //删除输出目录
        FileSystem.get(new URI("hdfs://localhost:9000/"), new Configuration(), "chenlong")
                .delete(new Path("/output"), true);
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Api Request Num");
        job.setJarByClass(ApiCount.class);
        job.setMapperClass(ApiMapper.class);
        job.setCombinerClass(ApiReduce.class);
        job.setReducerClass(ApiReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //输入文件new Path(args[0])
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/test2/log.json"));
        //输出文件new Path(args[1])
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        boolean temp = job.waitForCompletion(true);
        if (temp) {
            System.out.println("处理完成");
        } else {
            System.out.println("处理失败");
        }
    }

如此便完成了json日志分析,统计出不同的URL请求数量。

0 人点赞