现在的位置: 首页 > 云计算 > 正文

通过Hadoop的API管理Job

2013年02月24日 云计算 ⁄ 共 5038字 ⁄ 字号 评论关闭
 

一、背景

前些时候写了一篇这样的文档,由于当时的时间比较紧,且有些细节内容不太好细说,于是写的比较粗。之后也有些朋友发邮件给我,问我要详细的过程以及管理方式。于是,今天就像把这个内容细化,写在这里,供大家参考。

二、环境简述

Ø  操作系统Linux、JDK1.6

Ø  Hadoop 0.20.2

Ø  开发工具选择eclipse 3.3.2(可配合使用hadoop的插件)

三、需求

首先还是要说一下需求,为什么要用hadoop的API来对Job进行管理。对此,我列举出了以下需求内容:

1、Job之间有先后的顺序执行问题(一个业务可能是多个Job才能完成)。

2、需要对每个Job的状态进行监控(是否完成以及失败情况处理等等)

3、有些无先后依赖关系的Job可以并发执行。

4、每个Job执行时的信息采集和记录。

5、能够灵活添加或删除要执行的Job队列。

如果以上需求去掉2和4,那么,我们通过脚本就可以做到(如shell)。但是如果要获取Job的详细信息以及Job运行时的状态,那么还是需要调用Hadoop的API来实现。所以,这里我选择了Java语言来实现以上需求。

四、设计思路

这里的设计必须要满足以上的需求(名字暂定为pipeline),设计内容大体如下:

Ø  通过周期的遍历时间,获得Job队列启动时间,并在启动之前加载Job配置文件。

Ø  通过配置Job的列表来确定Job执行的先后顺序以及哪些可以并发哪些不能并发。

Ø  通过JobClinet来采集相关的Job信息。

Ø  对失败的Job有重新尝试执行的机制。

因为考虑到pipeline是和Job的MR代码是剥离的,不能存在于一个工程下,这样,才能实现MR的灵活增删。那么,我们还要设计如何通过pipeline来管理MR生成好的JAR包。下面,我们将就以上思路来逐步设计。

五、配置文件

首先是配置文件,如果要满足以上的设计思路,那么需要2个配置文件。一个是pipeline自身的配置文件,包含了周期遍历时间、pipeline启动时间、任务失败尝试次数、最大并发任务数、调度模式(FIFO还是FAIR)以及日志输出目录。将这个配置文件命名为pipeline.ini,见下图:

上面的是pipeline自己的配置文件,那么,接下来我们还要考虑,如何将JAR加载到pipeline中,从而按照我们制定的顺序启动。我们设计了一下配置文件,暂定pipeline.joblist,这个配置文件通过序号来区分哪些任务是需要顺序执行,哪些任务是可以并发执行的。例如当前有5个任务,分别是A,B,C,D,E。AB是一个业务,CDE是一个业务。那么配置文件如下(字段间以\t划分):

01001 A 后面是输入、输出以及自定义的一些参数。

01002 B 后面是输入、输出以及自定义的一些参数。

02001 C 后面是输入、输出以及自定义的一些参数。

02002 D 后面是输入、输出以及自定义的一些参数。

02003 E 后面是输入、输出以及自定义的一些参数。

可以通过编号看出,01是一个业务,02是一个业务。01或者02开头的job是需要顺序执行的.但是开头不同的,是可以并发执行的。

六、启动Job

到这里为止,基本配置文件就设计完成了,后面我们需要考虑代码如何编写。其实,在这个过程中,就是一个Timer类,然后根据配置参数来做不停的时间遍历,直到到了配置的启动时间,那么加载pipeline.joblist文件。然后执行Job。以上的过程如果是FIFO模式,是很好实现的,直接的顺序执行就可以了。启动job实际上还是调用的org.apache.hadoop.util.RunJar.main(args[])方法。

由于是在Timer下调用,所以如果执行FIFO模式的Job,需要用到线程阻塞的模式来控制。关键代码如下:

// 线程阻塞,直到执行结束

jobClient.getJob(jobID).waitForCompletion();

上面的代码用了到JobClient和JobID,后面我会说明。OK,到这里为止,FIFO的调度模式已经比较清晰了。接下来我们说说FAIR的模式。由于FAIR涉及到了并发,所以,这里需要考虑采用多线程的模式来启动任务。在这里,我采用了Semaphore类。代码直接贴出来:

private void randRunJob(final ArrayList<ArrayList<String>> serJobList) {

       // 线程池

       ExecutorService exec_es = Executors.newCachedThreadPool();

       // 启动信号器

       for (int i = 0; i < serJobList.size(); i++) {

           final ArrayList<String> temp_list = serJobList.get(i);

           Runnable maxJob_runnable = new Runnable() {

              public void run() {

                  try {

                     ArrayList<String> job_list = temp_list;

                     // 注册一个信号计数器

                     semaphore.acquire();

                     Utils.RunJob(job_list, conf, jobStatus, jobClient);

                     // 释放这个信号计数器

                     semaphore.release();

                  } catch (InterruptedException e) {

                     Log.writeLog(e.getMessage(), Constant.systemLog_int);

                     return;

                  }

              }

           };

           exec_es.execute(maxJob_runnable);

       }

       // 退出线程池

       exec_es.shutdown();

    }

可以看到,输入的参数是个list嵌套的list。第一层list包含了2个list,里面分别存放的是A,B/C,D,E。OK,这样可以比较清晰的看到通过不同的信号量来启动2个第一层list里面的任务。

七、Job的监控

前面我们都说了如何加载Job列表以及采用不同的方式来启动启动Job。这里,我将说下如何采用JobClinet来获取Job的一些信息。前提是已知Job的name。那么,接下来我贴上几个函数。

/**

     * 根据JobName获取JobID

     *

     * @param jobName_str

     * @return JobID

     */

    public static JobID getJobIDByJobName(JobClient jobClient, JobStatus[] jobStatus, String jobName_str) {

       JobID jobID = null;

       try {

           for (int i = 0; i < jobStatus.length; i++) {

              RunningJob rj = jobClient.getJob(jobStatus[i].getJobID());

              if (rj.getJobName().trim().equals(jobName_str)) {

                  jobID = jobStatus[i].getJobID();

                  break;

              }

           }

       } catch (IOException e) {

           Log.writeLog(e.getMessage(), Constant.systemLog_int);

       }

       return jobID;

    }

 

/**

     * 根据JobID获取Job状态

     *

     * @param jobClient

     * @param jobStatus

     * @param jobID

     * @return RUNNING = 1,SUCCEEDED = 2,FAILED = 3,PREP = 4,KILLED = 5

     * @throws IOException

     */

    public static String getStatusByJobID(JobClient jobClient, JobStatus[] jobStatus, JobID jobID) throws IOException {

       int status_int = 0;

       jobStatus = jobClient.getAllJobs();

       for (int i = 0; i < jobStatus.length; i++) {

           if (jobStatus[i].getJobID().getId() == jobID.getId()) {

              status_int = jobStatus[i].getRunState();

              break;

           }

       }

 

       String desc_str = "";

       switch (status_int) {

       case 1:

           desc_str = "RUNNING";

           break;

       case 2:

           desc_str = "SUCCEEDED";

           break;

       case 3:

           desc_str = "FAILED";

           break;

       case 4:

           desc_str = "PREP";

           break;

       case 5:

           desc_str = "KILLED";

           break;

       default:

           break;

       }

       return desc_str;

    }

 

/**

     * 获取正在运行的JobID的列表

     *

     * @param jobClient

     * @return ArrayList<JobID>

     */

    public static ArrayList<JobID> getRunningJobList(JobClient jobClient) {

       ArrayList<JobID> runningJob_list = new ArrayList<JobID>();

       JobStatus[] js;

       try {

           js = jobClient.getAllJobs();

           for (int i = 0; i < js.length; i++) {

              if (js[i].getRunState() == JobStatus.RUNNING || js[i].getRunState() == JobStatus.PREP) {

                  runningJob_list.add(js[i].getJobID());

              }

           }

       } catch (IOException e) {

           Log.writeLog(e.getMessage(), Constant.systemLog_int);

       }

       return runningJob_list;

    }

上面的类还有很多的其他的API可以调用,查看API文档就可以看到。其实通过API还可以做到对某一个TASK的监控,不仅仅是Job的监控。

 

抱歉!评论已关闭.