MapReduce之自定义inputFormat合并小文件

2021-01-22 16:54:15 浏览数 (1)

无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。

小文件的优化无非以下几种方式:

  1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  2. 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
  3. 在mapreduce处理时,可采用combineInputFormat提高效率

本篇博客小菌将大家实现的是上述第二种方式! 先让我们确定程序的核心机制:

自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV 在输出时使用SequenceFileOutPutFormat输出合并文件

具体的代码如下:

自定义InputFromat

代码语言:javascript复制
public class Custom_FileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {


    /*
   直接返回文件不可切割,保证一个文件是一个完整的一行
    */

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {

        return false;

    }


    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        Custom_RecordReader custom_recordReader = new Custom_RecordReader();


        custom_recordReader.initialize(split,context);


        return custom_recordReader;
    }
}

自定义RecordReader

代码语言:javascript复制
/**
 * 
 * RecordReader的核心工作逻辑:
 * 通过nextKeyValue()方法去读取数据构造将返回的key   value
 * 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value
 * 
 * 
 * @author
 *
 */
public class Custom_RecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;

    private Configuration conf;

    private BytesWritable bytesWritable = new BytesWritable();

    private boolean pressced = false;


    //初始化

    /**
     *
     * @param split   封装的文件的对象内容
     * @param context 上下文对象
     * @throws IOException
     * @throws InterruptedException
     */

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.conf=context.getConfiguration();

    }

    //读取下一个文件
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {


        if (!pressced){

            //获取文件路径
            Path path = fileSplit.getPath();

            //获取FileSystem对象
            FileSystem fileSystem = null;
            FSDataInputStream inputStream = null;

            try {
                fileSystem = FileSystem.get(conf);

                //读取文件
                inputStream = fileSystem.open(path);

                //初始化一个字节数据,大小为文件的长度
                byte[] bytes = new byte[(int)fileSplit.getLength()];

                //把数据流转换成字节数组
                IOUtils.readFully(inputStream,bytes,0,bytes.length);

                //把 字节数组 转换成 BytesWritable 对象
                bytesWritable.set(bytes,0,bytes.length);


            } catch (Exception e) {
                e.printStackTrace();

            }finally {

                fileSystem.close();

                if (null != inputStream ){
                    inputStream.close();
                }
            }

            pressced = true;
            return true;

        }else{

            return false;
        }

    }

    //获取当前的key值
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();

    }

    //获取当前的value值
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    //获取当前的进程
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return pressced?0:1;
    }

    //关流
    @Override
    public void close() throws IOException {


    }
}

map处理

代码语言:javascript复制
public class Custom_Mapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {


    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {

        FileSplit fileSplit = (FileSplit)context.getInputSplit();

        String name = fileSplit.getPath().getName();

        context.write(new Text(name),value);



    }
}

定义mapreduce处理流程

代码语言:javascript复制
public class Customer_Driver {

    public static void main(String[] args) throws Exception {


        //1.实例化job对象
        Job job = Job.getInstance(new Configuration(), "Customer_Driver");
        job.setJarByClass(Customer_Driver.class);

        //2.设置输入
        job.setInputFormatClass(Custom_FileInputFormat.class);
        Custom_FileInputFormat.addInputPath(job,new Path("E:\2019大数据课程\DeBug\测试\order\素材\5\自定义inputformat_小文件合并\input"));

        //3.设置Map
        job.setMapperClass(Custom_Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        //4.设置Reduce()
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //5.设置输出
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job,new Path("E:\2019大数据课程\DeBug\测试\order\素材\5\自定义inputformat_小文件合并\output4"));
        
        boolean b = job.waitForCompletion(true);
        System.out.println(b?0:1);

    }
}

到这里我们的程序就算完成了。在运行之前,我们先打开我们程序读取的目录,可以看到在input目录下有两个文件。

然后运行程序。

伴随着成功运行,我们可以再进入到程序输出目录,查看情况。

可以发现该文件已经把多个文件的内容合并在了一起,部分内容出现乱码属于正常现象。这是由于该过程属于序列化后的结果,如果我们想要看到文件最初的内容需要后续做反序列化处理!

那么本次分享的内容就到这里了,下期小菌将为大家带来MapReduce之自定义outputFormat的内容,敬请期待!!!

0 人点赞