現在的位置: 首頁 > 黃專家專欄 > 正文

作業的提交和監控(二)

2014年11月03日 黃專家專欄 ⁄ 共 4632字 ⁄ 字型大小 評論關閉

文件分片

函數

1
2
3
4
5
6
7
8
9
10
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

執行文件分片,並得到需要的 map 數目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public InputSplit[] getSplits(JobConf job, int numSplits)
  throws IOException {
  // 得到輸入文件的各種狀態
  FileStatus[] files = listStatus(job);

  // Save the number of input files in the job-conf
  // conf 中設置輸入文件的數目
  job.setLong(NUM_INPUT_FILES, files.length);

  // 計算總的大小
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDir()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  // numSplits 傳進來的是 map 的數目
  // 獲得每一個分片的期望大小
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

  // 獲得最小的分片大小,這個可以在 mapred.min.split.size 中設置
  long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                          minSplitSize);

  // generate splits
  // 以下是生成分片的計算
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job);
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    // isSplitable 是判斷該文件是否可以分片
    // 一般情況下都是可以的,但是如果是 stream compressed 的方式,那麼是不可以的
    if ((length != 0) && isSplitable(fs, path)) { 
      long blockSize = file.getBlockSize();

      // 計算每一個分片大小的實際函數
      // 得到真實的分片大小
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;

      // 允許最後一個分片在 SPLIT_SLOP(默認 1.1) 比例之下
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations, 
            length-bytesRemaining, splitSize, clusterMap);
        // 加入分片
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
            splitHosts));
        bytesRemaining -= splitSize;
      }

      // 加入最後一個分片
      // 這個比例最大不超過期望分片的 1.1
      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                   blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(new FileSplit(path, 0, length, splitHosts));
    } else { 
      //Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  LOG.debug("Total # of splits: " + splits.size());
  return splits.toArray(new FileSplit[splits.size()]);
}

protected long computeSplitSize(long goalSize, long minSize,
                                     long blockSize) {
  // 計算分片大小,很明顯
  // 這裡設定了最大最小值,每一個分片大小在 minSize 和 blockSize 之間
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

這樣看,要想設置超過大於 block size 的也是可以的,只要將 minSize 設置很大即可 以上分片演算法只是單純計算需要多少個 map ,根據設定的 mapred.map.tasks 計算出這個任務需要多少個 map 最終的 map 數目,可能和 mapred.map.tasks 不同

但是這樣仍然會有一個問題,就是這個只是按照輸入文件的大小做邏輯的切分,但是如果文件中含有邊界(比如 Text 文件就是以行作為邊界),那麼實際的劃分就不一定是這樣的。

這個是由 RecordReader 實現的,它將某一個 split 解析成一個個 key 和 value 對

我們看看實際的 TextInputFormat 類,它其實生成了 LineRecordReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public LineRecordReader(Configuration job, FileSplit split,
    byte[] recordDelimiter) throws IOException {
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);

  // 得到文件開始和結束的位置
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());

  // skipFirstLine 表示跳過第一行
  boolean skipFirstLine = false;
  if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job,
          recordDelimiter);
    end = Long.MAX_VALUE;
  } else {
    if (start != 0) {
      // 如果開始的位置不是整個文件的開始
      // 那麼,有可能是在行的中間, LineRecordReader 的處理方式是跳過這行,從下一行處理起
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job, recordDelimiter);
  }
  if (skipFirstLine) {  // skip first line and re-establish "start".
      // 跳過第一行
    start += in.readLine(new Text(), 0,
                         (int)Math.min((long)Integer.MAX_VALUE, end - start));
  }
  this.pos = start;
}

public synchronized boolean next(LongWritable key, Text value)
  throws IOException {

  while (pos < end) {
    key.set(pos);

    // 在這裡, 會處理一個完整行
    // 但是有可能最後一行的另外一個部分在另一個 split 裡面
    // 但是 FSDataInputStream fileIn 作為一個抽象,這樣的操作使得對 Reader 透明了
    int newSize = in.readLine(value, maxLineLength,
                              Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                       maxLineLength));
    if (newSize == 0) {
      return false;
    }
    pos += newSize;
    if (newSize < maxLineLength) {
      return true;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
  }

  return false;
}

以上代碼我們可以知道,TextInputFormat 生成的 LineRecordReader 會根據行邊界來切分,避免了 split 邏輯分片不考慮邊界的情況。

其實 SequenceFileInputFormat 輸入也同樣有邊界問題,這是根據創建時候的序列點來實現的。 具體代碼可以看 SequenceFileRecordReader 裡面的實現

抱歉!評論已關閉.