現在的位置: 首頁 > 黃專家專欄 > 正文

作業的提交和監控(一)

2014年11月03日 黃專家專欄 ⁄ 共 4672字 ⁄ 字型大小 評論關閉

整體流程

簡單的代碼就可以運行一個作業

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);

runJob 做的最主要兩個過程是提交監控。 提交在函數 submitJobInternal 中執行,監控在函數 monitorAndPrintJob 中執行。我們看看 submitJobInternal 裡面的代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, 
  ClassNotFoundException,
  InterruptedException,
  IOException {
    JobConf jobCopy = job;

    // 指定作業文件的上傳路徑
    // 默認在 mapreduce.jobtracker.staging.root.dir 中指定
    // 比如 mapreduce.jobtracker.staging.root.dir = /user
    // 後面加上用戶名作為文件夾名字
    // 一般是這樣 hdfs://host:port/user/${user}/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // 得到一個 jobID
    // jobID 是根據日期和時間生成的
    // 假設是 job_201410312221_0001
    JobID jobId = jobSubmitClient.getNewJobId();

    // 得到目錄的路徑
    // 可能是這樣 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    // 設置配置文件,將 mapreduce.job.dir 設置為作業文件上傳目錄
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      // 得到 token
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      // copy 文件到上傳目錄中
      // 支持以下這幾種格式
      // -files -libjars -archives
      // 在如下的三種目錄里:
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path [] {submitJobDir},
                                          jobCopy);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // 得到 reduces 的個數
      int reduces = jobCopy.getNumReduceTasks();

      // 設置本機 ip 地址
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      // 檢查指定的輸出目錄
      // 如果輸出文件夾存在, 那麼會拋出異常
      // checkOutputSpecs 在 FileOutputFormat.java 中
      if (reduces == 0 ? jobCopy.getUseNewMapper() : 
        jobCopy.getUseNewReducer()) {
          // 如果 readuce 數目是 0,但是 mapper 的數目不為 0
          // 得到指定的輸入類型
          // 默認是 TextOutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      // 計算 job 的輸入文件的分片
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      // 設置 map
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 得到 queue 的名字
      String queue = jobCopy.getQueueName();
      // 然後根據這個 queue 名字獲得訪問控制列表
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs  
      // 將重新配置過的 JobConf 寫入到 submitJobDir/job.xml 文件
      FSDataOutputStream out = 
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(jobCopy);

      try {
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      // 提交 job
      // 如果不是 local 模式,那麼這個 submitJob 是一個 rpc 調用
      // 調用的是遠程機子上的 JobTracker.submitJob
      // 如果是 local 模式, 就是調用 LocalJobRunner.submitJob
      printTokens(jobId, jobCopy.getCredentials());
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
}

主要的流程如下:

  1. 找到 hdfs 上的 Staging 路徑
  2. 向 JobTracker 申請一個 job id
  3. 根據 job id 在 hdfs 上生成上傳文件的目錄
  4. copy 指定文件到上傳的目錄中去
  5. 得到 reduce 個數,設置本機 ip
  6. 檢查輸出目錄是否存在, 如果不存在,則拋出異常。這其實有一點不太合理的地方,因為是先上傳作業文件,再判斷是否輸入目錄存在。如果正常運行完作業,上傳的文件是能被清理的,但是,如果輸出文件異常,那麼這些上傳的文件就得不到清理?
  7. 計算 job 的輸入文件的分片, 根據這個設置 map 的數量
  8. 得到 queue name 和相關授權
  9. 將重新配置過的 JobConf 寫入到 submitJobDir/job.xml 文件
  10. 提交 job

抱歉!評論已關閉.