MapReduce Quick Start Series (12) | MapReduce OutputFormat

2020-10-28 15:26:03

In earlier posts we covered MapTask, ReduceTask, and the MapReduce execution flow. In this article I'll continue with OutputFormat, the mechanism MapReduce uses to write its output data.

1. OutputFormat Interface Implementations

  OutputFormat is the base class for MapReduce output; every MapReduce output implementation implements the OutputFormat interface. Below are a few common OutputFormat implementations.

1.1 TextOutputFormat (text output)

  The default output format is TextOutputFormat, which writes each record as a line of text. Its keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings.
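The line layout can be sketched in plain Java. This is a simulation of the idea only, not Hadoop's actual LineRecordWriter (which also handles null keys and values); the class and method names here are made up for illustration:

```java
// Plain-Java sketch of how TextOutputFormat lays out one record:
// toString() on key and value, joined by a tab, terminated by a newline.
public class TextLineFormatDemo {
    static String formatRecord(Object key, Object value) {
        return key.toString() + "\t" + value.toString() + "\n";
    }

    public static void main(String[] args) {
        // A LongWritable/Text pair would serialize the same way via toString().
        System.out.print(formatRecord(0L, "hello world"));
    }
}
```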

1.2 SequenceFileOutputFormat

  SequenceFileOutputFormat is a good choice when the output will serve as input to a subsequent MapReduce job: its format is compact and compresses easily.
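Switching a job to this format is a small change in the driver. A hedged sketch, assuming a configured `org.apache.hadoop.mapreduce.Job` named `job` (as in the driver later in this post):

```java
// Sketch: use SequenceFileOutputFormat and enable block compression.
// Assumes `job` is an org.apache.hadoop.mapreduce.Job configured elsewhere.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
```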

1.3 Custom OutputFormat

  Implement your own output format to match application-specific requirements.

2. Use Cases and Steps for a Custom OutputFormat

2.1 Use cases

  A custom OutputFormat lets you control both the final output paths and the output format. For example, a single MapReduce job may need to write two kinds of results to different directories depending on the data; this kind of flexible output requirement can be met with a custom OutputFormat.

2.2 Steps to implement a custom OutputFormat

1. Define a class that extends FileOutputFormat.
2. Write your own RecordWriter and override its write() method, which emits the output data.

3. Worked Example

3.1 Requirement

  Filter an input log file: write websites containing buwenbuhuo to d:/buwenbuhuo.log, and websites not containing buwenbuhuo to d:/other.log.

3.2 Analysis

  1. Requirement: filter the input log; lines containing buwenbuhuo go to d:/buwenbuhuo.log, all other lines to d:/other.log.
  2. Approach: a custom RecordWriter opens both output files when it is initialized, checks each value for the substring buwenbuhuo in write(), and routes the line to the matching stream.
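The routing rule itself is just a substring check per line. A standalone plain-Java sketch of that rule (illustrative names, no Hadoop dependencies):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the routing rule MyRecordWriter.write() will apply:
// lines containing "buwenbuhuo" go to one file, all others to the other.
public class RouteDemo {
    static List<List<String>> route(List<String> lines) {
        List<String> buwenbuhuo = new ArrayList<>();
        List<String> other = new ArrayList<>();
        for (String line : lines) {
            if (line.contains("buwenbuhuo")) {
                buwenbuhuo.add(line);   // would be written to buwenbuhuo.log
            } else {
                other.add(line);        // would be written to other.log
            }
        }
        return List.of(buwenbuhuo, other);
    }

    public static void main(String[] args) {
        List<List<String>> r = route(List.of(
                "http://www.buwenbuhuo.com", "http://www.baidu.com"));
        System.out.println(r.get(0)); // [http://www.buwenbuhuo.com]
        System.out.println(r.get(1)); // [http://www.baidu.com]
    }
}
```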

3.3 Code

  • 1. Define the MyOutputFormat class
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        MyRecordWriter myRecordWriter = new MyRecordWriter();
        myRecordWriter.initialize(job);
        return myRecordWriter;
    }
}
  • 2. Write the MyRecordWriter class
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {


    private FSDataOutputStream buwenbuhuo;
    private FSDataOutputStream other;

    /**
     * Initialization: open both output streams under the job's output directory.
     * @param job task attempt context, used to read the configuration
     */
    public void initialize(TaskAttemptContext job) throws IOException {
        String outdir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        buwenbuhuo = fileSystem.create(new Path(outdir + "/buwenbuhuo.log"));
        other = fileSystem.create(new Path(outdir + "/other.log"));
    }


    /**
     * Write out one KV pair; called once per pair.
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String out = value.toString() + "\n";
        if (out.contains("buwenbuhuo")) {
            buwenbuhuo.write(out.getBytes());
        } else {
            other.write(out.getBytes());
        }
    }

    /**
     * Close resources.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(buwenbuhuo);
        IOUtils.closeStream(other);
    }
}
  • 3. Write the OutputDriver class
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OutputDriver.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

3.4 Run and Results

  • 1. Run
  • 2. Results
