问题是这样的:
HDFS上存储了一个大小10G不可分割压缩格式的文件(gzip格式),当有一个mr任务去读取这个文件的时候会产生多少个map task?spark去读取这种不可分割格式的大文件时是怎么处理的呢?
关于这个问题,大家应该都看过这个:
Hadoop所支持的几种压缩格式
gzip文件最大的特点在于:不可分割。
OK,我们知道gzip不可分割了。那么一个10G的gzip文件在HDFS是怎么存储的呢?
首先,一个10G的gzip文件在HDFS是放在一个DataNode上,但是blocks=ceil(10G/128M),副本还是3份(hadoop2.0 默认),因为gzip不可分割。
意思就是,这个gzip文件会被存储在一个DataNode上,但是占用的block数量还是 10G/每个block的大小(假设是128M),并且向上取整。
其次,MapReduce在读gzip文件的时候要指定解压方法,就是GzipCodec。然后用InputStream方法去读,MapTask的个数和读取一般文件的个数是一样的。
关于Hadoop Maptask个数,有一个计算公式。代码逻辑和计算公式如下:
作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:
代码语言:javascript复制public List<InputSplit> getSplits(JobContext job
) throws IOException {
//getFormatMinSplitSize():始终返回1
//getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//getMaxSplitSize(job):获取"mapred.max.split.size"的值,
//默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
//计算split大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//计算split个数
long bytesRemaining = length; //bytesRemaining表示剩余字节数
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
LOG.debug("Total # of splits: " splits.size());
return splits;
}
首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:
代码语言:javascript复制 protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:
- 文件剩余字节数/splitSize > 1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize;
- 文件剩余字节数/splitSize < 1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)
我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:
一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。到这里split的划分就介绍完了,但是有两个问题需要考虑:
- 如果一个record跨越了两个block该怎么办?
这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:
- 上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?
一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。
最后,Spark在读取gzip这种不可分割文件的时候,就退化成从单个task读取、单个core执行任务,很容易产生性能瓶颈。你可以做个测试。在spark的页面上可以看到效果。
基于以上所以,gzip格式最好提前进行分割成小文件或者换格式,因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上,但这个操作会导致大量单节点性能占用,因此该格式建议不在spark上使用。
gzip问题这么多,常用的场景我能想到的只有一个,就是每天的日志文件。单个日志文件不太大,百兆以内。其他的场景暂时想不到。