现在的位置: 首页 > 黄专家专栏 > 正文

作业的提交和监控(一)

2014年11月03日 黄专家专栏 ⁄ 共 4672字 ⁄ 字号 评论关闭

整体流程

简单的代码就可以运行一个作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);

runJob 做的最主要两个过程是提交监控。 提交在函数 submitJobInternal 中执行,监控在函数 monitorAndPrintJob 中执行。我们看看 submitJobInternal 里面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, 
  ClassNotFoundException,
  InterruptedException,
  IOException {
    JobConf jobCopy = job;

    // 指定作业文件的上传路径
    // 默认在 mapreduce.jobtracker.staging.root.dir 中指定
    // 比如 mapreduce.jobtracker.staging.root.dir = /user
    // 后面加上用户名作为文件夹名字
    // 一般是这样 hdfs://host:port/user/${user}/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // 得到一个 jobID
    // jobID 是根据日期和时间生成的
    // 假设是 job_201410312221_0001
    JobID jobId = jobSubmitClient.getNewJobId();

    // 得到目录的路径
    // 可能是这样 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    // 设置配置文件,将 mapreduce.job.dir 设置为作业文件上传目录
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      // 得到 token
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      // copy 文件到上传目录中
      // 支持以下这几种格式
      // -files -libjars -archives
      // 在如下的三种目录里:
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path [] {submitJobDir},
                                          jobCopy);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // 得到 reduces 的个数
      int reduces = jobCopy.getNumReduceTasks();

      // 设置本机 ip 地址
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      // 检查指定的输出目录
      // 如果输出文件夹存在, 那么会抛出异常
      // checkOutputSpecs 在 FileOutputFormat.java 中
      if (reduces == 0 ? jobCopy.getUseNewMapper() : 
        jobCopy.getUseNewReducer()) {
          // 如果 readuce 数目是 0,但是 mapper 的数目不为 0
          // 得到指定的输入类型
          // 默认是 TextOutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      // 计算 job 的输入文件的分片
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      // 设置 map
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 得到 queue 的名字
      String queue = jobCopy.getQueueName();
      // 然后根据这个 queue 名字获得访问控制列表
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs  
      // 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
      FSDataOutputStream out = 
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(jobCopy);

      try {
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      // 提交 job
      // 如果不是 local 模式,那么这个 submitJob 是一个 rpc 调用
      // 调用的是远程机子上的 JobTracker.submitJob
      // 如果是 local 模式, 就是调用 LocalJobRunner.submitJob
      printTokens(jobId, jobCopy.getCredentials());
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
}

主要的流程如下:

  1. 找到 hdfs 上的 Staging 路径
  2. 向 JobTracker 申请一个 job id
  3. 根据 job id 在 hdfs 上生成上传文件的目录
  4. copy 指定文件到上传的目录中去
  5. 得到 reduce 个数,设置本机 ip
  6. 检查输出目录是否存在, 如果不存在,则抛出异常。这其实有一点不太合理的地方,因为是先上传作业文件,再判断是否输入目录存在。如果正常运行完作业,上传的文件是能被清理的,但是,如果输出文件异常,那么这些上传的文件就得不到清理?
  7. 计算 job 的输入文件的分片, 根据这个设置 map 的数量
  8. 得到 queue name 和相关授权
  9. 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
  10. 提交 job

抱歉!评论已关闭.