TaskLauncher线程监控到有新任务到达的时候,便会通过startNewTask启动一个新的线程来执行任务,该线程独立运行不会影响TT的其他线程,通过源码中的注释也能看到这个意思。任务启动之初会先做本地化操作,然后通过launchTaskForJob真正启动任务。在child JVM启动之前的过程比较复杂,会经过一系列的线程创建,过程如下图:
/**
 * Start a new task. All exceptions are handled locally, so that we don't
 * mess up the task tracker.
 *
 * @throws InterruptedException
 */
void startNewTask(final TaskInProgress tip) throws InterruptedException {
  // Run the launch on its own thread so that any failure here cannot
  // take down the TaskTracker's other threads.
  Thread launchThread = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        // 1. Localize the job (job.xml, jars, credentials copied to local dirs)
        RunningJob rjob = localizeJob(tip);
        // 2. Point the task at the localized job configuration file
        tip.getTask().setJobFile(
            rjob.getLocalizedJobConf().toString());
        // Localization is done. Neither rjob.jobConf nor rjob.ugi
        // can be null
        // 3. Launch the task
        launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
      } catch (Throwable e) {
        // Any failure is reported back as diagnostic info and the task
        // is killed/cleaned up; the TT itself keeps running.
        String msg = ("Error initializing " + tip.getTask().getTaskID()
            + ":\n" + StringUtils.stringifyException(e));
        LOG.warn(msg);
        tip.reportDiagnosticInfo(msg);
        try {
          tip.kill(true);
          tip.cleanup(true);
        } catch (IOException ie2) {
          LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
        } catch (InterruptedException ie2) {
          LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
        }
        // Errors (as opposed to Exceptions) are additionally logged at
        // error level; note they are NOT rethrown here.
        if (e instanceof Error) {
          LOG.error("TaskLauncher error " + StringUtils.stringifyException(e));
        }
      }
    }
  });
  launchThread.start(); // fire-and-forget; errors surface via diagnostics above
}
1、任务本地初始化如下:
/**
 * Localize the job this task belongs to on this TaskTracker.
 * Only one thread performs the localization per job: concurrent callers
 * block on the RunningJob monitor until rjob.localized becomes true.
 *
 * @return the (now localized) RunningJob for this task's job
 */
RunningJob localizeJob(TaskInProgress tip)
    throws IOException, InterruptedException {
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  // For a new job this constructs the RunningJob and registers it in
  // the runningJobs map; otherwise it returns the existing entry.
  RunningJob rjob = addTaskToJob(jobId, tip);
  InetSocketAddress ttAddr = getTaskTrackerReportAddress();
  try {
    synchronized (rjob) {
      if (!rjob.localized) {
        // Another thread may already be localizing this job: wait for it.
        while (rjob.localizing) {
          rjob.wait();
        }
        if (!rjob.localized) {
          // this thread is localizing the job
          rjob.localizing = true;
        }
      }
    }
    // Perform the actual localization (outside the monitor):
    // - copies job.xml to tasklog/ttprivate/taskTracker/$user/jobcache/$jobid/job.xml
    // - applies JVM reuse settings (mapred.job.reuse.jvm.num.tasks; on multi-CPU
    //   hosts also mapred.tasktracker.map.tasks.maximum /
    //   mapred.tasktracker.reduce.tasks.maximum)
    // - initializes local directories and downloads the job's jar,
    //   configuration and credential files
    if (!rjob.localized) {
      Path localJobConfPath = initializeJob(t, rjob, ttAddr);
      JobConf localJobConf = new JobConf(localJobConfPath);
      // to be doubly sure, overwrite the user in the config with the
      // one the TT thinks it is
      localJobConf.setUser(t.getUser());
      // also reset the #tasks per jvm
      resetNumTasksPerJvm(localJobConf);
      // set the base jobconf path in rjob; all tasks will use
      // this as the base path when they run
      synchronized (rjob) {
        rjob.localizedJobConf = localJobConfPath;
        rjob.jobConf = localJobConf;
        rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null)
            || localJobConf.getKeepFailedTaskFiles());
        rjob.localized = true;
      }
    }
  } finally {
    // Always release waiters, even if localization threw.
    synchronized (rjob) {
      if (rjob.localizing) {
        rjob.localizing = false;
        rjob.notifyAll();
      }
    }
  }
  synchronized (runningJobs) {
    runningJobs.notify(); // notify the fetcher thread
  }
  return rjob;
}
任务本地化的重头戏包含在JobLocalizer.java中,具体内容如下:
public void initializeJob(String user, String jobid, Path credentials, Path jobConf, TaskUmbilicalProtocol taskTracker, InetSocketAddress ttAddr ) throws IOException, InterruptedException { final LocalDirAllocator lDirAlloc = allocator; FileSystem localFs = FileSystem.getLocal(getConf()); JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid); //创建本地目录:$mapred.local.dir/taskTracker/$user localizer.createLocalDirs(); //创建本地目录:$mapred.local.dir/taskTracker/$user/jobcache $mapred.local.dir/taskTracker/$user/distcache localizer.createUserDirs(); //创建目录:$mapred.local.dir/taskTracker/$user/jobcache/$jobid localizer.createJobDirs(); JobConf jConf = new JobConf(jobConf); //创建工作区目录;$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work localizer.createWorkDir(jConf); //拷贝凭证文件 Path localJobTokenFile = lDirAlloc.getLocalPathForWrite( TaskTracker.getLocalJobTokenFile(user, jobid), getConf()); FileUtil.copy( localFs, credentials, localFs, localJobTokenFile, false, getConf()); //创建任务日志目录:$hadoop.log.dir/$jobid localizer.initializeJobLogDir(); // Download the job.jar for this job from the system FS? // setup the distributed cache // write job acls // write localized config //上面注释里都写的很清楚了,这里就不多说了 localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile, taskTracker); }
2、设置job文件,这个过程比较简单,这里不再赘述
// Record the localized job.xml path on the task so the child JVM can find it.
tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
3、启动任务
// Launch the task using a fresh copy of the localized job configuration.
launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob);
launchTaskForJob函数中会调用下面函数启动一个新的线程来执行任务,我们看下这个启动过程:
/**
 * Launch this task attempt. Only proceeds when the task is in one of the
 * launchable states (UNASSIGNED, FAILED_UNCLEAN, KILLED_UNCLEAN);
 * otherwise the request is logged and ignored.
 */
