现在的位置: 首页 > 综合 > 正文

Hadoop工作流引擎之Oozie3.3.2

2019年11月04日 ⁄ 综合 ⁄ 共 28283字 ⁄ 字号 评论关闭

Hadoop工作流引擎之Oozie3.3.2

Edit

介绍

Oozie是一个由Yahoo开发,用于运行Hadoop工作流的开源工作流引擎。作为一个Java Web程序,它运行在Java Servlet容器中,如Tomcat,并且使用数据库来存储Hadoop工作流的定义和当前运行实例包括实例的状态和变量等。Oozie目前支持的任务包括M/R Job,Streaming Job ,Pig,Hive,Distcp,PoJo Java,SSH,Sqoop,开发人员也可以通过写插件自定义任务类型。

Oozie可以通过命令、Java API、Web UI(只能查看Job状态,不能提交Job)和Web Services API四种方式对外提供服务,其中前两种较常用。

Oozie系统主要包括工作流和协调系统两个概念。

  • workflow定义是一个带 流程控制节点(start、end、decision、fork、join、kill)和 action节点(map-reduce、pig等)的有向图(DAG)。workflow definition language基于XML,被称为HPDL(Hadoop Process Definition Language)。Oozie不支持在流程定义中循环,流程定义必须是严格的有向图。在workflow应用发布时,如果oozie探测到在wokflow定义中存在环,则发布失败。所有由动作节点触发的计算和处理任务都不在Oozie之中——它们是由Hadoop的Map/Reduce框架执行的,这种方法让Oozie可以支持现存的Hadoop用于负载平衡、灾难恢复的机制。相关细节可以参考http://oozie.apache.org/docs/3.3.2/WorkflowFunctionalSpec.html
  • Oozie协调系统目前支持时间上的定时执行和数据可用来激活任务调度两种方式。一些工作流是根据需要(基于一定的时间段和(或)数据可用性和(或)外部事件)来运行它们。Oozie协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie协调程序让我们可以以谓词的方式对工作流执行触发器进行建模,可以指向数据、事件和(或)外部事件。工作流作业会在谓词得到满足的时候启动。经常我们还需要连接定时运行、但时间间隔不同的工作流操作。多个随后运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie协调程序支持创建这样的数据应用管道。
    关于coordinator的使用有很多细节,相关细节可以参考http://oozie.apache.org/docs/3.3.2/CoordinatorFunctionalSpec.html#a6._Coordinator_Application

Oozie工作方式是启动一个mapper任务(这个任务启动真正任务)放在cluster上面运行。大致上来看的话需要几个文件:

  • workflow.xml ozzie读取它来知道每个任务DAG如何并且失败以及成功之后如何处理。需要提交到hdfs上面。
  • coordinator.xml 如果是coordinator模式的话,还需要这个文件。需要提交到hdfs上面。
  • job.properties 这个用来存放一些任务相关的参数等。这个部分其实可以和workflow.xml放在一起,但是分离出来的话可以方便分离。本地使用。
  • lib 目录下面存放启动启动需要的库文件比如jar或者是so等。需要提交到hdfs上面。
    然后我们需要将这些文件(job.properties不许要上传)提交到hdfs上面,然后使用ozzie启动。此外oozie也有local-mode方便调试和测试。

Oozie执行分为三种模式:

  • workflow. 这种方式非常简单,就是定义DAG来执行。
  • coordinator. workflow缺点非常明显,就是没有办法定时触发或者是条件触发。coordinator可以完成这个需求。coordinator构建在workflow工作方式上面,可以定时运行也可以触发运行。
    触发条件非常巧妙,提供一个叫做synchronous dataset的数据集。这个数据集其实就是一个url(文件系统url),不过这个url通过时间来进行区分。比如hdfs://foo:9000/usr/logs/2009/04/15/23/30.
  • bundle. bundle的作用就是将多个coordinator管理起来。这样我们只需要提供一个bundle提交即可。然后可以start/stop/suspend/resume任何coordinator.

