现在的位置: 首页 > 综合 > 正文

Hadoop学习总结之四:Map-Reduce的过程解析

2013年11月04日 ⁄ 综合 ⁄ 共 11413字 ⁄ 字号 评论关闭

一、客户端

Map-Reduce的过程首先是由客户端提交一个任务开始的。

提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:

public static RunningJob runJob(JobConf job) throws IOException {

  //首先生成一个JobClient对象

  JobClient jc = new JobClient(job);

  ……

  //调用submitJob来提交一个任务

  running = jc.submitJob(job);

  JobID jobId = running.getID();

  ……

  while (true) {

     //while循环中不断得到此任务的状态,并打印到客户端console中

  }

  return running;

}

其中JobClient的submitJob函数实现如下:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,

                                InvalidJobConfException, IOException {

  //从JobTracker得到当前任务的id

  JobID jobId = jobSubmitClient.getNewJobId();

  //准备将任务运行所需要的要素写入HDFS:

  //任务运行程序所在的jar封装成job.jar

  //任务所要处理的input split信息写入job.split

  //任务运行的配置项汇总写入job.xml

  Path submitJobDir = new Path(getSystemDir(), jobId.toString());

  Path submitJarFile = new Path(submitJobDir, "job.jar");

  Path submitSplitFile = new Path(submitJobDir, "job.split");

  //此处将-libjars命令行指定的jar上传至HDFS

  configureCommandLineOptions(job, submitJobDir, submitJarFile);

  Path submitJobFile = new Path(submitJobDir, "job.xml");

  ……

  //通过input format的格式获得相应的input split,默认类型为FileSplit

  InputSplit[] splits =

    job.getInputFormat().getSplits(job, job.getNumMapTasks());

 

  // 生成一个写入流,将input split得信息写入job.split文件

  FSDataOutputStream out = FileSystem.create(fs,

      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));

  try {

    //写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。

    //对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split在文件中的起始位置),split的location信息(即在那个DataNode上)。

    writeSplitsFile(splits, out);

  } finally {

    out.close();

  }

  job.set("mapred.job.split.file", submitSplitFile.toString());

  //根据split的个数设定map task的个数

  job.setNumMapTasks(splits.length);

  // 写入job的配置信息入job.xml文件      

  out = FileSystem.create(fs, submitJobFile,

      new FsPermission(JOB_FILE_PERMISSION));

  try {

    job.writeXml(out);

  } finally {

    out.close();

  }

  //真正的调用JobTracker来提交任务

  JobStatus status = jobSubmitClient.submitJob(jobId);

  ……

}

 

二、JobTracker

JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:

  • 调用静态函数startTracker(new JobConf())创建一个JobTracker对象
  • 调用JobTracker.offerService()函数提供服务

在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。

在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:

  • JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
  • EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化

EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。

在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:

synchronized (jobs) {

  synchronized (taskScheduler) {

    jobs.put(job.getProfile().getJobID(), job);

    //对JobTracker的每一个listener都调用jobAdded函数

    for (JobInProgressListener listener : jobInProgressListeners) {

      listener.jobAdded(job);

    }

  }

}

 

EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:

public synchronized void initTasks() throws IOException {

  ……

  //从HDFS中读取job.split文件从而生成input splits

  String jobFile = profile.getJobFile();

  Path sysDir = new Path(this.jobtracker.getSystemDir());

  FileSystem fs = sysDir.getFileSystem(conf);

  DataInputStream splitFile =

    fs.open(new Path(conf.get("mapred.job.split.file")));

  JobClient.RawSplit[] splits;

  try {

    splits = JobClient.readSplitFile(splitFile);

  } finally {

    splitFile.close();

  }

  //map task的个数就是input split的个数

  numMapTasks = splits.length;

  //为每个map tasks生成一个TaskInProgress来处理一个input split

  maps = new TaskInProgress[numMapTasks];

  for(int i=0; i < numMapTasks; ++i) {

    inputLength += splits[i].getDataLength();

    maps[i] = new TaskInProgress(jobId, jobFile,

                                 splits[i],

                                 jobtracker, conf, this, i);

  }

  //对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。

  if (numMapTasks > 0) { 
    nonRunningMapCache = createCache(splits, maxLevel);
  }

 

  //创建reduce task

  this.reduces = new TaskInProgress[numReduceTasks];

  for (int i = 0; i < numReduceTasks; i++) {

    reduces[i] = new TaskInProgress(jobId, jobFile,

                                    numMapTasks, i,

                                    jobtracker, conf, this);

    //reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。

    nonRunningReduces.add(reduces[i]);

  }

 

  //创建两个cleanup task,一个用来清理map,一个用来清理reduce.

  cleanup = new TaskInProgress[2];

  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],

          jobtracker, conf, this, numMapTasks);

  cleanup[0].setJobCleanupTask();

  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                     numReduceTasks, jobtracker, conf, this);

  cleanup[1].setJobCleanupTask();

  //创建两个初始化 task,一个初始化map,一个初始化reduce.

  setup = new TaskInProgress[2];

  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],

          jobtracker, conf, this, numMapTasks + 1 );

  setup[0].setJobSetupTask();

  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                     numReduceTasks + 1, jobtracker, conf, this);

  setup[1].setJobSetupTask();

  tasksInited.set(true);//初始化完毕

  ……

}

 

三、TaskTracker

TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:

State offerService() throws Exception {

  long lastHeartbeat = 0;

  //TaskTracker进行是一直存在的

  while (running && !shuttingDown) {

      ……

      long now = System.currentTimeMillis();

      //每隔一段时间就向JobTracker发送heartbeat

      long waitTime = heartbeatInterval - (now - lastHeartbeat);

      if (waitTime > 0) {

        synchronized(finishedCount) {

          if (finishedCount[0] == 0) {

            finishedCount.wait(waitTime);

          }

          finishedCount[0] = 0;

        }

      }

      ……

      //发送Heartbeat到JobTracker,得到response

      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

      ……

     //从Response中得到此TaskTracker需要做的事情

      TaskTrackerAction[] actions = heartbeatResponse.getActions();

      ……

      if (actions != null){

        for(TaskTrackerAction action: actions) {

          if (action instanceof LaunchTaskAction) {

            //如果是运行一个新的Task,则将Action添加到任务队列中

            addToTaskQueue((LaunchTaskAction)action);

          } else if (action instanceof CommitTaskAction) {

            CommitTaskAction commitAction = (CommitTaskAction)action;

            if (!commitResponses.contains(commitAction.getTaskID())) {

              commitResponses.add(commitAction.getTaskID());

            }

          } else {

            tasksToCleanup.put(action);

          }

        }

      }

  }

  return State.NORMAL;

}

其中transmitHeartBeat主要逻辑如下:

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {

  //每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息

  boolean sendCounters;

  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

    sendCounters = true;

    previousUpdate = now;

  }

  else {

    sendCounters = false;

  }

  ……

  //报告给JobTracker,此TaskTracker的当前状态

  if (status == null) {

    synchronized (this) {

      status = new TaskTrackerStatus(taskTrackerName, localHostname,

                                     httpPort,

                                     cloneAndResetRunningTaskStatuses(

                                       sendCounters),

                                     failures,

                                     maxCurrentMapTasks,

                                     maxCurrentReduceTasks);

    }

  }

  ……

  //当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:

  //当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数

  //当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数

  boolean askForNewTask;

  long localMinSpaceStart;

  synchronized (this) {

    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||

                     status.countReduceTasks() < maxCurrentReduceTasks) &&

                    acceptNewTasks;

    localMinSpaceStart = minSpaceStart;

  }

  ……

  //向JobTracker发送heartbeat,这是一个RPC调用

  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,

                                                            justStarted, askForNewTask,

                                                            heartbeatResponseId);

  ……

  return heartbeatResponse;

}

 

四、JobTracker

当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

                                                boolean initialContact, boolean acceptNewTasks, short responseId)

  throws IOException {

  ……

  String trackerName = status.getTrackerName();

  ……

  short newResponseId = (short)(responseId + 1);

  ……

  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);

  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

  //如果TaskTracker向JobTracker请求一个task运行

  if (acceptNewTasks) {

    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);

    if (taskTrackerStatus == null) {

      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);

    } else {

      //setup和cleanup的task优先级最高

      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);

      if (tasks == null ) {

        //任务调度器分配任务

        tasks = taskScheduler.assignTasks(taskTrackerStatus);

      }

      if (tasks != null) {

        for (Task task : tasks) {

          //将任务放入actions列表,返回给TaskTracker

          expireLaunchingTasks.addNewTask(task.getTaskID());

          actions.add(new LaunchTaskAction(task));

        }

      }

    }

  }

  ……

  int nextInterval = getNextHeartbeatInterval();

  response.setHeartbeatInterval(nextInterval);

  response.setActions(

                      actions.toArray(new TaskTrackerAction[actions.size()]));

  ……

  return response;

}

默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)

    throws IOException {

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();

  int numTaskTrackers = clusterStatus.getTaskTrackers();

  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();

  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();

  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();

  int numMaps = taskTracker.countMapTasks();

  int numReduces = taskTracker.countReduceTasks();

  //计算剩余的map和reduce的工作量:remaining

  int remainingReduceLoad = 0;

  int remainingMapLoad = 0;

  synchronized (jobQueue) {

    for (JobInProgress job : jobQueue) {

      if (job.getStatus().getRunState() == JobStatus.RUNNING) {

        int totalMapTasks = job.desiredMaps();

        int totalReduceTasks = job.desiredReduces();

        remainingMapLoad += (totalMapTasks - job.finishedMaps());

        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());

      }

    }

  }

  //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。

  int maxMapLoad = 0;

  int maxReduceLoad = 0;

  if (numTaskTrackers > 0) {

    maxMapLoad = Math.min(maxCurrentMapTasks,

                          (int) Math.ceil((double) remainingMapLoad /

                                          numTaskTrackers));

    maxReduceLoad = Math.min(maxCurrentReduceTasks,

                             (int) Math.ceil((double) remainingReduceLoad

                                             / numTaskTrackers));

  }

  ……

 

  //map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task

  if (numMaps < maxMapLoad) {

    int totalNeededMaps = 0;

    synchronized (jobQueue) {

      for (JobInProgress job : jobQueue) {

        if (job.getStatus().getRunState() != JobStatus.RUNNING) {

          continue;

        }

        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,

            taskTrackerManager.getNumberOfUniqueHosts());

        if (t != null) {

          return Collections.singletonList(t);

        }

        ……

      }

    }

  }

  //分配完map task,再分配reduce task

  if (numReduces < maxReduceLoad) {

    int totalNeededReduces = 0;

    synchronized (jobQueue) {

      for (JobInProgress job : jobQueue) {

        if (job.getStatus().getRunState() != JobStatus.RUNNING ||

            job.numReduceTasks == 0) {

          continue;

        }

抱歉!评论已关闭.