public synchronized void launchTask(RunningJob rjob) throws IOException {
  if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
      this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
      this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
    // Task-level localization: sets up the parameters/dirs specific to
    // this task attempt.
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create the TaskRunner thread that will drive the child JVM.
    setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
    // Start the newly created runner thread.
    this.runner.start();
    long now = System.currentTimeMillis();
    this.taskStatus.setStartTime(now);
    this.lastProgressReport = now;
  } else {
    LOG.info("Not launching task: " + task.getTaskID() +
        " since it's state is " + this.taskStatus.getRunState());
  }
}
下面我们重点关注TaskRunner的逻辑:
/**
 * Drives one task attempt end-to-end: prepares the child's work dir and
 * configuration, assembles the JVM command line and environment, launches
 * the child JVM, waits for it, and interprets its exit code. All failures
 * are reported back to the TaskTracker as diagnostics.
 */
public final void run() {
  String errorInfo = "Child Error";
  try {
    // before preparing the job localize
    // all the archives
    TaskAttemptID taskid = t.getTaskID();
    final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    // simply get the location of the workDir and pass it to the child. The
    // child will do the actual dir creation
    // (the work dir is placed in one of the local dirs, chosen at random)
    final File workDir =
        new File(new Path(localdirs[rand.nextInt(localdirs.length)],
            TaskTracker.getTaskWorkDir(t.getUser(),
                taskid.getJobID().toString(), taskid.toString(),
                t.isTaskCleanupTask())).toString());
    // The user the child process will execute as.
    String user = tip.getUGI().getUserName();

    // Set up the child task's configuration. After this call, no localization
    // of files should happen in the TaskTracker's process space. Any changes to
    // the conf object after this will NOT be reflected to the child.
    setupChildTaskConfiguration(lDirAlloc);

    if (!prepare()) {
      return;
    }

    // Accumulates class paths for child.
    List<String> classPaths =
        getClassPaths(conf, workDir, taskDistributedCacheManager);

    long logSize = TaskLog.getTaskLogLength(conf);

    // Build the child JVM's command-line arguments.
    Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

    tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

    // set memory limit using ulimit if feasible and necessary ...
    String setup = getVMSetupCmd();

    // Redirect the child's stdout/stderr into per-attempt log files.
    File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
    File stdout = logFiles[0];
    File stderr = logFiles[1];
    tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
        stderr);

    // Environment variables for the child JVM.
    Map<String, String> env = new HashMap<String, String>();
    errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
        logSize);

    // Turn the environment map into a list of `export KEY="VALUE"`
    // shell commands that will precede the JVM launch in taskjvm.sh.
    List<String> setupCmds = new ArrayList<String>();
    for (Entry<String, String> entry : env.entrySet()) {
      StringBuffer sb = new StringBuffer();
      sb.append("export ");
      sb.append(entry.getKey());
      sb.append("=\"");
      sb.append(entry.getValue());
      sb.append("\"");
      setupCmds.add(sb.toString());
    }
    setupCmds.add(setup);

    // Environment and JVM arguments are ready: launch the child JVM
    // and block until it exits.
    launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
    tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
    // Inspect the child's exit code; 65 is instrumented separately.
    if (exitCodeSet) {
      if (!killed && exitCode != 0) {
        if (exitCode == 65) {
          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
        }
        throw new IOException("Task process exit with nonzero status of " +
            exitCode + ".");
      }
    }
  } catch (FSError e) {
    LOG.fatal("FSError", e);
    try {
      tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
    } catch (IOException ie) {
      LOG.fatal(t.getTaskID()+" reporting FSError", ie);
    }
  } catch (Throwable throwable) {
    LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
    Throwable causeThrowable = new Throwable(errorInfo, throwable);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    causeThrowable.printStackTrace(new PrintStream(baos));
    try {
      tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
    } catch (IOException e) {
      LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
    }
  } finally {
    // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
    // *false* since the task has either
    // a) SUCCEEDED - which means commit has been done
    // b) FAILED - which means we do not need to commit
    tip.reportTaskFinished(false);
  }
}
上面函数中启动JVM的代码如下:
/**
 * Hand the assembled JVM environment over to the JvmManager, then block
 * this TaskRunner thread until the child JVM has finished (the JvmManager
 * flips {@code done} and notifies {@code lock} when the process exits).
 */
