现在的位置: 首页 > 综合 > 正文

Hadoop MapReduce之任务启动(二)

2013年10月08日 ⁄ 综合 ⁄ 共 11074字 ⁄ 字号 评论关闭

  TaskLauncher线程监控到有新任务到达的时候,便会通过startNewTask启动一个新的线程来执行任务,该线程独立运行不会影响TT的其他线程,通过源码中的注释也能看到这个意思,任务启动之初会先做本地化操作,然后通过launchTaskForJob启动TaskLauncher,在child JVM启动之前的过程比较复杂,会经过一系列的线程创建,过程如下图:

/**
 * Start a new task. All exceptions are handled locally, so that we don't
 * mess up the task tracker.
 * 
 * @throws InterruptedException
 */
void startNewTask(final TaskInProgress tip) throws InterruptedException {
	//构建一个子类
	Thread launchThread = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				//1、初始化本地作业
				RunningJob rjob = localizeJob(tip);
				//2、设置job文件
				tip.getTask().setJobFile(
						rjob.getLocalizedJobConf().toString());
				// Localization is done. Neither rjob.jobConf nor rjob.ugi
				// can be null
				//3、启动任务
				launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
			} catch (Throwable e) {
				String msg = ("Error initializing "
						+ tip.getTask().getTaskID() + ":\n" + StringUtils
						.stringifyException(e));
				LOG.warn(msg);
				tip.reportDiagnosticInfo(msg);
				try {
					tip.kill(true);
					tip.cleanup(true);
				} catch (IOException ie2) {
					LOG.info("Error cleaning up "
							+ tip.getTask().getTaskID(), ie2);
				} catch (InterruptedException ie2) {
					LOG.info("Error cleaning up "
							+ tip.getTask().getTaskID(), ie2);
				}
				if (e instanceof Error) {
					LOG.error("TaskLauncher error "
							+ StringUtils.stringifyException(e));
				}
			}
		}
	});
	launchThread.start();//启动线程
}

1、任务本地初始化如下:

RunningJob localizeJob(TaskInProgress tip) throws IOException,
		InterruptedException {
	Task t = tip.getTask();
	JobID jobId = t.getJobID();
	//如果是新作业,则构建RunningJob放入runningJobs集合
	RunningJob rjob = addTaskToJob(jobId, tip);
	InetSocketAddress ttAddr = getTaskTrackerReportAddress();
	try {
		synchronized (rjob) {
			if (!rjob.localized) {
				while (rjob.localizing) {
					rjob.wait();
				}
				if (!rjob.localized) {
					// this thread is localizing the job
					rjob.localizing = true;
				}
			}
		}
		//下面开始初始化
		if (!rjob.localized) {
			//拷贝job.xml至本地:tasklog/ttprivate/taskTracker/hadoop/jobcache/$jobid/job.xml
			//设置jvm复用,涉及参数mapred.job.reuse.jvm.num.tasks,当然在多CPU架构环境中还要注意设置:mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum
			//初始化一系列本地目录,并且下载执行该任务所需的jar文件、配置文件、凭证文件等
			Path localJobConfPath = initializeJob(t, rjob, ttAddr);
			JobConf localJobConf = new JobConf(localJobConfPath);
			// to be doubly sure, overwrite the user in the config with the
			// one the TT
			// thinks it is
			localJobConf.setUser(t.getUser());
			// also reset the #tasks per jvm
			resetNumTasksPerJvm(localJobConf);
			// set the base jobconf path in rjob; all tasks will use
			// this as the base path when they run
			synchronized (rjob) {
				rjob.localizedJobConf = localJobConfPath;
				rjob.jobConf = localJobConf;
				rjob.keepJobFiles = ((localJobConf
						.getKeepTaskFilesPattern() != null) || localJobConf
						.getKeepFailedTaskFiles());


				rjob.localized = true;
			}
		}
	} finally {
		synchronized (rjob) {
			if (rjob.localizing) {
				rjob.localizing = false;
				rjob.notifyAll();
			}
		}
	}
	synchronized (runningJobs) {
		runningJobs.notify(); // notify the fetcher thread
	}
	return rjob;
}

任务本地化的重头戏包含在JobLocalizer.java中,具体内容如下:

