Hadoop使用学习笔记(2)

2021-04-12 16:21:45 浏览数 (1)

Hadoop使用学习笔记

2. 基本Map-Reduce工作配置与原理(上)

我们假设MapReduce任务为统计所有文件中每个词语出现次数

整个MapReduce流程主要如下所示,可以分为四步:

我们将统计所有文件中每个词语出现次数拆分成为:

  1. 文件输入转换成Map工作可处理的键值对(后面我们会知道是以文件位置为key,文件内容为value)
  2. Map:提取上一步value中的所有词语,生成以词语为key,value为1的键值对
  3. Reduce:统计每个词语出现的个数,转换成以词语为key,value为出现次数的键值对
  4. 输出上一步的输出到文件

Input是将输入(比如数据库,网络,文件等)转化为Hadoop可以处理的标准输入。这里我们拿文件输入举例,假设我们有如下两个文件作为输入流:

Hadoop会将它们转化成什么呢?我们看下Hadoop的源码,针对文件输入,Hadoop中有如下类:

Hadoop会将过大的文件拆分。在HDFS中,文件被分成多个Block,并且每个Block会被保存多份。在用HDFS的文件作为输入时,我们需要获取文件有多少个Block以及每个Block位于哪个datanode上。获取到文件后,按照合适的规则以及map任务数量,分割成多个输入文件。有个makeSplit方法就是将文件输入转成一个一个块:

代码语言:javascript复制
protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts, String[] inMemoryHosts) {
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }

我们可以看出,每个FileSplit包括:

  • file:文件
  • start:该FileSplit在file中的起始字节位置
  • length:该FileSplit的字节长度
  • hosts和inMemoryHosts:这个我们之后在HDFS部分会详细描述,这里我们就理解成file所处的datanode和缓存node就可以

下面代码展示究竟是如何拆分的。

代码语言:javascript复制
//job: MapReduce的配置, numSplits一般等于或者大于Map任务的个数,因为每个Map任务至少要处理一个split
public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
    //计时用
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);

    //保存文件个数
    job.setLong(NUM_INPUT_FILES, files.length);
    //计算所有文件大小,遇到非文件(文件夹)抛异常
    long totalSize = 0;
    for (FileStatus file : files) {
        if (file.isDirectory()) {
            throw new IOException("Not a file: "   file.getPath());
        }
        totalSize  = file.getLen();
    }
    //用总大小除以numSplits获取每个Map任务处理文件理想大小
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    //最小大小为1(minSplitSize)与配置中mapreduce.input.fileinputformat.split.minsize的最大值
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
            FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    //生成splits
    ArrayList splits = new ArrayList(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        //长度不为0,获取File的HDFS信息
        if (length != 0) {
            //获取文件的HDFS信息,文件Block分布的Hosts和缓存Hosts
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            //如果可以拆分
            if (isSplitable(fs, path)) {
                long blockSize = file.getBlockSize();
                //取goalSize与 minSize和blockSize的最小值 中的大的那个
                long splitSize = computeSplitSize(goalSize, minSize, blockSize);

                long bytesRemaining = length;
                //如果剩余大于splitSize的1.1倍则继续拆分
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                            length - bytesRemaining, splitSize, clusterMap);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                            splitHosts[0], splitHosts[1]));
                    bytesRemaining -= splitSize;
                }
                //剩余不为0,则将剩下的组成一个Split
                if (bytesRemaining != 0) {
                    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                            - bytesRemaining, bytesRemaining, clusterMap);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            splitHosts[0], splitHosts[1]));
                }
            } else {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
                splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
            }
        }
        //长度为0,直接用空host生成FileSplits
        else {
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    //计时结束
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: "   splits.size()
                  ", TimeTaken: "   sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
}

所以,经过Input这一步,我们得到了:

除了文件输入,Hadoop中还有其他输入:

比如DB输入DBInputFormat,常用的还是FileInputFormat,因为大部分MapReduce job都基于HDFS。FileInputFormat默认的实现类是TextInputFormat,就是纯文本输入,也是我们这里的例子使用的。 Map阶段的输入是TextInputFormat,之前的FileSplit会经过如下方法的处理:

代码语言:javascript复制
public RecordReader getRecordReader(
                                        InputSplit genericSplit, JobConf job,
                                        Reporter reporter)
  throws IOException {

  reporter.setStatus(genericSplit.toString());
  String delimiter = job.get("textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter) {
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  }
  return new LineRecordReader(job, (FileSplit) genericSplit,
      recordDelimiterBytes);
}

LineRecordReader的next方法会在各个工作节点被调用,生成LongWritable类型的key和Text类型的value的键值对输入:

代码语言:javascript复制
public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      key.set(pos);
      //
      int newSize = 0;
      if (pos == 0) {
        newSize = skipUtfByteOrderMark(value);
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos  = newSize;
      }

      if (newSize == 0) {
        return false;
      }
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size "   newSize   " at pos "   (pos - newSize));
    }

    return false;
  }

通过上面源码可以看出:key为文件起始位置,value为文件内容。

每个Map任务接受(LongWritable->Text)为输入,输出为(Text->IntWritable)即(词语->1)。 之后进入Reduce,hadoop框架中会将Map的输出在Reduce步骤进行第一步的聚合,我们从ReduceTask类的runOldReducer方法中可以知道:

代码语言:javascript复制
private 
  void runOldReducer(JobConf job,
                     TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator comparator,
                     Class keyClass,
                     Class valueClass) throws IOException {
    Reducer reducer = 
      ReflectionUtils.newInstance(job.getReducerClass(), job);
    // make output collector
    String finalName = getOutputName(getPartition());

    RecordWriter out = new OldTrackingRecordWriter(
        this, job, reporter, finalName);
    final RecordWriter finalOut = out;

    OutputCollector collector = 
      new OutputCollector() {
        public void collect(OUTKEY key, OUTVALUE value)
          throws IOException {
          finalOut.write(key, value);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };

    // apply reduce function
    try {
      //increment processed counter only if skipping feature is enabled
      boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
        SkipBadRecords.getAutoIncrReducerProcCount(job);
      //生成同一个keys的所有values的Iterator数据结构
      ReduceValuesIterator values = isSkipping() ? 
          new SkippingReduceValuesIterator(rIter, 
              comparator, keyClass, valueClass, 
              job, reporter, umbilical) :
          new ReduceValuesIterator(rIter, 
          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
          job, reporter);
      values.informReduceProgress();
      while (values.more()) {
        reduceInputKeyCounter.increment(1);
        //调用用户实现的reducer的reduce方法进行实际的reduce
        reducer.reduce(values.getKey(), values, collector, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
        }
        values.nextKey();
        values.informReduceProgress();
      }

      reducer.close();
      reducer = null;

      out.close(reporter);
      out = null;
    } finally {
      IOUtils.cleanup(LOG, reducer);
      closeQuietly(out, reporter);
    }
  }

可以总结,hadoop在到了reduce阶段,首先隐式的将Map输出进行归类。

注意,这里apple对应的Iterator有三个1,包含所有的Map的输出,所以,我们知道,只有所有的Map执行完后,reduce才会开始。 之后,reduce统计每个词出现次数,输出以词语为key,value为出现次数的键值对。

最后,将结果写入文件:

这样,一个完整的流程就展示完了。下一篇我们将写这个任务的源代码,配置本地提交任务至远程Hadoop集群。

0 人点赞