其他相关细节可参考Oozie主页http://oozie.apache.org/docs/3.3.2/index.html

Edit

安装步骤

Edit

首先准备安装Oozie需要的软件

官方要求
Unix box (tested on Mac OS X and Linux)
Java JDK 1.6+
Maven 3.0.1+
Hadoop 0.20.2+
Pig 0.7+


我采用的软件
Linux Ubuntu12.04
Java JDK 1.7.0_17
apache-maven-3.0.5
Hadoop 1.0.4
Pig pig-0.11.1
Mysql 5.5.31(oozie默认使用Derby数据库)
ext2.2(可选,访问Web UI需要用)


相关环境变量的配置如下:
#JAVA
JAVA_HOME=/home/supertool/soft/jdk1.7.0_17
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export JAVA_HOME
export PATH 
export CLASSPATH
#hadoop
HADOOP_HOME=/home/supertool/soft/hadoop-1.0.4
PATH=$HADOOP_HOME/bin:$PATH
CLASSPATH=$CLASSPATH:$HADOOP_HOME:$HADOOP_HOME/lib
export HADOOP_HOME
export PATH
export CLASSPATH
#maven
export MVN_HOME=/home/supertool/soft/apache-maven-3.0.5
#oozie
OOZIE_HOME=/home/supertool/soft/oozie-3.3.2/distro/target/oozie-3.3.2-distro/oozie-3.3.2
PATH=$OOZIE_HOME/bin:$MVN_HOME/BIN:$PATH
export OOZIE_HOME PATH
export OOZIE_URL=http://localhost:11000/oozie
#pig
export PIG_HOME=/home/supertool/soft/pig-0.11.1
export PIG_CLASSPATH=$HADOOP_HOME/conf
export PATH=$PIG_HOME/bin:$PIG_CLASSPATH:$PATH
Edit

编译Oozie

编译只需要一条命令。需要注意的是目前不支持Hadoop 1.1.1版本,而且Hadoop 2.x也存在风险。此外编译需要下载的软件很多,可能需要较长时间(我第一次用了三个多小时)

$ bin/mkdistro.sh -DskipTests


我编译完的 主目录地址

/home/supertool/soft/oozie-3.3.2/distro/target/oozie-3.3.2-distro/oozie-3.3.2

Edit

安装Mysql

安装Mysql 具体安装过程可参考相关资料,比如http://ifalone.me/305.html ,http://dev.mysql.com/doc/index.html

  • 为Oozie创建数据库,其中数据库名字不一定是oozie
    mysql> CREATE DATABASE oozie;
  • 创建oozie数据库的用户,其中用户名字不一定是oozie
    mysql> CREATE USER 'username'@'%' IDENTIFIED BY 'password';
  • 增加oozie用户对oozie数据库的增删改查权限
    mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON <database>.* to '<username>'@'%' WITH GRANT OPTION;
  • 配置完重启MYSQL
    sudo /sbin/service mysqld restart
Edit

