现在的位置: 首页 > 黄专家专栏 > 正文

作业的提交和监控(二)

2014年11月03日 黄专家专栏 ⁄ 共 4632字 ⁄ 字号 评论关闭

文件分片

函数

1
2
3
4
5
6
7
8
9
10
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

执行文件分片,并得到需要的 map 数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public InputSplit[] getSplits(JobConf job, int numSplits)
  throws IOException {
  // 得到输入文件的各种状态
  FileStatus[] files = listStatus(job);

  // Save the number of input files in the job-conf
  // conf 中设置输入文件的数目
  job.setLong(NUM_INPUT_FILES, files.length);

  // 计算总的大小
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDir()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  // numSplits 传进来的是 map 的数目
  // 获得每一个分片的期望大小
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

  // 获得最小的分片大小,这个可以在 mapred.min.split.size 中设置
  long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                          minSplitSize);

  // generate splits
  // 以下是生成分片的计算
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job);
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    // isSplitable 是判断该文件是否可以分片
    // 一般情况下都是可以的,但是如果是 stream compressed 的方式,那么是不可以的
    if ((length != 0) && isSplitable(fs, path)) { 
      long blockSize = file.getBlockSize();

      // 计算每一个分片大小的实际函数
      // 得到真实的分片大小
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;

      // 允许最后一个分片在 SPLIT_SLOP(默认 1.1) 比例之下
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations, 
            length-bytesRemaining, splitSize, clusterMap);
        // 加入分片
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
            splitHosts));
        bytesRemaining -= splitSize;
      }

      // 加入最后一个分片
      // 这个比例最大不超过期望分片的 1.1
      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                   blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(new FileSplit(path, 0, length, splitHosts));
    } else { 
      //Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  LOG.debug("Total # of splits: " + splits.size());
  return splits.toArray(new FileSplit[splits.size()]);
}

protected long computeSplitSize(long goalSize, long minSize,
                                     long blockSize) {
  // 计算分片大小,很明显
  // 这里设定了最大最小值,每一个分片大小在 minSize 和 blockSize 之间
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

这样看,要想设置超过大于 block size 的也是可以的,只要将 minSize 设置很大即可 以上分片算法只是单纯计算需要多少个 map ,根据设定的 mapred.map.tasks 计算出这个任务需要多少个 map 最终的 map 数目,可能和 mapred.map.tasks 不同

但是这样仍然会有一个问题,就是这个只是按照输入文件的大小做逻辑的切分,但是如果文件中含有边界(比如 Text 文件就是以行作为边界),那么实际的划分就不一定是这样的。

这个是由 RecordReader 实现的,它将某一个 split 解析成一个个 key 和 value 对

我们看看实际的 TextInputFormat 类,它其实生成了 LineRecordReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public LineRecordReader(Configuration job, FileSplit split,
    byte[] recordDelimiter) throws IOException {
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);

  // 得到文件开始和结束的位置
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());

  // skipFirstLine 表示跳过第一行
  boolean skipFirstLine = false;
  if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job,
          recordDelimiter);
    end = Long.MAX_VALUE;
  } else {
    if (start != 0) {
      // 如果开始的位置不是整个文件的开始
      // 那么,有可能是在行的中间, LineRecordReader 的处理方式是跳过这行,从下一行处理起
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job, recordDelimiter);
  }
  if (skipFirstLine) {  // skip first line and re-establish "start".
      // 跳过第一行
    start += in.readLine(new Text(), 0,
                         (int)Math.min((long)Integer.MAX_VALUE, end - start));
  }
  this.pos = start;
}

public synchronized boolean next(LongWritable key, Text value)
  throws IOException {

  while (pos < end) {
    key.set(pos);

    // 在这里, 会处理一个完整行
    // 但是有可能最后一行的另外一个部分在另一个 split 里面
    // 但是 FSDataInputStream fileIn 作为一个抽象,这样的操作使得对 Reader 透明了
    int newSize = in.readLine(value, maxLineLength,
                              Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                       maxLineLength));
    if (newSize == 0) {
      return false;
    }
    pos += newSize;
    if (newSize < maxLineLength) {
      return true;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
  }

  return false;
}

以上代码我们可以知道,TextInputFormat 生成的 LineRecordReader 会根据行边界来切分,避免了 split 逻辑分片不考虑边界的情况。

其实 SequenceFileInputFormat 输入也同样有边界问题,这是根据创建时候的序列点来实现的。 具体代码可以看 SequenceFileRecordReader 里面的实现

抱歉!评论已关闭.