public void initializeJob(String user, String jobid, 
                          Path credentials, Path jobConf, 
                          TaskUmbilicalProtocol taskTracker,
                          InetSocketAddress ttAddr
                          ) throws IOException, InterruptedException {
  final LocalDirAllocator lDirAlloc = allocator;
  FileSystem localFs = FileSystem.getLocal(getConf());
  JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid);
  //创建本地目录:$mapred.local.dir/taskTracker/$user
  localizer.createLocalDirs();
  //创建本地目录:$mapred.local.dir/taskTracker/$user/jobcache $mapred.local.dir/taskTracker/$user/distcache
  localizer.createUserDirs();
  //创建目录:$mapred.local.dir/taskTracker/$user/jobcache/$jobid
  localizer.createJobDirs();


  JobConf jConf = new JobConf(jobConf);
  //创建工作区目录;$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work
  localizer.createWorkDir(jConf);
  //拷贝凭证文件
  Path localJobTokenFile = lDirAlloc.getLocalPathForWrite(
      TaskTracker.getLocalJobTokenFile(user, jobid), getConf());
  FileUtil.copy(
      localFs, credentials, localFs, localJobTokenFile, false, getConf());




  //创建任务日志目录:$hadoop.log.dir/$jobid
  localizer.initializeJobLogDir();


  // Download the job.jar for this job from the system FS?
  // setup the distributed cache
  // write job acls
  // write localized config
  //上面注释里都写的很清楚了,这里就不多说了
  localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile, 
                             taskTracker);
}

2、设置job文件,这个过程比较简单,这里不再赘述

tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());

3、启动任务

launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);

launchTaskForJob函数中会调用下面函数启动一个新的线程来执行任务,我们看下这个启动过程:

public synchronized void launchTask(RunningJob rjob) throws IOException {
		if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED
				|| this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN
				|| this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
			//任务本地化,会设置该任务相关的一系列参数
			localizeTask(task);
			if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
				this.taskStatus.setRunState(TaskStatus.State.RUNNING);
			}
			//创建新的线程,TaskRunner来执行任务
			setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
			//启动新创建的线程
			this.runner.start();
			long now = System.currentTimeMillis();
			this.taskStatus.setStartTime(now);
			this.lastProgressReport = now;
		} else {
			LOG.info("Not launching task: " + task.getTaskID()
					+ " since it's state is "
					+ this.taskStatus.getRunState());
		}
	}
	

下面我们重点关注TaskRunner的逻辑:

public final void run() {
  String errorInfo = "Child Error";
  try {
    
    //before preparing the job localize 
    //all the archives
    TaskAttemptID taskid = t.getTaskID();
    final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    //simply get the location of the workDir and pass it to the child. The
    //child will do the actual dir creation
    //设置子进程的工作目录,当进程执行时会自动创建
    final File workDir =
    new File(new Path(localdirs[rand.nextInt(localdirs.length)], 
        TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), 
        taskid.toString(),
        t.isTaskCleanupTask())).toString());
    //获得执行用户
    String user = tip.getUGI().getUserName();
    
    // Set up the child task's configuration. After this call, no localization
    // of files should happen in the TaskTracker's process space. Any changes to
    // the conf object after this will NOT be reflected to the child.
    // setupChildTaskConfiguration(lDirAlloc);


    if (!prepare()) {
      return;
    }
    
    // Accumulates class paths for child.
    List<String> classPaths = getClassPaths(conf, workDir,
                                            taskDistributedCacheManager);


    long logSize = TaskLog.getTaskLogLength(conf);
    
    //  构建JVM启动参数
    Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
    
    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);


    // set memory limit using ulimit if feasible and necessary ...
    String setup = getVMSetupCmd();
    // 重定向输出日志
    File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
    File stdout = logFiles[0];
    File stderr = logFiles[1];
    tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
               stderr);
    //设置JVM运行的环境变量
    Map<String, String> env = new HashMap<String, String>();
    errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
                                 logSize);
    
    // 一堆export,用户输出环境变量
    List <String> setupCmds = new ArrayList<String>();
    for(Entry<String, String> entry : env.entrySet()) {
      StringBuffer sb = new StringBuffer();
      sb.append("export ");
      sb.append(entry.getKey());
      sb.append("=\"");
      sb.append(entry.getValue());
      sb.append("\"");
      setupCmds.add(sb.toString());
    }
    setupCmds.add(setup);
    //上面设置好环境变量、JVM的启动参数后,这里就开始启动JVM了
    launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
    tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
    //判断返回码
    if (exitCodeSet) {
      if (!killed && exitCode != 0) {
        if (exitCode == 65) {
          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " +
            exitCode + ".");
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError", e);
    try {
      tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
    } catch (IOException ie) {
      LOG.fatal(t.getTaskID()+" reporting FSError", ie);
    }
  } catch (Throwable throwable) {
    LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
    Throwable causeThrowable = new Throwable(errorInfo, throwable);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    causeThrowable.printStackTrace(new PrintStream(baos));
    try {
      tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
    } catch (IOException e) {
      LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
    }
  } finally {
    
    // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
    // *false* since the task has either
    // a) SUCCEEDED - which means commit has been done
    // b) FAILED - which means we do not need to commit
    tip.reportTaskFinished(false);
  }
}

