Hadoop使用学习笔记
2. 基本Map-Reduce工作配置与原理(上)
我们假设MapReduce任务为统计所有文件中每个词语出现次数。
整个MapReduce流程主要如下所示,可以分为四步:
我们将统计所有文件中每个词语出现次数拆分成为:
- 文件输入转换成Map工作可处理的键值对(后面我们会知道是以文件位置为key,文件内容为value)
- Map:提取上一步value中的所有词语,生成以词语为key,value为1的键值对
- Reduce:统计每个词语出现的个数,转换成以词语为key,value为出现次数的键值对
- 输出上一步的输出到文件
Input是将输入(比如数据库,网络,文件等)转化为Hadoop可以处理的标准输入。这里我们拿文件输入举例,假设我们有如下两个文件作为输入流:
Hadoop会将它们转化成什么呢?我们看下Hadoop的源码,针对文件输入,Hadoop中有如下类:
Hadoop会将过大的文件拆分。在HDFS中,文件被分成多个Block,并且每个Block会被保存多份。在用HDFS的文件作为输入时,我们需要获取文件有多少个Block以及每个Block位于哪个datanode上。获取到文件后,按照合适的规则以及map任务数量,分割成多个输入文件。有个makeSplit方法就是将文件输入转成一个一个块:
代码语言:javascript复制protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
我们可以看出,每个FileSplit包括:
- file:文件
- start:该FileSplit在file中的起始字节位置
- length:该FileSplit的字节长度
- hosts和inMemoryHosts:这个我们之后在HDFS部分会详细描述,这里我们就理解成file所处的datanode和缓存node就可以
下面代码展示究竟是如何拆分的。
代码语言:javascript复制//job: MapReduce的配置, numSplits一般等于或者大于Map任务的个数,因为每个Map任务至少要处理一个split
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
//计时用
StopWatch sw = new StopWatch().start();
FileStatus[] files = listStatus(job);
//保存文件个数
job.setLong(NUM_INPUT_FILES, files.length);
//计算所有文件大小,遇到非文件(文件夹)抛异常
long totalSize = 0;
for (FileStatus file : files) {
if (file.isDirectory()) {
throw new IOException("Not a file: " file.getPath());
}
totalSize = file.getLen();
}
//用总大小除以numSplits获取每个Map任务处理文件理想大小
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//最小大小为1(minSplitSize)与配置中mapreduce.input.fileinputformat.split.minsize的最大值
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
//生成splits
ArrayList splits = new ArrayList(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file : files) {
Path path = file.getPath();
long length = file.getLen();
//长度不为0,获取File的HDFS信息
if (length != 0) {
//获取文件的HDFS信息,文件Block分布的Hosts和缓存Hosts
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//如果可以拆分
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
//取goalSize与 minSize和blockSize的最小值 中的大的那个
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
//如果剩余大于splitSize的1.1倍则继续拆分
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length - bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
//剩余不为0,则将剩下的组成一个Split
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
}
//长度为0,直接用空host生成FileSplits
else {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
//计时结束
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " splits.size()
", TimeTaken: " sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}
所以,经过Input这一步,我们得到了:
除了文件输入,Hadoop中还有其他输入:
比如DB输入DBInputFormat,常用的还是FileInputFormat,因为大部分MapReduce job都基于HDFS。FileInputFormat默认的实现类是TextInputFormat,就是纯文本输入,也是我们这里的例子使用的。 Map阶段的输入是TextInputFormat,之前的FileSplit会经过如下方法的处理:
代码语言:javascript复制public RecordReader getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
LineRecordReader的next方法会在各个工作节点被调用,生成LongWritable类型的key和Text类型的value的键值对输入:
代码语言:javascript复制public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);
//
int newSize = 0;
if (pos == 0) {
newSize = skipUtfByteOrderMark(value);
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos = newSize;
}
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " newSize " at pos " (pos - newSize));
}
return false;
}
通过上面源码可以看出:key为文件起始位置,value为文件内容。
每个Map任务接受(LongWritable->Text)为输入,输出为(Text->IntWritable)即(词语->1)。 之后进入Reduce,hadoop框架中会将Map的输出在Reduce步骤进行第一步的聚合,我们从ReduceTask类的runOldReducer方法中可以知道:
代码语言:javascript复制private
void runOldReducer(JobConf job,
TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator comparator,
Class keyClass,
Class valueClass) throws IOException {
Reducer reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
// make output collector
String finalName = getOutputName(getPartition());
RecordWriter out = new OldTrackingRecordWriter(
this, job, reporter, finalName);
final RecordWriter finalOut = out;
OutputCollector collector =
new OutputCollector() {
public void collect(OUTKEY key, OUTVALUE value)
throws IOException {
finalOut.write(key, value);
// indicate that progress update needs to be sent
reporter.progress();
}
};
// apply reduce function
try {
//increment processed counter only if skipping feature is enabled
boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
SkipBadRecords.getAutoIncrReducerProcCount(job);
//生成同一个keys的所有values的Iterator数据结构
ReduceValuesIterator values = isSkipping() ?
new SkippingReduceValuesIterator(rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
new ReduceValuesIterator(rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
values.informReduceProgress();
while (values.more()) {
reduceInputKeyCounter.increment(1);
//调用用户实现的reducer的reduce方法进行实际的reduce
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
}
values.nextKey();
values.informReduceProgress();
}
reducer.close();
reducer = null;
out.close(reporter);
out = null;
} finally {
IOUtils.cleanup(LOG, reducer);
closeQuietly(out, reporter);
}
}
可以总结,hadoop在到了reduce阶段,首先隐式的将Map输出进行归类。
注意,这里apple对应的Iterator有三个1,包含所有的Map的输出,所以,我们知道,只有所有的Map执行完后,reduce才会开始。 之后,reduce统计每个词出现次数,输出以词语为key,value为出现次数的键值对。
最后,将结果写入文件:
这样,一个完整的流程就展示完了。下一篇我们将写这个任务的源代码,配置本地提交任务至远程Hadoop集群。