现在的位置: 首页 > 综合 > 正文

Hadoop工作流引擎之JobControl

2019年11月04日 ⁄ 综合 ⁄ 共 3757字 ⁄ 字号 评论关闭

Hadoop工作流引擎之JobControl

  • Hadoop自带的工作流控制主要包括依赖关系组合式(JobControl)MapReduce和链式(Chain)MapReduce两类。
    PS:需要注意的是目前由于Hadoop有新旧两套API,分别对应源代码里的mapred和mapreduce两个包,JobControl和Chain在这两种API中的用法是不一样的,而且Hadoop1.x目前海不支持Chain的新API调用(0.21.0支持,不过它只是一个过渡版本)。
  • JobControl由两个类组成:Job和JobControl。其中,Job类封装了一个MapReduce作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新自己的状态,其状态转移图如下图所示。作业刚开始处于WAITING状态。如果没有依赖作业或者所有依赖作业均已运行完成,则进入READY状态。一旦进入READY状态,则作业可被提交到Hadoop集群上运行,并进入RUNNING状态。在RUNNING状态下,根据作业运行情况,可能进入SUCCESS或者FAILED状态。需要注意的是,如果一个作业的依赖作业失败,则该作业也会失败,于是形成“多米诺骨牌效应”后续所有作业均会失败。

    编程示例(新API):

        ........
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path("/output1"));
     
        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "word count1");
        .................
     
        Configuration conf3 = new Configuration();
        Job job3 = new Job(conf3, "word count2");
        .................
     
        ControlledJob controlledJob1 = new ControlledJob(job.getConfiguration());
        controlledJob1.setJob(job);
        ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
        controlledJob2.setJob(job2);
        ControlledJob controlledJob3 = new ControlledJob(job2.getConfiguration());
        controlledJob3.setJob(job3);
     
        controlledJob1.addDependingJob(controlledJob2);
        controlledJob1.addDependingJob(controlledJob3);
     
        JobControl jobControl = new JobControl("test");
        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);
        jobControl.addJob(controlledJob3);
     
        Thread thread = new Thread(jobControl);
        thread.start();
     
        while(true){
            if(jobControl.allFinished())
            {
                System.out.println(jobControl.getSuccessfulJobList());
                jobControl.stop();
                System.exit(0);
            }
            if(jobControl.getFailedJobList().size() > 0)
            {
                System.out.println(jobControl.getFailedJobList());
                jobControl.stop();
                System.exit(1);
            }
        }
  • ChainMapper/ChainReducer主要为了解决线性链式Mapper 而提出的。也就是说,在Map或者Reduce阶段存在多个Mapper,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper 的输入,形成一个流水线,形式类似于 [MAP+REDUCE MAP*]。下图展示了一个典型的ChainMapper/ChainReducer的应用场景:在Map阶段,数据依次经过Mapper1和Mapper2处理;在Reduce阶段,数据经过shuffle
    sort后;交由对应的 Reducer处理,但Reducer处理之后并没有直接写到HDFS上,而是交给另外一个Mapper处理,它产生的结果写到最终的HDFS输出目录中。

    PS:需要注意的是,对于任意一个MapReduce作业,Map和Reduce阶段可以有无限个Mapper,但Reducer只能有一个也就是说,下图所示的计算过程不能使用ChainMapper/ChainReducer完成,而需要分解成两个MapReduce 作业。

    新API编程示例(1.x不支持),摘自Hadoop API说明文档:

    ...
    Job = new Job(conf);
     
    Configuration mapAConf = new Configuration(false);
    ...
    ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
       Text.class, Text.class, true, mapAConf);
     
    Configuration mapBConf = new Configuration(false);
    ...
    ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
       LongWritable.class, Text.class, false, mapBConf);
     
    ...
     
    job.waitForComplettion(true);
    ...

    ...
    Job = new Job(conf);
    ....
     
    Configuration reduceConf = new Configuration(false);
    ...
    ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
       Text.class, Text.class, true, reduceConf);
     
    ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
       LongWritable.class, Text.class, false, null);
     
    ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
       LongWritable.class, LongWritable.class, true, null);
     
    ...
     
    job.waitForCompletion(true);
    ...
  • 简单对比
    Hadoop自带的JobControl和Chain的工作流控制可以处理一些简单的工作流控制,同时相对与Azkaban和Oozie它不需要安装,使用简单方便。但是它的功能远远不如后两者,比较明显的缺点是它不可以跟踪运行进度(仅能跟踪是否结束),没有重试功能,没有定时执行功能,并且只能控制用Java写的Mapeduce任务(不支持Pig,Shell等)。
    总体来讲Hadoop自带的工作流控制还是有用的,能满足一些简单的常规使用,可以作为Azkaban或Oozie的补充来使用。

抱歉!评论已关闭.