无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。
小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
- 在mapreduce处理时,可采用combineInputFormat提高效率
本篇博客小菌将大家实现的是上述第二种方式! 先让我们确定程序的核心机制:
自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV 在输出时使用SequenceFileOutPutFormat输出合并文件
具体的代码如下:
自定义InputFromat
代码语言:javascript复制public class Custom_FileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
/*
直接返回文件不可切割,保证一个文件是一个完整的一行
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Custom_RecordReader custom_recordReader = new Custom_RecordReader();
custom_recordReader.initialize(split,context);
return custom_recordReader;
}
}
自定义RecordReader
代码语言:javascript复制/**
*
* RecordReader的核心工作逻辑:
* 通过nextKeyValue()方法去读取数据构造将返回的key value
* 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value
*
*
* @author
*
*/
public class Custom_RecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable bytesWritable = new BytesWritable();
private boolean pressced = false;
//初始化
/**
*
* @param split 封装的文件的对象内容
* @param context 上下文对象
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf=context.getConfiguration();
}
//读取下一个文件
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!pressced){
//获取文件路径
Path path = fileSplit.getPath();
//获取FileSystem对象
FileSystem fileSystem = null;
FSDataInputStream inputStream = null;
try {
fileSystem = FileSystem.get(conf);
//读取文件
inputStream = fileSystem.open(path);
//初始化一个字节数据,大小为文件的长度
byte[] bytes = new byte[(int)fileSplit.getLength()];
//把数据流转换成字节数组
IOUtils.readFully(inputStream,bytes,0,bytes.length);
//把 字节数组 转换成 BytesWritable 对象
bytesWritable.set(bytes,0,bytes.length);
} catch (Exception e) {
e.printStackTrace();
}finally {
fileSystem.close();
if (null != inputStream ){
inputStream.close();
}
}
pressced = true;
return true;
}else{
return false;
}
}
//获取当前的key值
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
//获取当前的value值
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}
//获取当前的进程
@Override
public float getProgress() throws IOException, InterruptedException {
return pressced?0:1;
}
//关流
@Override
public void close() throws IOException {
}
}
map处理
代码语言:javascript复制public class Custom_Mapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String name = fileSplit.getPath().getName();
context.write(new Text(name),value);
}
}
定义mapreduce处理流程
代码语言:javascript复制public class Customer_Driver {
public static void main(String[] args) throws Exception {
//1.实例化job对象
Job job = Job.getInstance(new Configuration(), "Customer_Driver");
job.setJarByClass(Customer_Driver.class);
//2.设置输入
job.setInputFormatClass(Custom_FileInputFormat.class);
Custom_FileInputFormat.addInputPath(job,new Path("E:\2019大数据课程\DeBug\测试\order\素材\5\自定义inputformat_小文件合并\input"));
//3.设置Map
job.setMapperClass(Custom_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
//4.设置Reduce()
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
//5.设置输出
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job,new Path("E:\2019大数据课程\DeBug\测试\order\素材\5\自定义inputformat_小文件合并\output4"));
boolean b = job.waitForCompletion(true);
System.out.println(b?0:1);
}
}
到这里我们的程序就算完成了。在运行之前,我们先打开我们程序读取的目录,可以看到在input目录下有两个文件。
然后运行程序。
伴随着成功运行,我们可以再进入到程序输出目录,查看情况。
可以发现该文件已经把多个文件的内容合并在了一起,部分内容出现乱码属于正常现象。这是由于该过程属于序列化后的结果,如果我们想要看到文件最初的内容需要后续做反序列化处理!
那么本次分享的内容就到这里了,下期小菌将为大家带来MapReduce之自定义outputFormat的内容,敬请期待!!!