
JVM Reuse in Hadoop


1. First, let's trace how a task runs inside the Hadoop TaskTracker:

As we know, each job is split into multiple tasks (map tasks and reduce tasks). A TaskLauncher (there is one each for maps and reduces: mapLauncher and reduceLauncher) allocates slots for each task and starts it running (startNewTask), creating a corresponding TaskInProgress along the way. TaskInProgress is the entity that fully describes a task (its progress, configuration file, TaskRunner, and so on), and TaskRunner is the thread class that actually runs the task.

Let's look at the TaskLauncher class:

  class TaskLauncher extends Thread {
    private IntWritable numFreeSlots;
    private final int maxSlots;
    private List<TaskInProgress> tasksToLaunch;

So TaskLauncher is a thread class with three fields: the number of slots currently free (numFreeSlots), the maximum number of slots this TaskLauncher manages (maxSlots), and the list of tasks waiting to be launched (tasksToLaunch).

Now look at its run() logic:

    public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          Task task;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            tip = tasksToLaunch.remove(0);
            task = tip.getTask();
            LOG.info("Trying to launch : " + tip.getTask().getTaskID() +
                     " which needs " + task.getNumSlotsRequired() + " slots");
          }
          synchronized (numFreeSlots) {
            boolean canLaunch = true;
            while (numFreeSlots.get() < task.getNumSlotsRequired()) {
              if (!tip.canBeLaunched()) {
                LOG.info("Not blocking slots for " + task.getTaskID()
                    + " as it got killed externally. Task's state is "
                    + tip.getRunState());
                canLaunch = false;
                break;
              }
              LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() +
                       " to launch " + task.getTaskID() + ", currently we have " +
                       numFreeSlots.get() + " free slots");
              numFreeSlots.wait();
            }
            if (!canLaunch) {
              continue;
            }
            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get() +
                     " and trying to launch " + tip.getTask().getTaskID() +
                     " which needs " + task.getNumSlotsRequired() + " slots");
            numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
            assert (numFreeSlots.get() >= 0);
          }
          synchronized (tip) {
            if (!tip.canBeLaunched()) {
              LOG.info("Not launching task " + task.getTaskID() + " as it got"
                + " killed externally. Task's state is " + tip.getRunState());
              addFreeSlots(task.getNumSlotsRequired());
              continue;
            }
            tip.slotTaken = true;
          }
          startNewTask(tip);
        } catch (InterruptedException e) {
          return;
        } catch (Throwable th) {
          LOG.error("TaskLauncher error " +
              StringUtils.stringifyException(th));
        }
      }
    }
  }

From run() we can see that TaskLauncher takes the first task off the tasksToLaunch list (removing it), then compares the number of slots the task requires with the number currently free. If the task needs more slots than are free, it cannot run yet, and the launcher waits on numFreeSlots until enough slots are released (canLaunch only becomes false if the task is killed externally while waiting); once enough slots are free it claims them and runs the task (startNewTask(tip)).
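The slot accounting above is a classic wait/notify guard. Here is a minimal, self-contained sketch of the pattern (not the actual TaskTracker code; SlotGate and its method names are hypothetical):

  // A simplified model of TaskLauncher's slot accounting: acquire() plays the
  // role of the numFreeSlots.wait() loop, release() plays addFreeSlots().
  class SlotGate {
    private int freeSlots;

    SlotGate(int maxSlots) {
      this.freeSlots = maxSlots;
    }

    // Block until 'required' slots are free, then take them.
    synchronized void acquire(int required) throws InterruptedException {
      while (freeSlots < required) {
        wait();                    // parked until release() notifies
      }
      freeSlots -= required;
    }

    // Return slots and wake every waiting launcher so each can recheck.
    synchronized void release(int n) {
      freeSlots += n;
      notifyAll();
    }
  }

The real code additionally rechecks tip.canBeLaunched() inside the wait loop, so a task killed while queued does not block the launcher forever.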

Now, what does startNewTask do?

  void startNewTask(TaskInProgress tip) throws InterruptedException {
    try {
      RunningJob rjob = localizeJob(tip);
      tip.getTask().setJobFile(rjob.localizedJobConf.toString());
      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob);
    } catch (Throwable e) {
      String msg = ("Error initializing " + tip.getTask().getTaskID() +
                    ":\n" + StringUtils.stringifyException(e));
      LOG.warn(msg);
      tip.reportDiagnosticInfo(msg);
      try {
        tip.kill(true);
        tip.cleanup(true);
      } catch (IOException ie2) {
        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
      } catch (InterruptedException ie2) {
        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
      }
      if (e instanceof Error) {
        throw ((Error) e);
      }
    }
  }

We can see that this function first localizes the job for the task (localizeJob, i.e., copies the files the task needs onto this TaskTracker node), sets the task's job configuration file (setJobFile), and then runs the task (launchTaskForJob).

What does localizeJob do?

  RunningJob localizeJob(TaskInProgress tip)
  throws IOException, InterruptedException {
    Task t = tip.getTask();
    JobID jobId = t.getJobID();
    RunningJob rjob = addTaskToJob(jobId, tip);
    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
    try {
      synchronized (rjob) {
        if (!rjob.localized) {
          while (rjob.localizing) {
            rjob.wait();
          }
          if (!rjob.localized) {
            rjob.localizing = true;
          }
        }
      }
      if (!rjob.localized) {
        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
        JobConf localJobConf = new JobConf(localJobConfPath);
        localJobConf.setUser(t.getUser());
        resetNumTasksPerJvm(localJobConf);
        synchronized (rjob) {
          rjob.localizedJobConf = localJobConfPath;
          rjob.jobConf = localJobConf;
          rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
              localJobConf.getKeepFailedTaskFiles());
          rjob.localized = true;
        }
      }
    } finally {
      synchronized (rjob) {
        if (rjob.localizing) {
          rjob.localizing = false;
          rjob.notifyAll();
        }
      }
    }
    synchronized (runningJobs) {
      runningJobs.notify();
    }
    return rjob;
  }

We can see that it first adds the task to its job, then performs an important piece of logic: it checks whether this job's files have already been copied to the current TaskTracker node (i.e., whether localized is true). If not, exactly one task performs the copy (and calls resetNumTasksPerJvm along the way), marking localized as true once the copy completes, while any concurrent tasks of the same job wait on the localizing flag.
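The localized/localizing pair is a "first caller does the work, later callers wait" handshake. A minimal sketch of it, assuming a simplified per-job state holder (JobLocalState is a hypothetical name):

  // Mirrors localizeJob()'s synchronization: one task copies the job's files;
  // concurrent tasks of the same job block until it finishes (or fails,
  // letting the next task retry).
  class JobLocalState {
    private boolean localized = false;
    private boolean localizing = false;

    // Returns true if the caller must perform the localization itself.
    synchronized boolean beginLocalize() throws InterruptedException {
      while (localizing) {
        wait();               // another task of this job is copying files
      }
      if (localized) {
        return false;         // already done; just use the local copy
      }
      localizing = true;      // this caller takes responsibility
      return true;
    }

    // Called from a finally block, as localizeJob() does: on failure,
    // localized stays false so the next task retries the copy.
    synchronized void endLocalize(boolean success) {
      localized = success;
      localizing = false;
      notifyAll();
    }
  }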

And what does resetNumTasksPerJvm do?

  static void resetNumTasksPerJvm(JobConf localJobConf) {
    boolean debugEnabled = false;
    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
      return;
    }
    if (localJobConf.getMapDebugScript() != null ||
        localJobConf.getReduceDebugScript() != null) {
      debugEnabled = true;
    }
    String keepPattern = localJobConf.getKeepTaskFilesPattern();
    if (debugEnabled || localJobConf.getProfileEnabled() ||
        keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
      localJobConf.setNumTasksToExecutePerJvm(1);
    }
  }

This logic first checks whether mapred.job.reuse.jvm.num.tasks is 1 (no JVM reuse, the default); if so it returns without doing anything. Otherwise it checks whether any of mapred.map.task.debug.script, mapred.reduce.task.debug.script, keep.task.files.pattern, mapred.task.profile, or keep.failed.task.files is set. If any one of them is, JVM reuse is disabled: mapred.job.reuse.jvm.num.tasks is forced back to 1, so every task starts a fresh JVM.
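In other words, the client's reuse setting can be silently overridden during job localization. A hedged usage sketch (JvmReuseConfig is a placeholder class; the JobConf setters shown are the standard Hadoop 1.x ones):

  import org.apache.hadoop.mapred.JobConf;

  public class JvmReuseConfig {
    public static JobConf configure() {
      JobConf conf = new JobConf();
      // Ask for unlimited JVM reuse (mapred.job.reuse.jvm.num.tasks = -1).
      conf.setNumTasksToExecutePerJvm(-1);

      // Any one of the following makes resetNumTasksPerJvm() force the value
      // back to 1 (a fresh JVM per task) when the job is localized:
      conf.setProfileEnabled(true);                      // mapred.task.profile
      // conf.setMapDebugScript("./debug.sh");           // mapred.map.task.debug.script
      // conf.setReduceDebugScript("./debug.sh");        // mapred.reduce.task.debug.script
      // conf.setKeepTaskFilesPattern(".*_m_000001_.*"); // keep.task.files.pattern
      // conf.setKeepFailedTaskFiles(true);              // keep.failed.task.files
      return conf;
    }
  }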

Let's go over what those parameters mean.

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   */
  public String getKeepTaskFilesPattern() {
    return get("keep.task.files.pattern");
  }

  /**
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean("keep.failed.task.files", false);
  }

In the Task class, runJobCleanupTask looks like this:

  protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
                                   TaskReporter reporter
                                  ) throws IOException, InterruptedException {
    ......
    JobConf conf = new JobConf(jobContext.getConfiguration());
    if (!supportIsolationRunner(conf)) {
      String jobTempDir = conf.get("mapreduce.job.dir");
      Path jobTempDirPath = new Path(jobTempDir);
      FileSystem fs = jobTempDirPath.getFileSystem(conf);
      fs.delete(jobTempDirPath, true);
    }
    ......
  }

From this code we can see that keep.task.files.pattern and keep.failed.task.files control whether the job's temporary files are kept (rather than deleted in runJobCleanupTask). Since disk space is limited, neither is enabled by default.
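For completeness, a hedged sketch of turning these knobs on from the client (KeepFilesConfig is a placeholder class; the pattern value is illustrative):

  import org.apache.hadoop.mapred.JobConf;

  public class KeepFilesConfig {
    public static void apply(JobConf conf) {
      // Keep the intermediate files of failed tasks for post-mortem debugging.
      conf.setKeepFailedTaskFiles(true);               // keep.failed.task.files
      // Keep the files of any task whose name matches this regex, even on success.
      conf.setKeepTaskFilesPattern(".*_m_000123_.*");  // keep.task.files.pattern
    }
  }

Note that, per resetNumTasksPerJvm() above, enabling either knob also disables JVM reuse for the job.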

  /**
   * Get whether the task profiling is enabled.
   */
  public boolean getProfileEnabled() {
    return getBoolean("mapred.task.profile", false);
  }

 

In the JobClient class:

  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
  ) throws IOException, InterruptedException {
    ......
    boolean profiling = conf.getProfileEnabled();
    ......
    while (!job.isComplete()) {
      ......
      for (TaskCompletionEvent event : events) {
        TaskCompletionEvent.Status status = event.getTaskStatus();
        if (profiling && shouldDownloadProfile(conf) &&
            (status == TaskCompletionEvent.Status.SUCCEEDED ||
                status == TaskCompletionEvent.Status.FAILED) &&
                (event.isMap ? mapRanges : reduceRanges).
                isIncluded(event.idWithinJob())) {
          downloadProfile(event);
        }
        ......
  }

  private static void downloadProfile(TaskCompletionEvent e
                                      ) throws IOException {
    URLConnection connection =
      new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
              "&filter=profile").openConnection();
    InputStream in = connection.getInputStream();
    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
    IOUtils.copyBytes(in, out, 64 * 1024, true);
  }

From this code we can see that mapred.task.profile is the flag that makes the client download profiling output (downloadProfile(event)) for tasks that have completed (succeeded or failed), so the job's runtime behavior can be analyzed. It defaults to false.
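A hedged sketch of enabling profiling from the client (ProfilingConfig is a placeholder class; the hprof agent string is the conventional default, and the task range is illustrative):

  import org.apache.hadoop.mapred.JobConf;

  public class ProfilingConfig {
    public static void apply(JobConf conf) {
      conf.setProfileEnabled(true);             // mapred.task.profile = true
      conf.setProfileTaskRange(true, "0-2");    // profile map tasks 0..2 only
      // JVM agent options; %s is replaced with the profile output file.
      conf.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,depth=6," +
                            "force=n,thread=y,verbose=n,file=%s");
    }
  }

With this in place, monitorAndPrintJob() above downloads a <task-attempt-id>.profile file for each profiled task into the client's working directory.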

  /**
   * Get the map task's debug script.
   */
  public String getMapDebugScript() {
    return get("mapred.map.task.debug.script");
  }

  /**
   * Get the reduce task's debug script.
   */
  public String getReduceDebugScript() {
    return get("mapred.reduce.task.debug.script");
  }

In the TaskInProgress class:

  public synchronized void setJobConf(JobConf lconf) {
    ......
    if (task.isMapTask()) {
      debugCommand = localJobConf.getMapDebugScript();
    } else {
      debugCommand = localJobConf.getReduceDebugScript();
    }
    ......
  }

  public void taskFinished() {
    ......
    boolean needCleanup = false;
    synchronized (this) {
      ......
      // the task is not done yet and is still running
      if (!done) {
        if (!wasKilled) {
          ......
          if (debugCommand != null) {
            String taskStdout = "";
            String taskStderr = "";
            String taskSyslog = "";
            String jobConf = task.getJobFile();
            try {
              Map<LogName, LogFileDetail> allFilesDetails = TaskLog
                  .getAllLogsFileDetails(task.getTaskID(), task
                      .isTaskCleanupTask());
              // get task's stdout file
              taskStdout =
                  TaskLog.getRealTaskLogFilePath(
                      allFilesDetails.get(LogName.STDOUT).location,
                      LogName.STDOUT);
              // get task's stderr file
              taskStderr =
                  TaskLog.getRealTaskLogFilePath(
                      allFilesDetails.get(LogName.STDERR).location,
                      LogName.STDERR);
              // get task's syslog file
              taskSyslog =
                  TaskLog.getRealTaskLogFilePath(
                      allFilesDetails.get(LogName.SYSLOG).location,
                      LogName.SYSLOG);
            } catch (IOException e) {
              LOG.warn("Exception finding task's stdout/err/syslog files");
            }
    ......
  }

From this code we can see that mapred.map.task.debug.script and mapred.reduce.task.debug.script name scripts used to collect a failed task's logs (stdout, stderr, syslog). Neither is set by default.
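A hedged sketch of installing such scripts (DebugScriptConfig is a placeholder class; debug.sh is a placeholder script that must already be available on the task nodes, e.g. shipped via the DistributedCache):

  import org.apache.hadoop.mapred.JobConf;

  public class DebugScriptConfig {
    public static void apply(JobConf conf) {
      // The TaskTracker runs the script with the failed task's stdout,
      // stderr, syslog and jobconf paths (collected in taskFinished() above).
      conf.setMapDebugScript("./debug.sh");     // mapred.map.task.debug.script
      conf.setReduceDebugScript("./debug.sh");  // mapred.reduce.task.debug.script
    }
  }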

Good. Now let's return to launchTaskForJob and keep following the task.

  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
                                RunningJob rjob) throws IOException {
    synchronized (tip) {
      jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
                  localStorage.getDirsString());
      tip.setJobConf(jobConf);
      tip.setUGI(rjob.ugi);
      tip.launchTask(rjob);
    }
  }

There is nothing remarkable in this snippet: it applies a few settings to the task (calling it the TaskInProgress is perhaps more accurate) and then calls launchTask. So what does launchTask do?

  public synchronized void launchTask(RunningJob rjob) throws IOException {
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
        this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
        this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
      localizeTask(task);
      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
      }
      setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
      this.runner.start();
      long now = System.currentTimeMillis();
      this.taskStatus.setStartTime(now);
      this.lastProgressReport = now;
    } else {
      LOG.info("Not launching task: " + task.getTaskID() +
          " since it's state is " + this.taskStatus.getRunState());
    }
  }

We can see it first performs localizeTask, which sets task-level properties such as mapred.tip.id and mapred.task.id, then creates the appropriate TaskRunner thread on this TaskTracker node (createRunner: a MapTaskRunner for a map task, a ReduceTaskRunner for a reduce task) and starts it (runner.start()). So what does the TaskRunner thread's run() method do?

  public final void run() {
    String errorInfo = "Child Error";
    try {
      TaskAttemptID taskid = t.getTaskID();
      final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
      final File workDir =
      new File(new Path(localdirs[rand.nextInt(localdirs.length)],
          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
          taskid.toString(),
          t.isTaskCleanupTask())).toString());
      if (!prepare()) {
        return;
      }
      List<String> classPaths = getClassPaths(conf, workDir,
                                              taskDistributedCacheManager);
      long logSize = TaskLog.getTaskLogLength(conf);
      Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
      String setup = getVMSetupCmd();
      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
      File stdout = logFiles[0];
      File stderr = logFiles[1];
      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
                 stderr);
      Map<String, String> env = new HashMap<String, String>();
      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env, taskid,
                                   logSize);
      List<String> setupCmds = new ArrayList<String>();
      appendEnvExports(setupCmds, env);
      setupCmds.add(setup);
      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
      if (exitCodeSet) {
        if (!killed && exitCode != 0) {
          if (exitCode == 65) {
            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
          }
          throw new IOException("Task process exit with nonzero status of " +
              exitCode + ".");
        }
      }
    } catch (FSError e) {
      LOG.fatal("FSError", e);
      try {
        tracker.internalFsError(t.getTaskID(), e.getMessage());
      } catch (IOException ie) {
        LOG.fatal(t.getTaskID() + " reporting FSError", ie);
      }
    } catch (Throwable throwable) {
      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
      Throwable causeThrowable = new Throwable(errorInfo, throwable);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      causeThrowable.printStackTrace(new PrintStream(baos));
      try {
        tracker.internalReportDiagnosticInfo(t.getTaskID(), baos.toString());
      } catch (IOException e) {
        LOG.warn(t.getTaskID() + " Reporting Diagnostics", e);
      }
    } finally {
      tip.reportTaskFinished(false);
    }
  }

From this code we can see that this stage mainly sets up the task's parameters (working directory workDir, classpath classPaths, JVM arguments vargs, and so on). Those are not what we care about here; once they are ready, the call we do care about arrives: launchJvmAndWait, which starts a JVM (a child process) to run the task. We will not analyze the task's actual execution here, since it has little bearing on this article's topic.

  void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
      File stderr, long logSize, File workDir)
      throws InterruptedException, IOException {
    jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
        stderr, logSize, workDir, conf));
    synchronized (lock) {
      while (!done) {
        lock.wait();
      }
    }
  }

This method is simple: it launches the child process (launchJvm) and waits for it to finish. launchJvm is a method of the JvmManager class.

  public void launchJvm(TaskRunner t, JvmEnv env
                        ) throws IOException, InterruptedException {
    if (t.getTask().isMapTask()) {
      mapJvmManager.reapJvm(t, env);
    } else {
      reduceJvmManager.reapJvm(t, env);
    }
  }

From this code we can see that launchJvm simply dispatches on the task type (map task or reduce task) to the corresponding JVM manager. Taking the map-task case as an example, let's look at the logic of reapJvm, a method of the JvmManagerForType class.

  private synchronized void reapJvm(
      TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
    if (t.getTaskInProgress().wasKilled()) {
      return;
    }
    boolean spawnNewJvm = false;
    JobID jobId = t.getTask().getJobID();
    int numJvmsSpawned = jvmIdToRunner.size();
    JvmRunner runnerToKill = null;
    if (numJvmsSpawned >= maxJvms) {
      Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
        jvmIdToRunner.entrySet().iterator();
      while (jvmIter.hasNext()) {
        JvmRunner jvmRunner = jvmIter.next().getValue();
        JobID jId = jvmRunner.jvmId.getJobId();
        if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()) {
          setRunningTaskForJvm(jvmRunner.jvmId, t); // reserve the JVM
          LOG.info("No new JVM spawned for jobId/taskid: " +
                   jobId + "/" + t.getTask().getTaskID() +
                   ". Attempting to reuse: " + jvmRunner.jvmId);
          return;
        }
        if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
            (!jId.equals(jobId) && !jvmRunner.isBusy())) {
          runnerToKill = jvmRunner;
          spawnNewJvm = true;
        }
      }
    } else {
      spawnNewJvm = true;
    }
    if (spawnNewJvm) {
      if (runnerToKill != null) {
        LOG.info("Killing JVM: " + runnerToKill.jvmId);
        killJvmRunner(runnerToKill);
      }
      spawnNewJvm(jobId, env, t);
      return;
    }
    LOG.fatal("Inconsistent state!!! " +
           "JVM Manager reached an unstable state " +
          "while reaping a JVM for task: " + t.getTask().getTaskID() +
          " " + getDetails() + ". Aborting. ");
    System.exit(-1);
  }

In this method, the first check is whether the task has already been killed (wasKilled), and the second is whether there is still room to start a new JVM (jvmIdToRunner.size() >= maxJvms). If a JVM slot is still free, spawnNewJvm is set to true (a new child JVM will be started). Otherwise the method iterates over jvmIdToRunner and either:

A. finds an idle JVM belonging to the same job that has not yet exhausted its reuse quota (checked via jvmRunner.ranAll()); in that case the task is assigned directly to that JVM (setRunningTaskForJvm) and the method returns; or

B. finds a JVM that is either idle and belongs to a different job, or belongs to the same job but has already run its full quota of tasks; that JVM is marked for killing (runnerToKill = jvmRunner) and spawnNewJvm is set to true.

Finally, if spawnNewJvm is true, the selected runnerToKill (if any) is killed and a new JVM child process is started (spawnNewJvm). A condensed sketch of this decision order follows.
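As a condensed restatement of that decision order (a sketch with hypothetical names, not the real JvmManagerForType code):

  // Models reapJvm()'s choice for an incoming task of job J.
  public class ReapJvmSketch {
    enum Decision { SPAWN_NEW, REUSE_IDLE, KILL_THEN_SPAWN }

    // One candidate JVM as seen by the scan; the fields mirror
    // jvmRunner.jvmId.getJobId().equals(jobId), isBusy() and ranAll().
    static final class JvmView {
      final boolean sameJob, busy, ranAll;
      JvmView(boolean sameJob, boolean busy, boolean ranAll) {
        this.sameJob = sameJob; this.busy = busy; this.ranAll = ranAll;
      }
    }

    static Decision decide(boolean freeJvmSlot, Iterable<JvmView> jvms) {
      if (freeJvmSlot) {
        return Decision.SPAWN_NEW;         // room left: just start a new JVM
      }
      boolean foundVictim = false;
      for (JvmView j : jvms) {
        if (j.sameJob && !j.busy && !j.ranAll) {
          return Decision.REUSE_IDLE;      // the JVM-reuse fast path (case A)
        }
        if ((j.sameJob && j.ranAll) || (!j.sameJob && !j.busy)) {
          foundVictim = true;              // a recyclable JVM to kill first (case B)
        }
      }
      if (foundVictim) {
        return Decision.KILL_THEN_SPAWN;
      }
      // cf. the LOG.fatal + System.exit(-1) branch above
      throw new IllegalStateException("no JVM available");
    }
  }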

Now let's see what happens inside spawnNewJvm.

  private void spawnNewJvm(JobID jobId, JvmEnv env,
      TaskRunner t) {
    JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
    jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
    jvmRunner.setDaemon(true);
    jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
    setRunningTaskForJvm(jvmRunner.jvmId, t);
    LOG.info(jvmRunner.getName());
    jvmRunner.start();
  }

From this code we can see that the function creates a new JvmRunner thread, sets a few of its attributes, and registers it (by its jvmId) in jvmIdToRunner; setRunningTaskForJvm additionally records it in jvmToRunningTask and runningTaskToJvm and marks it busy (setBusy(true)). At the end of spawnNewJvm the JVM process is started via start(), and at that point a task is truly running.

Finally, let's see what killJvmRunner does.

  private synchronized void killJvmRunner(JvmRunner jvmRunner
                                          ) throws IOException,
                                                   InterruptedException {
    jvmRunner.kill();
    removeJvm(jvmRunner.jvmId);
  }

  synchronized private void removeJvm(JVMId jvmId) {
    jvmIdToRunner.remove(jvmId);
    jvmIdToPid.remove(jvmId);
  }

From the code we can see that it first stops the JvmRunner thread (jvmRunner.kill()) and then removes its JVMId from the bookkeeping structures via removeJvm.

2. Now let's look at the impact of JVM reuse:

We know that mapred.job.reuse.jvm.num.tasks defaults to 1: every task starts a fresh JVM. A value of -1 means a JVM can be reused an unlimited number of times. Combining this with the analysis above, with the value at -1 the killJvmRunner and spawnNewJvm steps are what get skipped relative to a value of 1. Even at -1, the TaskTracker still first checks whether the node has a free JVM slot; only when no slot is free does it check whether a JVM in an occupied slot has finished its current task, and only then is that JVM reused (and reuse only happens between tasks of the same job). So for a job with a very large number of tasks (especially short-lived ones), the frequent JVM start/stop cycles are a large overhead, and JVM reuse also lets tasks of the same job share static data, which can improve cluster performance dramatically. Reuse does increase fragmentation inside the long-lived JVM, degrading its performance somewhat, but the effect is minor. On balance, JVM reuse is worth enabling, even though it brings almost no gain for jobs with few, long-running tasks.

One point deserves attention: mapred.job.reuse.jvm.num.tasks is a client-side parameter (it can also be declared final on the TaskTracker side to force it there). With JVM reuse enabled, a static variable used inside a map (or reduce) function may not behave as intended, because when the variable is used again it still holds the value from the end of the previous task's run in the same JVM. So when enabling this parameter, treat static variables in map/reduce code with care, and decide explicitly whether they must be re-initialized or should deliberately keep their previous value (to share data). The sketch below illustrates both sides of this.
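Here is a hedged sketch of both the benefit and the pitfall, using the old (mapred) API; LookupMapper and loadDictionary are hypothetical:

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class LookupMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, LongWritable> {

    // Shared by every task that runs in the same reused child JVM: loaded
    // lazily once per JVM -- the intended "static data sharing" benefit.
    private static Map<String, String> dictionary;

    // Pitfall: under JVM reuse this is NOT zero when a new task starts;
    // it still holds the value left by the previous task in this JVM.
    private static long recordsSeen = 0;

    @Override
    public void configure(JobConf job) {
      synchronized (LookupMapper.class) {
        if (dictionary == null) {
          dictionary = loadDictionary(job);  // expensive one-time setup
        }
        recordsSeen = 0;  // reset explicitly if per-task counting is intended
      }
    }

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
      recordsSeen++;
      String hit = dictionary.get(value.toString());
      if (hit != null) {
        output.collect(new Text(hit), new LongWritable(recordsSeen));
      }
    }

    // Placeholder for loading reference data (e.g. from the DistributedCache).
    private static Map<String, String> loadDictionary(JobConf job) {
      return new HashMap<String, String>();
    }
  }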

Appendix: the relevant passage from Hadoop: The Definitive Guide:

Task JVM Reuse

Hadoop runs tasks in their own Java Virtual Machine to isolate them from other running tasks. The overhead of starting a new JVM for each task can take around a second, which for jobs that run for a minute or so is insignificant. However, jobs that have a large
number of very short-lived tasks (these are usually map tasks), or that have lengthy initialization, can see performance gains when the JVM is reused for subsequent tasks.

With task JVM reuse enabled, tasks do not run concurrently in a single JVM. The JVM runs tasks sequentially.

Tasks that are CPU-bound may also benefit from task JVM reuse by taking advantage of runtime optimizations applied by the HotSpot JVM. After running for a while, the HotSpot JVM builds up enough information to detect performance-critical sections in the code
and dynamically translates the Java byte codes of these hot spots into native machine code. This works well for long-running processes, but JVMs that run for seconds or a few minutes may not gain the full benefit of HotSpot. In these cases, it is worth enabling
task JVM reuse.

Another place where a shared JVM is useful is for sharing state between the tasks of a job. By storing reference data in a static field, tasks get rapid access to the shared data.

Appendix: test results (total job runtime before and after the configuration change, in h:mm:ss):

Job type     | Test type   | Runtime before change | Runtime after change | Total time reduction | Performance gain
wordcount    | typical     | 3:15:07               | 1:28:55              | 54.43%               | 119.44%
wordcount    | upper bound | 30:36:14              | 13:37:01             | 55.51%               | 130.66%
wordcount    | lower bound | 0:10:23               | 0:10:22              | 0.16%                | 0.16%
wordcount    | short       | 2:22:16               | 0:57:56              | 59.28%               | 145.57%
wordcount    | long        | 9:39:21               | 7:08:34              | 26.03%               | 35.18%
grep         | typical     | 3:16:46               | 1:23:37              | 57.50%               | 135.32%
grep         | lower bound | 0:10:34               | 0:10:30              | 0.63%                | 0.63%
grep         | short       | 2:16:24               | 1:06:10              | 51.49%               | 106.15%
grep         | long        | 7:20:02               | 5:13:37              | 28.73%               | 40.31%
sort         | typical     | 4:08:01               | 2:57:03              | 28.61%               | 40.08%
sort         | lower bound | 0:11:57               | 0:11:57              | 0.00%                | 0.00%
sort         | short       | 2:16:35               | 1:09:15              | 49.30%               | 97.23%
sort         | long        | 11:11:13              | 10:29:27             | 6.22%                | 6.64%
randomWriter | typical     | 4:31:19               | 4:27:13              | 1.51%                | 1.53%
randomWriter | lower bound | 0:05:58               | 0:06:02              | -1.12%               | -1.10%
randomWriter | short       | 2:00:46               | 1:55:50              | 4.09%                | 4.26%
randomWriter | long        | 13:28:24              | 13:33:03             | -0.58%               | -0.57%
mixed        | typical     | 3:27:31               | 2:42:51              | 21.52%               | 27.43%

Note A: the test matrix was:

1) wordcount: typical, upper-bound, lower-bound, short, and long runs;

2) grep: typical, lower-bound, short, and long runs;

3) sort: typical, lower-bound, short, and long runs;

4) randomWriter: typical, lower-bound, short, and long runs;

5) a mixed run of all four job types (wordcount, grep, sort, randomWriter) on the typical data.

Here "typical" means an average task runtime of 10 minutes; "lower bound" means the typical data run with 1 map and 1 reduce; "upper bound" means 2000 maps and 100 reduces; "short" means an average task runtime of 3 minutes; and "long" means an average task runtime of 30 minutes.

Note B: since the test data was limited and few job types were covered, these results do not represent a production Hadoop cluster; they only show that, under the right conditions, JVM reuse can improve cluster performance substantially.