上面函数中启动JVM的代码如下:

void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
    File stderr, long logSize, File workDir)
    throws InterruptedException, IOException {
  //这里开始启动,然后
  jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
      stderr, logSize, workDir, conf));
  synchronized (lock) {
    while (!done) {
      lock.wait();
    }
  }
}

下面我们看直接启动JVM的函数,其实中间还有很多判断过程,由于篇幅太长,这里就省略了

  private void spawnNewJvm(JobID jobId, JvmEnv env,  
      TaskRunner t) {
    JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
    jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
    //spawn the JVM in a new thread. Note that there will be very little
    //extra overhead of launching the new thread for a new JVM since
    //most of the cost is involved in launching the process. Moreover,
    //since we are going to be using the JVM for running many tasks,
    //the thread launch cost becomes trivial when amortized over all
    //tasks. Doing it this way also keeps code simple.
    jvmRunner.setDaemon(true);
    jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
    setRunningTaskForJvm(jvmRunner.jvmId, t);
    LOG.info(jvmRunner.getName());
    jvmRunner.start();//开始启动
  }
  

在jvmRunner线程中会最终调用launchTask真正启动一个JVM进程,这个过程相对简单,过程如下:

public int launchTask(String user, 
                                String jobId,
                                String attemptId,
                                List<String> setup,
                                List<String> jvmArguments,
                                File currentWorkDirectory,
                                String stdout,
                                String stderr) throws IOException {
  ShellCommandExecutor shExec = null;
  try {    	            
    FileSystem localFs = FileSystem.getLocal(getConf());
    
    //创建任务工作目录:$hadoop.log.dir\taskTracker\hadoop\jobcache\job_201311071504_0001\attempt_201311071504_0001_m_000002_0\work
    new Localizer(localFs, 
        getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)).
        initializeAttemptDirs(user, jobId, attemptId);
    
    // create the working-directory of the task 
    if (!currentWorkDirectory.mkdir()) {
      throw new IOException("Mkdirs failed to create " 
                  + currentWorkDirectory.toString());
    }
    
    //创建日志目录:$hadoop.log.dir\userlogs\job_201311071504_0001\attempt_201311071504_0001_m_000002_0
    String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString();
		//localFs.delete(new Path(logLocation));
		//localFs.rename(new Path(logLocation), new Path(logLocation+".old"));
    if (!localFs.mkdirs(new Path(logLocation))) {
      throw new IOException("Mkdirs failed to create " 
                 + logLocation);
    }
    //read the configuration for the job
    FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
    long logSize = 0; //TODO MAPREDUCE-1100
    // get the JVM command line.
    String cmdLine = 
      TaskLog.buildCommandLine(setup, jvmArguments,
          new File(stdout), new File(stderr), logSize, true);


    // write the command to a file in the
    // task specific cache directory
    // TODO copy to user dir
    //将JVM启动脚本写入文件,日志中会看到如下信息:
    //13/11/07 15:22:05 INFO mapred.TaskController: Writing commands to C:/hadoop/tasklog/ttprivate/taskTracker/hadoop/jobcache/job_201311071504_0001/attempt_201311071504_0001_m_000002_0/taskjvm.sh
    Path p = new Path(allocator.getLocalPathForWrite(
        TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
        getConf()), COMMAND_FILE);


    String commandFile = writeCommand(cmdLine, rawFs, p);
    rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
    //构建命令执行器,用于调用上面写入的脚本
    shExec = new ShellCommandExecutor(new String[]{
        "bash", "-c", commandFile.substring(1)},
        currentWorkDirectory);
    shExec.execute();//开始执行脚本taskjvm.sh,通过ProcessBuilder来执行脚本,会开启错误监控线程
  } catch (Exception e) {
    if (shExec == null) {
      return -1;
    }
    int exitCode = shExec.getExitCode();
    LOG.warn("Exit code from task is : " + exitCode);
    LOG.info("Output from DefaultTaskController's launchTask follows:");
    logOutput(shExec.getOutput());
    return exitCode;
  }
  return 0;
}

   

抱歉!评论已关闭.