The Main-Class is HadoopStreaming, whose main function is:
public static void main(String[] args) throws Exception {
    int returnStatus = 0;
    StreamJob job = new StreamJob();
    returnStatus = ToolRunner.run(job, args);
    if (returnStatus != 0) {
        System.err.println("Streaming Job Failed!");
        System.exit(returnStatus);
    }
}
The core class is StreamJob, which implements the org.apache.hadoop.util.Tool interface. Its no-argument constructor does two things: it calls setupOptions() to register all of the options it supports, and it initializes config_.
The supported options are:
name | desc | argName | max | required
input | DFS input file(s) for the map step | path | Integer.MAX_VALUE | true
output | DFS output directory for the reduce step | path | 1 | true
mapper | the streaming command to run | cmd | 1 | false
combiner | the streaming command to run | cmd | 1 | false
reducer | the streaming command to run | cmd | 1 | false
file | file to be shipped in the job jar file | file | Integer.MAX_VALUE | false
dfs | optional, override DFS configuration | <h:p> or local | 1 | false
jt | optional, override JobTracker configuration | <h:p> or local | 1 | false
additionalconfspec | optional | spec | 1 | false
inputformat | optional | spec | 1 | false
outputformat | optional | spec | 1 | false
partitioner | optional | spec | 1 | false
numReduceTasks | optional | spec | 1 | false
inputreader | optional | spec | 1 | false
mapdebug | optional | spec | 1 | false
reducedebug | optional | spec | 1 | false
jobconf | optional, add or override a jobconf property | spec | 1 | false
cmdenv | pass an env. var to the streaming commands | spec | 1 | false
cacheFile | file name URI | fileNameURI | Integer.MAX_VALUE | false
cacheArchive | file name URI | fileNameURI | Integer.MAX_VALUE | false
verbose | print verbose output | - | - | -
info | print verbose output | - | - | -
help | print this help message | - | - | -
debug | print debug output | - | - | -
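The max/required semantics in the table can be illustrated with a small validation sketch. This is a hypothetical helper for illustration only, not the actual StreamJob code (which registers the options via setupOptions() and a command-line parsing library):

```java
import java.util.*;

public class OptionCheck {
    // Hypothetical mirror of a few table rows: {max occurrences, required (1/0)}.
    static final Map<String, int[]> SPEC = new HashMap<>();
    static {
        SPEC.put("input",  new int[]{Integer.MAX_VALUE, 1}); // repeatable, required
        SPEC.put("output", new int[]{1, 1});                 // single, required
        SPEC.put("mapper", new int[]{1, 0});                 // single, optional
    }

    // Returns null if the given option occurrences satisfy the spec,
    // otherwise a short error message.
    static String validate(List<String> opts) {
        Map<String, Integer> counts = new HashMap<>();
        for (String o : opts) counts.merge(o, 1, Integer::sum);
        for (Map.Entry<String, int[]> e : SPEC.entrySet()) {
            int n = counts.getOrDefault(e.getKey(), 0);
            if (e.getValue()[1] == 1 && n == 0) return "missing required -" + e.getKey();
            if (n > e.getValue()[0]) return "too many -" + e.getKey();
        }
        return null; // all constraints satisfied
    }

    public static void main(String[] args) {
        // -input may repeat; -output is required.
        System.out.println(validate(Arrays.asList("input", "input", "output")));
        System.out.println(validate(Arrays.asList("input")));
    }
}
```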
The GenericOptionsParser class parses the user-supplied arguments, and the properties of conf are set according to them.
StreamJob's run(String[] args) method submits the job for execution:
public int run(String[] args) throws Exception {
    try {
        this.argv_ = args; // the command-line arguments supplied by the user
        init();
        preProcessArgs();
        parseArgv();
        postProcessArgs();
        setJobConf();
        return submitAndMonitorJob();
    } catch (Exception e) {
        // on failure, report the error and return a non-zero status
        System.err.println(e.getMessage());
        return 1;
    }
}
init() collects the machine's environment-variable information, and parseArgv() sets a number of fields from the user-supplied arguments. The most important step is the setJobConf method, which configures the job:
jobConf_ = new JobConf(config_);
for (int i = 0; i < inputSpecs_.size(); i++) {
    FileInputFormat.addInputPaths(jobConf_, (String) inputSpecs_.get(i)); // set the input paths
}
jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
jobConf_.setInputFormat(fmt); // fmt defaults to TextInputFormat
jobConf_.setOutputKeyClass(Text.class);
jobConf_.setOutputValueClass(Text.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
jobConf_.setMapperClass(PipeMapper.class); // streaming performs the map task through PipeMapper
jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor", URLEncoder.encode(mapCmd_, "utf-8")); // PipeMapper launches a process from mapCmd_
jobConf_.setCombinerClass(c); // the combiner must be a Java class
jobConf_.setReducerClass(PipeReducer.class); // the reduce task is performed through PipeReducer
jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(redCmd_, "utf-8"));
FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
jobConf_.setOutputFormat(fmt); // fmt defaults to TextOutputFormat
jobConf_.setPartitionerClass(c); // the partitioner must be a Java class
jobConf_.setNumReduceTasks(numReduceTasks); // set the number of reduce tasks
// if cacheArchives and cacheFiles were specified:
DistributedCache.createSymLink(jobConf_);
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
DistributedCache.setCacheFiles(fileURIs, jobConf_);
The PipeMapRunner class drives PipeMapper to process the map task; PipeMapRunner's run method is:
PipeMapper pipeMapper = (PipeMapper) getMapper();
pipeMapper.startOutputThreads(output, reporter);
super.run(input, output, reporter);
getMapper is inherited by PipeMapRunner from the MapRunner class; it constructs the PipeMapper instance and calls the instance's configure method, which in turn calls the configure method of the parent class PipeMapRed (both PipeMapper and PipeReducer extend PipeMapRed). PipeMapRed's configure method initializes a number of fields and launches a process from the user-supplied map or reduce command. Its code is:
public void configure(JobConf job) {
    String argv = getPipeCommand(job); // the map command for PipeMapper, the reduce command for PipeReducer
    joinDelay_ = job.getLong("stream.joindelay.milli", 0);
    job_ = job;
    fs_ = FileSystem.get(job_);
    nonZeroExitIsFailure = job_.getBoolean("stream.non.zero.exit.is.failure", true);
    doPipe_ = getDoPipe();
    String[] argvSplit = splitArgs(argv);
    Environment childEnv = (Environment) StreamUtil.env().clone();
    addJobConfToEnvironment(job_, childEnv);
    addEnvironment(childEnv, job_.get("stream.addenvironment"));
    ProcessBuilder builder = new ProcessBuilder(argvSplit); // create a process for the user's map or reduce command
    builder.environment().putAll(childEnv.toMap());
    sim = builder.start(); // launch the process
    clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream())); // the process's standard input
    clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream())); // the process's standard output
    clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream())); // the process's standard error
}
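The stream wiring done in configure can be reproduced in isolation. The sketch below (a simplification, assuming a Unix `cat` command is available to stand in for the user's streaming command) spawns a child process and exchanges one record over its stdin/stdout, mirroring how PipeMapRed talks to the map command:

```java
import java.io.*;

public class PipeSketch {
    public static void main(String[] args) throws Exception {
        // Like PipeMapRed.configure(): build and start the child process.
        ProcessBuilder builder = new ProcessBuilder("cat"); // stands in for the user's map command
        Process sim = builder.start();

        // clientOut_: the child's standard input, from the parent's point of view.
        DataOutputStream clientOut = new DataOutputStream(
                new BufferedOutputStream(sim.getOutputStream()));
        // clientIn_: the child's standard output.
        BufferedReader clientIn = new BufferedReader(
                new InputStreamReader(sim.getInputStream()));

        clientOut.write("key1\tvalue1\n".getBytes("UTF-8"));
        clientOut.flush();
        clientOut.close(); // EOF tells the child no more input is coming

        System.out.println(clientIn.readLine()); // the child echoes the record back
        sim.waitFor();
    }
}
```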
The pipeMapper.startOutputThreads(output, reporter) call starts a thread that reads the standard output of the map command's process, processes it as needed, and emits the result as the map task's key/value output. MapRunner's run method reads the records of the input split and calls PipeMapper's map function once per record:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
        mapper.map(key, value, output, reporter);
    }
}
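The output thread started by startOutputThreads essentially reads the child's stdout line by line and splits each line on the first tab into a key and a value before collecting it. A simplified, hypothetical version of that split (the real thread also handles reporting and configurable field counts):

```java
import java.io.*;

public class SplitSketch {
    // Split one line of child output into (key, value) on the first tab,
    // as the streaming output thread does by default; a line with no tab
    // becomes (line, empty value).
    static String[] splitKeyValue(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) return new String[]{line, ""};
        return new String[]{line.substring(0, tab), line.substring(tab + 1)};
    }

    public static void main(String[] args) throws IOException {
        // Simulated child stdout: two lines, the second containing an extra tab.
        BufferedReader r = new BufferedReader(new StringReader("k1\tv1\nk2\tv2a\tv2b\n"));
        String line;
        while ((line = r.readLine()) != null) {
            String[] kv = splitKeyValue(line);
            System.out.println(kv[0] + " -> " + kv[1]);
        }
    }
}
```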
PipeMapper's map method then forwards each received key/value pair to the map command's process through its standard input, clientOut_.
Similarly, PipeReducer's configure method launches a process from the user-specified reduce command, and its reduce method forwards each received key/value pair to that process through its standard input, clientOut_.
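The framing used when handing a record to the child can be sketched as writing the key, a tab, the value, and a newline. This is a simplification of what PipeMapper.map and PipeReducer.reduce do via clientOut_ (the real code serializes the Writable key/value and respects separator-related configuration):

```java
import java.io.*;

public class WriteSketch {
    // Write one record in the streaming wire format: key <TAB> value <NEWLINE>.
    static void writeRecord(DataOutputStream out, String key, String value) throws IOException {
        out.write(key.getBytes("UTF-8"));
        out.write('\t');
        out.write(value.getBytes("UTF-8"));
        out.write('\n');
    }

    public static void main(String[] args) throws IOException {
        // Capture the bytes in memory instead of a real child process.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeRecord(out, "apple", "3");
        out.flush();
        System.out.print(buf.toString("UTF-8"));
    }
}
```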