MapReduce之自定义outputFormat

2021-01-22 16:54:32 浏览数 (1)

现在有一些订单的评论数据,需求: 将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评。 根据我们之前学习的内容,大家可能第一时间想到的是自定义分区(不清楚的小伙伴们可以前往《MapReduce的自定义分区与ReduceTask数量》),但自定义分区后的程序运行的结果是数据保存在了同一个目录下的不同文件中。

而本题的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求我们可以通过自定义outputformat来实现! 先让我们来看下需要实现的点有哪些?

1、在mapreduce中访问外部资源 2、自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()

接下来我们开始上手代码!

第一步:自定义一个outputformat

代码语言:javascript复制
public class Custom_OutputFormat extends FileOutputFormat<Text, NullWritable> {



    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {


        Configuration configuration = context.getConfiguration();

        FileSystem fileSystem = FileSystem.get(configuration);

        //两个输出路径
        FSDataOutputStream fsDataOutputStream1 = fileSystem.create(new Path("E:\2019大数据课程\DeBug\测试结果\outputformat1\1.txt"));

        FSDataOutputStream fsDataOutputStream2 = fileSystem.create(new Path("E:\2019大数据课程\DeBug\测试结果\outputformat1\2.txt"));

        Custom_RecordWriter custom_recordWriter = new Custom_RecordWriter(fsDataOutputStream1, fsDataOutputStream2);




        return custom_recordWriter;

    }
}

第二步:自定义一个RecordWriter

代码语言:javascript复制
public class Custom_RecordWriter extends RecordWriter<Text, NullWritable> {


    FSDataOutputStream out1 = null;

    FSDataOutputStream out2 = null;


    @Override
    public String toString() {
        return "Custom_RecordWriter{"  
                "out1="   out1  
                ", out2="   out2  
                '}';
    }

    public FSDataOutputStream getOut1() {
        return out1;
    }

    public void setOut1(FSDataOutputStream out1) {
        this.out1 = out1;
    }

    public FSDataOutputStream getOut2() {
        return out2;
    }

    public void setOut2(FSDataOutputStream out2) {
        this.out2 = out2;
    }

    public Custom_RecordWriter() {
    }

    public Custom_RecordWriter(FSDataOutputStream out1, FSDataOutputStream out2) {
        this.out1 = out1;
        this.out2 = out2;
    }



    /**
     * 写入数据的方法
     * @param key   要写入的key值
     * @param value   要写入的value
     * @throws IOException
     * @throws InterruptedException
     */

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {

               if (key.toString().split("t")[9].equals("0")){
                   //好评
                   out1.write(key.toString().getBytes());


               }else{


                   //中评和差评
                   out2.write(key.toString().getBytes());
                   out2.write("rn".getBytes());

               }



    }

    //  关闭
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        if (out1!=null){
            out1.close();
        }

        if (out2!=null){
            out2.close();
        }



    }
}

第三步:Map

代码语言:javascript复制
public class Custom_Mapper extends Mapper<LongWritable, Text,Text, NullWritable> {



    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


        context.write(value,NullWritable.get());


    }

开发mapreduce处理流程

代码语言:javascript复制
public class Custom_Driver {


    public static void main(String[] args) throws Exception {


        Job job = Job.getInstance(new Configuration(), "Customer_Driver");

        job.setJarByClass(Custom_Driver.class);

        //设置输入
        job.setInputFormatClass(TextInputFormat.class);

        TextInputFormat.addInputPath(job,new Path("E:\2019大数据课程\DeBug\测试\ordercomment.csv"));

        job.setMapperClass(Custom_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(Custom_OutputFormat.class);

        // 这里path的路径可以任意设置,因为我们在自定义outPutFormat中已经将输出路径确定
        Custom_OutputFormat.setOutputPath(job,new Path("E:\2019大数据课程\DeBug\测试结果\outputformat1"));

        boolean b = job.waitForCompletion(true);

        System.out.println(b?0:1);


    }
}

将代码写完后,我们开始测试程序!

目标文件:ordercomment.csv

程序运行完后,我们进入到outputformat1目录下,看到程序将我们想要的不同的结果放在了两个独立的文件中!

分别打开文件查看内容

到了这里说明我们的自定义outputFormat算是成功了。那本期的分享到这里也就该结束了,小伙伴们有什么疑惑或好的建议可以在评论区留言或者私信小菌都是可以的。小菌有抽时间一一回复大家,另外,受益的小伙伴们记得关注小菌(^U^)ノ~YO。是你们默默的支持鼓舞着小菌不辞辛苦的为大家每天更新新鲜的博客!之后小菌会更新更多好玩的内容,敬请期待!!!

0 人点赞