void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
    File stderr, long logSize, File workDir)
    throws InterruptedException, IOException {
  // Ask the JvmManager to start (or reuse) a child JVM for this task.
  jvmManager.launchJvm(this,
      jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize,
          workDir, conf));
  synchronized (lock) {
    // Loop guards against spurious wakeups: wait until truly done.
    while (!done) {
      lock.wait();
    }
  }
}
下面我们看直接启动JVM的函数,其实中间还有很多判断过程,由于篇幅太长,这里就省略了
private void spawnNewJvm(JobID jobId, JvmEnv env,
                         TaskRunner t) {
  // Each child JVM gets a dedicated runner thread. The thread itself is
  // cheap compared with forking the child process, and because one JVM
  // may be reused for many tasks the per-thread cost amortizes away;
  // doing it this way also keeps the code simple.
  final JvmRunner runner = new JvmRunner(env, jobId, t.getTask());
  jvmIdToRunner.put(runner.jvmId, runner);
  runner.setDaemon(true);
  runner.setName("JVM Runner " + runner.jvmId + " spawned.");
  // Bind the task to its JVM id before the runner thread begins work.
  setRunningTaskForJvm(runner.jvmId, t);
  LOG.info(runner.getName());
  runner.start(); // actually spawn the JVM
}
在jvmRunner线程中会最终调用launchTask真正启动一个JVM进程,这个过程相对简单,过程如下:
public int launchTask(String user, String jobId, String attemptId, List<String> setup, List<String> jvmArguments, File currentWorkDirectory, String stdout, String stderr) throws IOException { ShellCommandExecutor shExec = null; try { FileSystem localFs = FileSystem.getLocal(getConf()); //创建任务工作目录:$hadoop.log.dir\taskTracker\hadoop\jobcache\job_201311071504_0001\attempt_201311071504_0001_m_000002_0\work new Localizer(localFs, getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)). initializeAttemptDirs(user, jobId, attemptId); // create the working-directory of the task if (!currentWorkDirectory.mkdir()) { throw new IOException("Mkdirs failed to create " + currentWorkDirectory.toString()); } //创建日志目录:$hadoop.log.dir\userlogs\job_201311071504_0001\attempt_201311071504_0001_m_000002_0 String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString(); //localFs.delete(new Path(logLocation)); //localFs.rename(new Path(logLocation), new Path(logLocation+".old")); if (!localFs.mkdirs(new Path(logLocation))) { throw new IOException("Mkdirs failed to create " + logLocation); } //read the configuration for the job FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw(); long logSize = 0; //TODO MAPREDUCE-1100 // get the JVM command line. 
String cmdLine = TaskLog.buildCommandLine(setup, jvmArguments, new File(stdout), new File(stderr), logSize, true); // write the command to a file in the // task specific cache directory // TODO copy to user dir //将JVM启动脚本写入文件,日志中会看到如下信息: //13/11/07 15:22:05 INFO mapred.TaskController: Writing commands to C:/hadoop/tasklog/ttprivate/taskTracker/hadoop/jobcache/job_201311071504_0001/attempt_201311071504_0001_m_000002_0/taskjvm.sh Path p = new Path(allocator.getLocalPathForWrite( TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId), getConf()), COMMAND_FILE); String commandFile = writeCommand(cmdLine, rawFs, p); rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION); //构建命令执行器,用于调用上面写入的脚本 shExec = new ShellCommandExecutor(new String[]{ "bash", "-c", commandFile.substring(1)}, currentWorkDirectory); shExec.execute();//开始执行脚本taskjvm.sh,通过ProcessBuilder来执行脚本,会开启错误监控线程 } catch (Exception e) { if (shExec == null) { return -1; } int exitCode = shExec.getExitCode(); LOG.warn("Exit code from task is : " + exitCode); LOG.info("Output from DefaultTaskController's launchTask follows:"); logOutput(shExec.getOutput()); return exitCode; } return 0; }