Hadoop has two core components, HDFS and MapReduce; this post starts by analyzing MapReduce.
1: Example program. WordCount is the canonical example.
It uses the Mapper and Reducer from the new API (org.apache.hadoop.mapreduce, introduced after 0.20.2).
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Emit (token, 1) for every token in the input line.
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    // Sum all counts for a word and emit (word, total).
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
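As an aside, drivers are often written against Tool/ToolRunner instead of a bare main(), so that generic Hadoop options (-D, -files, and so on) are parsed uniformly. A minimal sketch under that idiom; the class name WordCountTool is mine, and it reuses the imports and nested classes from WordCount above:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver variant: ToolRunner injects the parsed Configuration via setConf().
public class WordCountTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "word count");
    job.setJarByClass(WordCountTool.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new WordCountTool(), args));
  }
}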
From this we can see that Mapper, Reducer, and Job are the core base classes and interfaces.
Looking at the source of org.apache.hadoop.mapreduce.Mapper.java:
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>{}
It nests a Context abstract class and defines setup(Context), cleanup(Context), and map(KEYIN, VALUEIN, Context). The default map() is an identity pass-through:
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
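setup() and cleanup() are called once per task, before and after the record loop, which makes them the natural home for per-task initialization. A minimal sketch, assuming a hypothetical stop-word filter (the class name and the "wordcount.stopwords" key are mine):

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: skip stop words supplied as a comma-separated config value.
public class StopWordMapper extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();
  private final Set<String> stopWords = new HashSet<String>();

  @Override
  protected void setup(Context context) {
    // Runs once per task, before any map() call.
    String conf = context.getConfiguration().get("wordcount.stopwords", "");
    for (String w : conf.split(",")) {
      stopWords.add(w);
    }
  }

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String token = itr.nextToken();
      if (!stopWords.contains(token)) {
        word.set(token);
        context.write(word, one);
      }
    }
  }
}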
The entry point that drives a map task is defined in run(Context):
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}
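Because run() owns the whole record loop, subclasses can override it for per-split behavior; the library class org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper does exactly this to feed several map threads. A minimal sketch of a custom run() that samples every 10th record (the class name and interval are mine; imports as in StopWordMapper above):

// Hypothetical example: only pass every 10th input record to map().
public class SamplingMapper extends Mapper<Object, Text, Text, IntWritable> {
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    long seen = 0;
    while (context.nextKeyValue()) {
      if (seen++ % 10 == 0) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    }
    cleanup(context);
  }
}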
Next, org.apache.hadoop.mapreduce.Reducer:
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
Its core methods and fields mirror Mapper's (a nested Context, setup, cleanup, run); the difference is the reduce method. The default reduce() is again an identity pass-through:
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                      ) throws IOException, InterruptedException {
  for (VALUEIN value : values) {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKey()) {
    reduce(context.getCurrentKey(), context.getValues(), context);
    // If a back up store is used, reset it
    Iterator<VALUEIN> iter = context.getValues().iterator();
    if (iter instanceof ReduceContext.ValueIterator) {
      ((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
    }
  }
  cleanup(context);
}
Judging from run(), Reducer does slightly more than Mapper: it iterates by key via nextKey() rather than by record, and after each reduce() call it resets the backup store that lets the value iterator be re-read.
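The values handed to reduce() are streamed and can normally be consumed only once (the framework's MarkableIterator exists for re-reading them). A simpler alternative when two passes are needed is to buffer the values yourself; a minimal sketch, assuming each key's values fit in memory (the class name is mine):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical example: two passes over the values -- first the mean,
// then the count of values above it. Buffers everything in memory.
public class AboveMeanReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    List<Integer> buffered = new ArrayList<Integer>();
    long sum = 0;
    for (IntWritable val : values) {      // pass 1: buffer and sum
      buffered.add(val.get());
      sum += val.get();
    }
    double mean = (double) sum / buffered.size();
    int above = 0;
    for (int v : buffered) {              // pass 2: compare against the mean
      if (v > mean) above++;
    }
    context.write(key, new IntWritable(above));
  }
}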
Now look at org.apache.hadoop.mapreduce.Job:
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {}
This is a heavyweight class, nearly 1,400 lines of code.
First look at the interface JobContext.java. It extends MRJobConfig, a class that purely defines configuration-key constants:
public interface JobContext extends MRJobConfig {}
The interface declares read-only getters over the job's configuration (its Configuration object, the JobID, the mapper/reducer and input/output format classes, and so on). The last two methods declared are
getMaxMapAttempts()
getMaxReduceAttempts()
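These two getters mirror the configuration keys mapreduce.map.maxattempts and mapreduce.reduce.maxattempts (4 by default in Hadoop 2.x). A quick sketch of setting and reading them on the client:

Configuration conf = new Configuration();
conf.setInt("mapreduce.map.maxattempts", 2);     // retry a failed map task at most twice

Job job = Job.getInstance(conf, "word count");   // Job implements JobContext
System.out.println(job.getMaxMapAttempts());     // 2
System.out.println(job.getMaxReduceAttempts());  // default, 4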
The other interface in play is Progressable.java:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Progressable {
  public void progress();
}
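progress() is how long-running user code tells the framework it is still alive, so the task is not killed by the mapreduce.task.timeout watchdog (600 seconds by default). A minimal sketch, assuming some hypothetical expensive per-record work:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: records whose fields each take minutes to process.
public class SlowRecordMapper extends Mapper<Object, Text, Text, IntWritable> {
  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String part : value.toString().split(",")) {
      process(part);        // hypothetical long-running work per field
      context.progress();   // heartbeat so the task is not killed as hung
    }
  }

  private void process(String part) {
    // placeholder for expensive work
  }
}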
TaskAttemptContext.java extends further, combining both:
public interface TaskAttemptContext extends JobContext, Progressable {
  public TaskAttemptID getTaskAttemptID();
  public void setStatus(String msg);
  public String getStatus();
  public abstract float getProgress();
  public Counter getCounter(Enum<?> counterName);
  public Counter getCounter(String groupName, String counterName);
}
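The getCounter methods are the entry point to user-defined counters, which the framework aggregates across all tasks and reports with the job. A small sketch (the class and enum names are mine):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: count empty input lines via a user-defined counter.
public class EmptyLineMapper extends Mapper<Object, Text, Text, IntWritable> {
  // Increments of this counter are summed across every map task.
  enum LineCounters { EMPTY_LINES }

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    if (value.toString().trim().isEmpty()) {
      context.getCounter(LineCounters.EMPTY_LINES).increment(1);
    }
  }
}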
On the Mapper side there is also MapContext.java, which chains up to TaskAttemptContext:
public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskAttemptContext {}
JobContextImpl.java is the direct implementing class of JobContext:
public class JobContextImpl implements JobContext {}
Its fields include:
protected final org.apache.hadoop.mapred.JobConf conf;
private JobID jobId;
protected UserGroupInformation ugi;
protected final Credentials credentials;
Digging further into the code of Job itself:
static {
  ConfigUtil.loadResources();
}
private JobState state = JobState.DEFINE;
private JobStatus status;
private long statustime;
private Cluster cluster;
Some of the constructors:
Job(JobConf conf) throws IOException {
  super(conf, null);
  // propagate existing user credentials to job
  this.credentials.mergeAll(this.ugi.getCredentials());
  this.cluster = null;
}

Job(JobStatus status, JobConf conf) throws IOException {
  this(conf);
  setJobID(status.getJobID());
  this.status = status;
  state = JobState.RUNNING;
}
These constructors are package-private; client code obtains a Job through one of the several getInstance overloads.
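For example, the usual modern entry point (replacing the deprecated new Job(conf, name) used in WordCount above):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");  // factory method; new Job(...) is deprecated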
Two synchronized methods refresh and update the cached status:
synchronized void ensureFreshStatus()
    throws IOException, InterruptedException {
  if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
    updateStatus();
  }
}

/** Some methods need to update status immediately. So, refresh
 * immediately
 * @throws IOException
 */
synchronized void updateStatus() throws IOException, InterruptedException {
  this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    @Override
    public JobStatus run() throws IOException, InterruptedException {
      return cluster.getClient().getJobStatus(status.getJobID());
    }
  });
  if (this.status == null) {
    throw new IOException("Job status not available ");
  }
  this.statustime = System.currentTimeMillis();
}
The toString() method renders the job's current running state:
@Override
public String toString() {
  ensureState(JobState.RUNNING);
  String reasonforFailure = " ";
  int numMaps = 0;
  int numReduces = 0;
  try {
    updateStatus();
    if (status.getState().equals(JobStatus.State.FAILED))
      reasonforFailure = getTaskFailureEventString();
    numMaps = getTaskReports(TaskType.MAP).length;
    numReduces = getTaskReports(TaskType.REDUCE).length;
  } catch (IOException e) {
  } catch (InterruptedException ie) {
  }
  StringBuffer sb = new StringBuffer();
  sb.append("Job: ").append(status.getJobID()).append("\n");
  sb.append("Job File: ").append(status.getJobFile()).append("\n");
  sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
  sb.append("\n");
  sb.append("Uber job : ").append(status.isUber()).append("\n");
  sb.append("Number of maps: ").append(numMaps).append("\n");
  sb.append("Number of reduces: ").append(numReduces).append("\n");
  sb.append("map() completion: ");
  sb.append(status.getMapProgress()).append("\n");
  sb.append("reduce() completion: ");
  sb.append(status.getReduceProgress()).append("\n");
  sb.append("Job state: ");
  sb.append(status.getState()).append("\n");
  sb.append("retired: ").append(status.isRetired()).append("\n");
  sb.append("reason for failure: ").append(reasonforFailure);
  return sb.toString();
}
Fetching task reports:
public TaskReport[] getTaskReports(TaskType type)
    throws IOException, InterruptedException {
  ensureState(JobState.RUNNING);
  final TaskType tmpType = type;
  return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
    public TaskReport[] run() throws IOException, InterruptedException {
      return cluster.getClient().getTaskReports(getJobID(), tmpType);
    }
  });
}
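A client can use these reports to inspect per-task progress; a minimal sketch, assuming a handle to a running job:

// Print the progress and state of every map task of a running job.
for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
  System.out.println(report.getProgress() + "\t" + report.getState());
}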
Fetching progress:
public float mapProgress() throws IOException, InterruptedException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getMapProgress();
}

/**
 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
 * and 1.0. When all reduce tasks have completed, the function returns 1.0.
 *
 * @return the progress of the job's reduce-tasks.
 * @throws IOException
 */
public float reduceProgress() throws IOException, InterruptedException {
  ensureState(JobState.RUNNING);
  ensureFreshStatus();
  return status.getReduceProgress();
}
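Together with isComplete(), these support a simple client-side monitoring loop; a sketch (the one-second poll interval is mine):

// Poll a submitted job once per second and print its progress.
while (!job.isComplete()) {
  System.out.printf("map %.0f%%  reduce %.0f%%%n",
      job.mapProgress() * 100, job.reduceProgress() * 100);
  Thread.sleep(1000);
}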
Finally, the method that starts execution, waitForCompletion():
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
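Note that the actual submission happens in submit(); waitForCompletion() only submits if the job is still in the DEFINE state, then blocks until completion. A client that does not want to block can call submit() directly; a sketch (doOtherClientWork is a hypothetical placeholder):

job.submit();               // asynchronous: returns once the job is handed to the cluster
doOtherClientWork();        // hypothetical: the client is free to do other work meanwhile
if (job.waitForCompletion(false)) {  // block only when the result is finally needed
  System.out.println("job succeeded");
}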
More to come; the analysis continues...