安装Oozie

  • 修改oozie-site.xml文件
        <property>
            <name>oozie.db.schema.name</name>
            <value>oozie</value>
            <description>
                Oozie DataBase Name
            </description>
        </property>
     
        <property>
            <name>oozie.service.JPAService.create.db.schema</name>
            <value>false</value>
            <description>
                Creates Oozie DB.
     
                If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
                If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
            </description>
        </property>
     
        <property>
            <name>oozie.service.JPAService.jdbc.driver</name>
            <value>com.mysql.jdbc.Driver</value>
            <description>
                JDBC driver class.
            </description>
        </property>
        <property>
            <name>oozie.service.JPAService.jdbc.url</name>
            <value>jdbc:mysql://localhost:3306/${oozie.db.schema.name}?seUnicode=true;characterEncoding=UTF-8</value>
            <description>
                JDBC URL.
            </description>
        </property>
     
        <property>
            <name>oozie.service.JPAService.jdbc.username</name>
            <value>oozie</value>
            <description>
                DB user name.
            </description>
        </property>
     
        <property>
            <name>oozie.service.JPAService.jdbc.password</name>
            <value>oozie</value>
            <description>
                DB user password.
     
                IMPORTANT: if password is emtpy leave a 1 space string, the service trims the value,
                           if empty Configuration assumes it is NULL.
            </description>
        </property>
  • 修改Hadoop配置文件Hadoop core-site.xml
      <!-- OOZIE -->
      <property>
        <name>hadoop.proxyuser.[OOZIE_SERVER_USER].hosts</name>
        <value>[OOZIE_SERVER_HOSTNAME]</value>
      </property>
      <property>
        <name>hadoop.proxyuser.[OOZIE_SERVER_USER].groups</name>
        <value>[USER_GROUPS_THAT_ALLOW_IMPERSONATION]</value>
      </property>
  • 修改主目录地址oozie-3.3.2/bin文件夹下的addtowar.sh文件
      elif [ "${version}" = "0.20.200" ]; then
        #List is separated by ":"
        hadoopJars="hadoop-core*.jar:jackson-core-asl-*.jar:jackson-mapper-asl-*.jar:commons-configuration-*.jar"
      elif [ "${version}" = "1.0.4" ]; then
        #List is separated by ":"
        hadoopJars="hadoop-core*.jar:jackson-core-asl-*.jar:jackson-mapper-asl-*.jar:commons-configuration-*.jar"
     
  • 在主目录地址oozie-3.3.2下创建libext目录
    拷贝$HADOOP_HOME/lib下的所有jar包到libext下
    拷贝ext-2.2.zip到libext下
    拷贝mysql-connector-java-*.*.*-bin.jar到libext下

    拷贝完执行bin/oozie-setup.sh prepare-war –extjs [ext2.2的路径] –hadoop [hadoop verson] [hadoop path]命令
    例如bin/oozie-setup.sh -hadoop 1.0.4 /home/supertool/soft/hadoop-1.0.4 -extjs /home/supertool/soft/oozie-3.3.2/distro/target/oozie-3.3.2-distro/oozie-3.3.2/libext/ext-2.2.zip

    创建ooziedb表结构

    bin/ooziedb.sh create -sqlfile oozie.sql -run DB Connection

    在主目录地址oozie-3.3.2下将oozie-sharelib-3.3.2.tar.gz解压并上传到hdfs上

    tar -zxvf oozie-sharelib-3.3.2.tar.gz
    bin/hadoop fs –mkdir share
    bin/hadoop fs –put share/* share

    将Oozie作为后台守护进程运行:

    $ bin/oozied.sh start

    将Oozie作为前台命令行进程运行:

    $ bin/oozied.sh run

    查看logs/oozie.log信息以保证Oozie正常启动,或者可以使用Oozie命令行工具查看Oozie的运行状态

    $ bin/oozie admin -oozie http://localhost:11000/oozie -status

    同时也可以直接打开浏览器访问如下网页

    http://localhost:11000/oozie/
Edit

使用说明及举例

  • Oozie的使用相对比较复杂,内容较多,下面列出几个重要的说明文档及链接
    Workflow Functional Specification    http://oozie.apache.org/docs/3.3.2/WorkflowFunctionalSpec.html
    Coordinator Functional Specification http://oozie.apache.org/docs/3.3.2/CoordinatorFunctionalSpec.html
    Bundle Functional Specification      http://oozie.apache.org/docs/3.3.2/BundleFunctionalSpec.html
    Command Line Tool      http://oozie.apache.org/docs/3.3.2/DG_CommandLineTool.html
    Oozie Client Javadocs  http://oozie.apache.org/docs/3.3.2/client/apidocs/index.html
    Oozie Core Javadocs    http://oozie.apache.org/docs/3.3.2/core/apidocs/index.html
    Oozie Web Services API http://oozie.apache.org/docs/3.3.2/WebServicesAPI.html
  • 执行一个简单的MR程序,所需文件包括Jar包、job.properties和workflow.xml(job.properties放本地就可以,其他两个需要上传到HDFS上),示例如下
    #job.properties
    #主要定义了一些运行Job所需的变量
    nameNode=hdfs://localhost:9000
    jobTracker=localhost:9001
    queueName=default
    examplesRoot=examples
    oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce
    outputDir=map-reduce

    #workflow.xml
    #通过控制节点和Action节点描述了工作流如何执行
    #控制节点其实可以理解为Oozie的语法,比如可以定义开始(start),结束(end),失败(fail)节点.开始节点就表示从该节点开始运行.同时也提供一种机制去控制工作流的执行过程,如选择(decision),并行(fork),join节点。
    <workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
        <start to="mr-node"/>
        <action name="mr-node">
            <map-reduce>
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <prepare>
                    <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.job.queue.name</name>
                        <value>${queueName}</value>
                    </property>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.oozie.example.SampleMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.oozie.example.SampleReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>

    执行命令

    oozie job -oozie http://localhost:11000/oozie -config ../examples/apps/map-reduce/job.properties –run
  • 一个有依赖关系的Worklow示例,finaljob依赖secondjob和thirdjob,secondjob和thirdjob依赖firstjob
    <workflow-app name='example-forkjoinwf' xmlns="uri:oozie:workflow:0.1">
        <start to='firstjob' />
        <action name="firstjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>${input}</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="fork" />
            <error to="kill" />
        </action>
        <fork name='fork'>
            <path start='secondjob' />
            <path start='thirdjob' />
        </fork>
        <action name="secondjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp2</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="join" />
            <error to="kill" />
        </action>
        <action name="thirdjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp3</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="join" />
            <error to="kill" />
        </action>
        <join name='join' to='finalejob'/>
        <action name="finaljob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp2,/usr/foo/${wf:id()}/temp3
                        </value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${output}</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end" />
            <ok to="kill" />
        </action>
        <kill name="kill">
            <message>Map/Reduce failed, error message[${wf:errorMessage()}]</message>
        </kill>
        <end name='end'/>
    </workflow-app>
  • 执行一个定时执行的MR程序,所需文件包括Jar包、job.properties、coordinator.xml和workflow.xml(除了job.properties,其他都要上传到HDFS上,执行命令参考上例),示例如下
    #job.properties
    nameNode=hdfs://localhost:9000
    jobTracker=localhost:9001
    queueName=default
    examplesRoot=examples
    oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/aggregator/coordinator.xml
    start=2013-07-18T11:00Z
    end=2013-07-18T18:00Z
    inputData=inputDate
    outputData=outputData

    #workflow.xml
    <workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf">
        <start to="aggregator"/>
        <action name="aggregator">
            <map-reduce>
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <prepare>
                    <delete path="${outputData}"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.job.queue.name</name>
                        <value>${queueName}</value>
                    </property>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.oozie.example.SampleMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.oozie.example.SampleReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>${inputData}</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${outputData}</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>

    #coordinator.xml
    #coordinator是架在workflow上面的,实际运行时先跑这个XML再跑workflow.xml
    #sync dataset通常是一个hdfs uri,可以让uri里面指定date以及time来对应到每一个具体的任务。一旦某个任务完成的话,那么这个hdfs uri就会建立,并且在先面会存在一个_SUCCESS的文件
    (当然你也可以指定其他文件名,如果没有指定的话那么就以目录是否存在作为依据),来表示任务完成。各个任务之间可以通过这种方式来做数据流之间的依赖。
    #coordinator用到了一些EL表达式,具体细节可以参考http://oozie.apache.org/docs/3.3.2/CoordinatorFunctionalSpec.html#a6.6._Parameterization_of_Dataset_Instances_in_Input_and_Output_Events
    #注意点:${coord:current(0)}里数字的单位是相关的dataset定义的frequency单位
     
    <coordinator-app name="aggregator-coord" frequency="${coord:hours(1)}" start="${start}" end="${end}" timezone="Asia/Shanghai"
                     xmlns="uri:oozie:coordinator:0.2">
        <controls>
            <concurrency>1</concurrency>
        </controls>
     
        <datasets>
            <dataset name="raw-logs" frequency="${coord:minutes(20)}" initial-instance="2013-07-18T10:00Z" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
            </dataset>
            <dataset name="aggregated-logs" frequency="${coord:hours(1)}" initial-instance="2013-07-18T11:00Z" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            </dataset>
        </datasets>
     
        <input-events>
            <data-in name="input" dataset="raw-logs">
                <start-instance>${coord:current(-2)}</start-instance>
                <end-instance>${coord:current(0)}</end-instance>
            </data-in>
        </input-events>
     
        <output-events>
            <data-out name="output" dataset="aggregated-logs">
                <instance>${coord:current(0)}</instance>
            </data-out>
        </output-events>
     
        <action>
            <workflow>
                <app-path>${nameNode}/user/${coord:user()}/${examplesRoot}/apps/aggregator</app-path>
                <configuration>
                    <property>
                        <name>jobTracker</name>
                        <value>${jobTracker}</value>
                    </property>
                    <property>
                        <name>nameNode</name>
                        <value>${nameNode}</value>
                    </property>
                    <property>
                        <name>queueName</name>
                        <value>${queueName}</value>
                    </property>
                    <property>
                        <name>inputData</name>
                        <value>${coord:dataIn('input')}</value>
                    </property>
                    <property>
                        <name>outputData</name>
                        <value>${coord:dataOut('output')}</value>
                    </property>
                </configuration>
            </workflow>
        </action>
    </coordinator-app>
  • 一个依赖多文件输入,并且循环执行多次的任务示例
      #定时任务按周循环调度,每次执行将消费七天的数据作为输入
      #weeklystats这个Job是从一年的第七天开始每七天执行一次
      #logs是一个每天在24:00时产生的一个日志文件
      #weeklystats是一个每连续七天(第七天24:00)生成一次的输出文件
     
      <coordinator-app name="hello2-coord" frequency="${coord:days(7)}"
                        start="2009-01-07T24:00Z" end="2009-12-12T24:00Z"
                        timezone="UTC"
                        xmlns="uri:oozie:coordinator:0.1">
          <datasets>
            <dataset name="logs" frequency="${coord:days(1)}"
                     initial-instance="2009-01-01T24:00Z" timezone="UTC">
              <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}</uri-template>
            </dataset>
            <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}"
                     initial-instance="2009-01-07T24:00Z" timezone="UTC">
              <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}</uri-template>
            </dataset>
          </datasets>
          <input-events>
            <data-in name="input" dataset="logs">
              <start-instance>${coord:current(-6)}</start-instance>
              <end-instance>${coord:current(0)}</end-instance>
            </data-in>
          </input-events>
          <output-events>
             <data-out name="output" dataset="siteAccessStats">
               <instance>${coord:current(0)}</instance>
             </data-out>
          </output-events>
          <action>
            <workflow>
              <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
              <configuration>
                <property>
                  <name>wfInput</name>
                  <value>${coord:dataIn('input')}</value>
                </property>
                <property>
                  <name>wfOutput</name>
                  <value>${coord:dataOut('output')}</value>
                </property>
             </configuration>
           </workflow>
          </action>
       </coordinator-app>

    第一次被执行时,相当于被解析成如下任务

       <workflow>
        <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
        <configuration>
          <property>
            <name>wfInput</name>
            <value>
                   hdfs://bar:8020/app/logs/200901/01,hdfs://bar:8020/app/logs/200901/02,
                   hdfs://bar:8020/app/logs/200901/03,hdfs://bar:8020/app/logs/200901/05,
                   hdfs://bar:8020/app/logs/200901/05,hdfs://bar:8020/app/logs/200901/06,
                   hdfs://bar:8020/app/logs/200901/07
            </value>
          </property>
          <property>
            <name>wfOutput</name>
            <value>hdfs://bar:8020/app/stats/2009/01/07</value>
          </property>
        </configuration>
      </workflow>
     

    第二次执行时解析成如下任务

      <workflow>
        <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
        <configuration>
          <property>
            <name>wfInput</name>
            <value>
                   hdfs://bar:8020/app/logs/200901/08,hdfs://bar:8020/app/logs/200901/09,
                   hdfs://bar:8020/app/logs/200901/10,hdfs://bar:8020/app/logs/200901/11,
                   hdfs://bar:8020/app/logs/200901/12,hdfs://bar:8020/app/logs/200901/13,
                   hdfs://bar:8020/app/logs/200901/16
            </value>
          </property>
          <property>
            <name>wfOutput</name>
            <value>hdfs://bar:8020/app/stats/2009/01/16</value>
          </property>
        </configuration>
      </workflow>
  • Java API执行Job示例如下
        OozieClient wc = new OozieClient("http://localhost:11000/oozie");
        // create a workflow job configuration and set the workflow application
        // path
        Properties conf = wc.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH,"hdfs://localhost:9000/user/supertool/examples/apps/map-reduce/");
        // setting workflow parameters
        conf.setProperty("jobTracker", "localhost:9001");
        conf.setProperty("nameNode", "hdfs://localhost:9000");
        conf.setProperty("inputDir", "/user/supertool/examples/input-data/text");
        conf.setProperty("outputDir", "/user/supertool/examples/output-data/map-reduce");
        conf.setProperty("queueName", "default");
        conf.setProperty("examplesRoot", "examples");
        conf.setProperty("outputDir", "map-reduce");
        // submit and start the workflow job
        String jobId = wc.run(conf);
        System.out.println("Workflow job submitted");
        // wait until the workflow job finishes printing the status every 10
        // seconds
        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        // print the final status o the workflow job
        System.out.println("Workflow job completed ...");
        System.out.println(wc.getJobInfo(jobId));
  • mbm工作流模拟实现
    过程如下图所示。每分钟8台log服务器都会将log汇总到logserver,如果当前小时收到的log文件数量正确,开始做该小时的ETL过程。若ETL过程正确结束,启动小时报表任务。若当天24小时的ETL都正确执行,启动24小时ETL结果文件合并的任务,若此任务正确执行,启动将今天的聚合文件与前若干天的聚合文件再次聚合的任务。

    此过程在Oozie中分解成5个任务hdfs、etl、hourreport、dayreport、daysmerge,分别如下:
    hdfs主要功能是检查HDFS某目录下文件数目是否为某一值,若满足条件创建标志文件,若不满足,且等待时间超过指定分钟,任务失败。
         <main-class>com.test.HDFS</main-class>
         <!--inputpath-->
         <arg>${inputData}</arg>
         <!--file number-->
         <arg>3</arg>
         <!--wait minute-->
         <arg>1</arg>
         <!--success file-->
         <arg>_SUCCESS</arg>
     
        <datasets>
            <dataset name="raw-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/day-${DAY}/${HOUR}/</uri-template>
                <done-flag></done-flag>
            </dataset>
        </datasets>

    etl主要功能是检测原始日志目录下是否有标志文件(默认_SUCCESS),若满足条件执行etl任务

        <datasets>
            <dataset name="raw-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/day-${DAY}/${HOUR}/</uri-template>
                <done-flag>_SUCCESS</done-flag>
            </dataset>
            <dataset name="etl-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/etl/Logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            </dataset>
        </datasets>

    hourreport主要功能是检测etl输出目录是否有标志文件,若满足条件执行小时报表任务

        <datasets>
            <dataset name="etl-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/etl/Logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            </dataset>
            <dataset name="hourreport-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/hourreport/Logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            </dataset>
        </datasets>

    dayreport主要功能是检测某天24小时的etl输出目录下是否都有标志文件,若满足条件执行合并任务。

        <datasets>
            <dataset name="etl-logs" frequency="${coord:hours(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/etl/Logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
                <done-flag>_SUCCESS</done-flag>
            </dataset>
            <dataset name="dayreport-logs" frequency="${coord:days(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/dayreport/Logs/${YEAR}/${MONTH}/${DAY}</uri-template>
            </dataset>
        </datasets>
     
        <input-events>
            <data-in name="input" dataset="etl-logs">
                <start-instance>${coord:current(8)}</start-instance>
                <end-instance>${coord:current(31)}</end-instance>
            </data-in>
        </input-events>
     
        <output-events>
            <data-out name="output" dataset="dayreport-logs">
                <instance>${coord:current(1)}</instance>
            </data-out>
        </output-events>

    在上例中,start=2013-06-24T17:00Z的情况下input会被解析成

    ${nameNode}/user/${coord:user()}/${examplesRoot}/etl/Logs/2013/06/25/1
    ......
    ......
    ${nameNode}/user/${coord:user()}/${examplesRoot}/etl/Logs/2013/06/26/0

    daysmerge主要任务是检测dayreport的输出是否正确,若正确则开始聚合从指定某天到调度当天的数据。

        <datasets>
            <dataset name="dayreport-logs" frequency="${coord:days(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/dayreport/Logs/${YEAR}/${MONTH}/${DAY}</uri-template>
            </dataset>
            <dataset name="daysmerge-logs" frequency="${coord:days(1)}" initial-instance="${start}" timezone="Asia/Shanghai">
                <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/daysmerge/Logs/${YEAR}/${MONTH}/${DAY}</uri-template>
            </dataset>
        </datasets>
     
        <input-events>
            <data-in name="input" dataset="dayreport-logs">
                <start-instance>${coord:current(-365)}</start-instance>
                <end-instance>${coord:current(0)}</end-instance>
            </data-in>
        </input-events>
     
        <output-events>
            <data-out name="output" dataset="daysmerge-logs">
                <instance>${coord:current(0)}</instance>
            </data-out>
        </output-events>
    由于目前oozie不支持路径模板从某天开始的解析方式,所以这里采用多加了一个输入路径(addInputData)的方式解决此问题,另外提供了createPath.jar用于生成所需路径。此jar包需要三个参数,开始时间、结束时间和Path模板。
        <property>
            <name>mapred.input.dir</name>
            <value>${inputData},${addInputData}</value>
        </property>
     
Edit

Azkaban与Oozie对比

比较重要的相同点:

都服务器端提供持续服务,依赖Web服务,有Web UI
都可以跟踪任务执行状态
都可以执行MR、Pig任务
都支持失败重试功能
都需要部署安装
都可以通过Http获取任务相关信息


比较重要的不同点:

工作流描述语言       Azkaban使用属性文件描述工作流,而Oozie采用HPDL(Hadoop Process Definition Language)作为描述语言
稳定性/技术支持      Oozie由于和Hadoop CDH版本绑定,使用人数较多,技术保证更好些
对工作流的控制方式    Oozie在工作流控制上功能比Azkaban稍多,比Azkaban要好一些(Oozie比Azkaban多了一种数据依赖触发的方式)
文档资料            Azkaban文档资料较少,而且很多地方说的很模糊,很多功能需要探索。Oozie文档相对比较详细
提供的操作接口       Oozie提供的操作方式比Azkaban多。其中Oozie支持Java API、命令调用、Web Service API、Web UI四种方式,Azkaban不支持前两者
支持的JOB类型       Oozie支持的Job类型比Azkaban多一点(Sqoop Action,Ssh Action)
WEB UI             Azkaban的Web UI做的很好,体验和功能都比Oozie要好。其中Azkaban支持在Web UI提交Job,Oozie的UI只能查看状态信息
操作是否方便         Azkaban操作很方便,而Oozie使用有些复杂(需要了解EL表达式)

Edit

遗留问题

  • 时区问题,目前TimeZone只支持以UTC的方式作为输入,我们是Asia/Shanghai时区,调度时需要将当前时间-8个小时才是UTC时间。
  • 跟踪时对于具体运行的某个Job的细节,仍需查看Hadoop自带的相关工具

抱歉!评论已关闭.