
Hadoop Source Code Analysis: MapReduce

September 3, 2013 ⁄ Cloud Computing

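Hadoop has two core components, HDFS and Map/Reduce. This post starts with MapReduce.

1: Sample program. WordCount is the typical example.

It uses the Mapper and Reducer classes available from 0.20.2 onward.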

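import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {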

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
   
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
     
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The WordCount example shows that Mapper, Reducer, and Job are the core base classes.
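Before reading the framework source, it may help to see the data flow that WordCount relies on. The following is a minimal in-memory sketch with no Hadoop dependency. `WordCountFlowSketch` and its `map`, `shuffle`, `reduce`, and `run` methods are hypothetical names made up for illustration; the real framework distributes these phases across tasks and may also run the combiner set in `main`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// In-memory stand-in for the WordCount data flow; none of these names are Hadoop APIs.
class WordCountFlowSketch {

  // Map phase: split one line into tokens and emit a (word, 1) pair per token.
  static List<Map.Entry<String, Integer>> map(String line) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      out.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
    }
    return out;
  }

  // Shuffle phase: group the emitted values by key, with keys in sorted order.
  static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> p : pairs) {
      grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
    }
    return grouped;
  }

  // Reduce phase: sum the values of one key, as IntSumReducer does.
  static int reduce(Iterable<Integer> values) {
    int sum = 0;
    for (int v : values) {
      sum += v;
    }
    return sum;
  }

  // Runs all three phases over the given lines and returns word -> count.
  static Map<String, Integer> run(List<String> lines) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String line : lines) {
      pairs.addAll(map(line));
    }
    Map<String, Integer> counts = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
      counts.put(e.getKey(), reduce(e.getValue()));
    }
    return counts;
  }

  public static void main(String[] args) {
    // prints {hadoop=1, hello=2, world=1}
    System.out.println(run(Arrays.asList("hello world", "hello hadoop")));
  }
}
```

The sketch keeps the three phases as separate methods so that each one can be compared with the corresponding piece of the Hadoop job.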

Start with the source of org.apache.hadoop.mapreduce.Mapper.java:

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{}

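It contains a nested abstract class Context and the methods setup(Context), cleanup(Context), and map(KEYIN, VALUEIN, Context). By default, map passes each input pair through unchanged: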

protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

The entry point that drives these methods is run(Context):

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
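The structure of run is a template method: the base class fixes the order of the calls and subclasses override only the hooks. A minimal sketch of the same shape, without any Hadoop dependency, might look as follows. `MiniMapper`, `MiniContext`, `nextValue`, and `UpperCaseMapper` are hypothetical names, and keys are left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for Mapper and its Context; not the Hadoop classes.
class MiniMapper<IN, OUT> {

  // Hypothetical context: iterates over in-memory input and collects output.
  static class MiniContext<IN, OUT> {
    private final Iterator<IN> input;
    private IN current;
    final List<OUT> output = new ArrayList<>();

    MiniContext(List<IN> records) {
      this.input = records.iterator();
    }

    // Advances to the next record; returns false when the input is exhausted.
    boolean nextValue() {
      if (!input.hasNext()) {
        return false;
      }
      current = input.next();
      return true;
    }

    IN getCurrentValue() {
      return current;
    }

    void write(OUT value) {
      output.add(value);
    }
  }

  protected void setup(MiniContext<IN, OUT> context) {}

  // Default behaviour mirrors the identity map: pass the record through.
  @SuppressWarnings("unchecked")
  protected void map(IN value, MiniContext<IN, OUT> context) {
    context.write((OUT) value);
  }

  protected void cleanup(MiniContext<IN, OUT> context) {}

  // Same shape as Mapper.run: setup once, map per record, cleanup once.
  public void run(MiniContext<IN, OUT> context) {
    setup(context);
    while (context.nextValue()) {
      map(context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

// A subclass overrides only the hooks it needs.
class UpperCaseMapper extends MiniMapper<String, String> {
  @Override
  protected void map(String value, MiniContext<String, String> context) {
    context.write(value.toUpperCase());
  }

  @Override
  protected void cleanup(MiniContext<String, String> context) {
    context.write("<done>");
  }
}
```

Running `new UpperCaseMapper().run(ctx)` over the records `"a"` and `"b"` leaves `A`, `B`, `<done>` in `ctx.output`, which shows that cleanup runs exactly once after the last record.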

Next, org.apache.hadoop.mapreduce.Reducer:

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}

Its core methods mirror those of Mapper, except that the per-key method is reduce rather than map.

Here is reduce, followed by run:

@SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

 public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       
      }
    }
    cleanup(context);
  }

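Judging by its run method, Reducer does a little more than Mapper: after each key it also resets the value iterator's backup store, if one is in use.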
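The per-key loop in run can be pictured with a small in-memory sketch. It assumes the input is a list of (key, value) pairs already sorted by key, and it calls reduce once for each run of equal keys. `MiniReducer` and `SumReducer` are hypothetical names, the output is simplified to a list of strings, and the backup store is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the Reducer.run loop; not the Hadoop classes.
class MiniReducer {

  // Default behaviour mirrors the identity reduce: pass every value through.
  protected void reduce(String key, Iterable<Integer> values, List<String> out) {
    for (int v : values) {
      out.add(key + "=" + v);
    }
  }

  // Walks key-sorted pairs and calls reduce once per run of equal keys.
  public List<String> run(List<Map.Entry<String, Integer>> sortedPairs) {
    List<String> out = new ArrayList<>();
    int i = 0;
    while (i < sortedPairs.size()) { // plays the role of context.nextKey()
      String key = sortedPairs.get(i).getKey();
      List<Integer> values = new ArrayList<>();
      while (i < sortedPairs.size() && sortedPairs.get(i).getKey().equals(key)) {
        values.add(sortedPairs.get(i).getValue());
        i++;
      }
      reduce(key, values, out);
    }
    return out;
  }
}

// Overrides reduce to sum the values of each key, like IntSumReducer.
class SumReducer extends MiniReducer {
  @Override
  protected void reduce(String key, Iterable<Integer> values, List<String> out) {
    int sum = 0;
    for (int v : values) {
      sum += v;
    }
    out.add(key + "=" + sum);
  }
}
```

For the sorted input (a, 1), (a, 2), (b, 5), `SumReducer` yields `a=3` and `b=5`, while the default `MiniReducer` yields `a=1`, `a=2`, `b=5`.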

Now the class org.apache.hadoop.mapreduce.Job:

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {}

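This is a heavyweight class, close to 1,400 lines of code.

First, look at the interface JobContext.java.

It extends MRJobConfig, which only defines configuration constants: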

public interface JobContext extends MRJobConfig {}

Among the methods the interface declares, the last two are:

getMaxMapAttempts()

getMaxReduceAttempts()

Another interface is Progressable.java:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Progressable {
  public void progress();
}

TaskAttemptContext.java extends both of them:

public interface TaskAttemptContext extends JobContext, Progressable {

  public TaskAttemptID getTaskAttemptID();
  public void setStatus(String msg);
  public String getStatus();
  public abstract float getProgress();
  public Counter getCounter(Enum<?> counterName);
  public Counter getCounter(String groupName, String counterName);

}

For Mapper there is MapContext.java, which reaches TaskAttemptContext through TaskInputOutputContext:

public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
  extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}

public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
       extends TaskAttemptContext {}

 

JobContextImpl.java is the direct implementation of JobContext:

public class JobContextImpl implements JobContext {}

Its fields are:

  protected final org.apache.hadoop.mapred.JobConf conf;
  private JobID jobId;
  protected UserGroupInformation ugi;
  protected final Credentials credentials;

Going further into the code of Job:

static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

Some of the constructors:

Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

getInstance has several overloads.

Two synchronized methods refresh and update the status:

synchronized void ensureFreshStatus()
      throws IOException, InterruptedException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }
   
  /** Some methods need to update status immediately. So, refresh
   * immediately
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }
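The pair above is an age-based cache: ensureFreshStatus refetches only when the cached status is older than MAX_JOBSTATUS_AGE, while updateStatus always refetches. A minimal sketch of that idea, with the clock and the fetch injected so that it can be run without a cluster, could look like this. `CachedStatus`, `fetcher`, `clock`, and `maxAgeMillis` are hypothetical names, and unlike the code above the sketch also refetches when nothing has been fetched yet.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Sketch of an age-based refresh; the names here are illustrative, not Hadoop APIs.
class CachedStatus<T> {
  private final Supplier<T> fetcher;  // stands in for the remote getJobStatus call
  private final LongSupplier clock;   // injectable so the behaviour can be tested
  private final long maxAgeMillis;    // plays the role of MAX_JOBSTATUS_AGE
  private T status;
  private long statusTime;

  CachedStatus(Supplier<T> fetcher, LongSupplier clock, long maxAgeMillis) {
    this.fetcher = fetcher;
    this.clock = clock;
    this.maxAgeMillis = maxAgeMillis;
  }

  // Refetches only if nothing is cached yet or the cached value is too old.
  synchronized T get() {
    if (status == null || clock.getAsLong() - statusTime > maxAgeMillis) {
      update();
    }
    return status;
  }

  // Always refetches, for callers that need an up-to-date value immediately.
  synchronized void update() {
    T fresh = fetcher.get();
    if (fresh == null) {
      throw new IllegalStateException("status not available");
    }
    status = fresh;
    statusTime = clock.getAsLong();
  }
}
```

In production code the clock would typically be `System::currentTimeMillis`; injecting it is a design choice made here only so the expiry logic can be exercised deterministically.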

The toString() method reports the job's current run state:

@Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

Getting the task reports:

 public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

Getting the progress:

 public float mapProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }
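Several of the methods above begin with ensureState(JobState.RUNNING). The body of ensureState is not shown in this post, so the following is only a sketch of what such a lifecycle guard could look like, assuming it fails fast with an IllegalStateException. `GuardedJob` is a hypothetical class; only the DEFINE and RUNNING state names are taken from the listings above.

```java
// Sketch of a state guard; the real ensureState body is not shown in this post.
class GuardedJob {
  enum JobState { DEFINE, RUNNING }

  private JobState state = JobState.DEFINE;
  private float mapProgress = 0.0f;

  // Fails fast when a method is called in the wrong lifecycle phase.
  private void ensureState(JobState expected) {
    if (state != expected) {
      throw new IllegalStateException(
          "Job in state " + state + " instead of " + expected);
    }
  }

  // Submission is only legal while the job is still being defined.
  void submit() {
    ensureState(JobState.DEFINE);
    state = JobState.RUNNING;
  }

  // Progress can only be queried after submission.
  float mapProgress() {
    ensureState(JobState.RUNNING);
    return mapProgress;
  }
}
```

With a guard like this, calling `mapProgress()` before `submit()` throws, and so does a second `submit()`, which keeps misuse visible at the call site.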

Now the method that starts execution:

 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
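The non-verbose branch of waitForCompletion is a plain submit-then-poll loop. The sketch below reproduces that shape with the completion and success checks passed in as functions, so it can run without a cluster. `PollingWaiter` and its parameters are hypothetical names. One deliberate difference: where the loop above swallows InterruptedException, the sketch restores the interrupt flag and returns false.

```java
import java.util.function.BooleanSupplier;

// Sketch of submit-then-poll; the names are illustrative, not Hadoop APIs.
class PollingWaiter {
  private boolean submitted = false;
  int polls = 0; // exposed only so the sketch can be inspected

  // Submits on first use, then sleeps between completion checks.
  boolean waitForCompletion(BooleanSupplier isComplete,
                            BooleanSupplier isSuccessful,
                            long pollIntervalMillis) {
    if (!submitted) {
      submitted = true; // stands in for submit()
    }
    while (!isComplete.getAsBoolean()) {
      polls++;
      try {
        Thread.sleep(pollIntervalMillis);
      } catch (InterruptedException ie) {
        // Preserve the interrupt for the caller and stop waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return isSuccessful.getAsBoolean();
  }
}
```

A call such as `new PollingWaiter().waitForCompletion(done, ok, 1000L)` blocks until `done` reports true and then returns whatever `ok` reports.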
More to come as the analysis continues.

 

 

 

 

 
