现在的位置: 首页 > 综合 > 正文

hadoop-*-streaming.jar源码浅析

2012年07月13日 ⁄ 综合 ⁄ 共 4966字 ⁄ 字号 评论关闭

Main-Class为HadoopStreaming,主函数:

public static void main(String[] args) throws Exception {
    int returnStatus = 0;
    StreamJob job = new StreamJob();      
    returnStatus = ToolRunner.run(job, args);
    if (returnStatus != 0) {
      System.err.println("Streaming Job Failed!");
      System.exit(returnStatus);
    }
  }


核心类为StreamJob,StreamJob实现了org.apache.hadoop.util.Tool接口,其不带参数的构造函数做了两件事,调用setupOptions()初始化其支持的所有选项,以及初始化config_。

其支持的选项有:

name desc argName max required
input DFS input file(s) for the map step path Integer.MAX_VALUE true
output DFS output directory for the reduce step path 1 true
mapper the streaming command to run cmd 1 false
combiner the streaming command to run cmd 1 false
reducer the streaming command to run cmd 1 false
file file to be shipped in the job jar file file Integer.MAX_VALUE false
dfs Optional,override DFS configuration <h:p>|local 1 false
jt Optional,override jobtracker configuration <h:p>|local 1 false
additionalconfspec Optional spec 1 false
inputformat Optional spec 1 false
outputformat Optional spec 1 false
partitioner Optional spec 1 false
numReduceTasks Optional spec 1 false
inputreader Optional spec 1 false
mapdebug Optional spec 1 false
reducedebug     Optional spec 1 false
jobconf Optional,add or override a jobconf property spec 1 false
cmdenv Pass env.var to streaming commands spec 1 false
cacheFile file name uri fileNameURI Integer.MAX_VALUE false
cacheArchive file name uri fileNameURL Integer.MAX_VALUE false
verbose print verbose output      
info print verbose output      
help print this help message      
debug print debug output      

 使用GenericOptionsParser类解析用户指定的参数,并根据用户指定的参数设置conf的属性。

StreamJob类的run(String[] args)方法负责提交job执行:

public int run(String[] args) throws Exception{

     try{

          this.argv_=args;   //用户指定的命令行参数

          init();

          preProcessArgs();

          parseArgv();

          postProcessArgs();

          setJobConf();

          return submitAndMonitorJob();

     }catch(Exception e){

     }

}

init()获取机器的环境变量信息,parseArgv()根据用户指定的参数设置一些属性,最重要的是setJobConf方法,设置job的信息:

jobConf_=new JobConf(config_);

for(int i=0;i<inputSpecs_.size();i++){

    FileInputFormat.addInputPaths(jobConf_,(String)inputSpecs_.get(i));     //设置输入路径

}

jobConf_.set("stream.numinputspecs",""+inputSpecs_.size());

jobConf_.setInputFormat(fmt);   //默认情况下fmt为TextInputFormat

jobConf_.setOutputKeyClass(Text.class);

jobConf_.setOutputValueClass(Text.class);

jobConf_.set("stream.addenvironment",addTaskEnvironment_);

jobConf_.setMapperClass(PipeMapper.class);      //可以看出stream是通过PipeMapper类来完成map任务的

jobConf_.setMapRunnerClass(PipeMapRunner.class);

jobConf_.set("stream.map.streamprocessor",URLEncoder.encode(mapCmd_,"utf-8")); //PipeMapper类根据mapCmd_启动进程

jobConf_.setCombinerClass(c);   //combiner需要是一个java类

jobConf_.setReducerClass(PipeReducer.class);    //通过PipeReducer类来完成reduce任务

jobConf_.set("stream.reduce.streamprocessor",URLEncoder.encode(redCmd_,"utf-8"));

FileOutputFormat.setOutputPath(jobConf_,new Path(output_));

jobConf_.setOutputFormat(fmt);  //默认情况下fmt为TextOutputFormat

jobConf_.setPartitionerClass(c);   //partitioner类需要是一个java类

jobConf_.setNumReduceTasks(numReduceTasks);   //设置reduce task的任务数

//如果指定了cacheArchives和cacheFiles

DistributedCache.createSymLink(jobConf_);

DistributedCache.setCacheArchives(archiveURIs,jobConf_);

DistributedCache.setCacheFiles(fileURIs,jobConf_);

PipeMapRunner类调用PipeMapper类来处理map任务,PipeMapRunner的run方法如下:

PipeMapper pipeMapper=(PipeMapper)getMapper();

pipeMapper.startOutputThreads(output,reporter);

super.run(input,output,reporter);

getMapper是PipeMapRunner从MapRunner类继承过来的,用来构造PipeMapper类的实例对象的,同时会调用对象的configure方法,configure方法会调用父类PipeMapRed类的configure方法,PipeMapper和PipeReducer类都是继承了PipeMapRed,PipeMapRed类的configure方法负责初始化一些属性,以及根据用户输入的map和reduce命令启动进程,其代码如下:

public void configure(JobConf job){

        String argv=getPipeCommand(job);   //获取命令,如果是PipeMapper,就获取map命令,否则是reduce命令

        joinDelay_=job.getLong("stream.joindelay.milli",0);

        job_=job;

        fs_=FileSystem.get(job_);

        nonZeroExitIsFailure=job_.getBoolean("stream.non.zero.exit.is.failure",true);

        doPipe_=getDoPipe();

        String[] argvSplit=splitArgs(argv);

        Environment childEnv=(Environment)StreamUtil.env().clone();

        addJobConfToEnvironment(job_,childEnv);

        addEnvironment(childEnv,job_.get("stream.addenvironment"));

        ProcessBuilder builder=new ProcessBuilder(argvSplit);   //根据用户指定的map或reduce命令,创建进程对象

        builder.environment.putAll(childEnv.toMap());

        sim=builder.start();    //启动进程

        clientOut_=new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));  // 获取进程的标准输入流

        clientIn_=new DataInputStream(new BufferedInputStream(sim.getInputStream));  // 获取进程的标准输出流

        clientErr_=new DataInputStream(new BufferedInputStream(sim.getErrorStream()));  //获取进程的标准错误流  

}

pipeMapper.startOutputThreads(output,reporter)方法启动一个线程获取map命令进程的标准输出,并做适当的处理,然后以key,value的形式作为Map任务的输出。MapRunner的run方法读取输入分片的record,对于每一条record,调用一次PipeMapper的map函数处理,

public void run(RecordReader<K1,V1> input,OutputCollector<K2,V2> output,Reporter reporter){

          K1 key=input.createKey();

          V1 value=input.createValue();

          while(input.next(key,value)){

                  mapper.map(key,value,output,reporter);

          }

}

而PipeMapper的map方法负责将接收到的key,value通过标准输入clientOut_传送给map命令进程。

PipeReducer类的configure方法根据用户指定的reduce命令启动处理进程,reduce方法将接收到的key,value通过标准输入clientOut_传送给reduce命令进程。

抱歉!评论已关闭.