大数据工作流组件oozie简介

Oozie是一个管理 Apache Hadoop 作业的工作流调度系统。

Oozie的 workflow jobs 是由 actions 组成的 有向无环图(DAG)。

Oozie的 coordinator jobs 是由时间 (频率)和数据可用性触发的重复的 workflow jobs 。

Oozie与Hadoop生态圈的其他部分集成在一起,支持多种类型的Hadoop作业(如Java map-reduce、流式map-reduce、Pig、Hive、Sqoop和Distcp)以及特定于系统的工作(如Java程序和shell脚本),不同作业对应不同的workflow action。

主要概念

Workflow:工作流,由我们需要处理的每个工作组成。

Coordinator:协调器,根据条件触发工作流执行,支持周期触发和检测数据是否准备好。

Bundle:将一堆的coordinator进行汇总处理。

组件图

使用

job的组成

  • job.properties 记录了job的属性
  • workflow.xml 使用hPDL 定义任务的流程和分支
  • lib目录 用来执行具体的任务
  • coordinator.xml 定义调度策略

job.properties

KEY 含义
nameNode HDFS地址
jobTracker jobTracker(ResourceManager)地址
queueName Oozie队列(默认填写default)
oozie.usr.system.libpath 是否加载用户lib目录(true/false)
oozie.libpath 用户lib库所在的位置
oozie.wf.application.path Oozie流程所在hdfs地址(workflow.xml所在的地址)
user.name 当前用户
oozie.coord.application.path Coordinator.xml地址(没有可以不写)
oozie.bundle.application.path Bundle.xml地址(没有可以不写)

workflow.xml

包含控制流节点(control flow nodes)和动作节点(action nodes)

  • [控制流节点]:主要包括start、end、fork、join等,其中fork、join成对出现,在fork展开。分支,最后在join结点汇聚
    * start
    * end
  • [动作节点]:包括Hadoop任务、SSH、HTTP、EMAIL、OOZIE子任务
    * ok    --> end
    * error --> kill
    * 定义具体需要执行的job任务
    * MapReduce、shell、hive

actions 在远程系统(如Hadoop、Pig)中启动工作。在action完成时,远程系统回调Oozie通知action完成,此时Oozie将继续在workflow 中进行下一步操作。

1
2
3
4
<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf">
<start to="end"/>
<end name="end"/>
</workflow-app>

coordinator.xml

workflow 作业是基于常规的时间间隔(time intervals)和数据可用性(data availability)运行的。

包括controls、datasets、input-events、output-events、action节点

  • controls
元素名称 含义说明
timeout 超时时间,单位为分钟。当一个Coordinator Job启动的时候,会初始化多个Coordinator动作,timeout用来限制这个初始化过程。默认值为-1,表示永远不超时,如果为0 则总是超时。
concurrency 并发数,指多个Coordinator Job并发执行,默认值为1。
execution 配置多个Coordinator Job并发执行的策略:默认是FIFO。另外还有两种:LIFO(最新的先执行)、LAST_ONLY(只执行最新的Coordinator Job,其它的全部丢弃)。
throttle 一个Coordinator Job初始化时,允许Coordinator动作处于WAITING状态的最大数量。
  • datasets

Coordinator Job中有一个Dataset的概念,它可以为实际计算提供计算的数据,主要是指HDFS上的数据目录或文件,能够配置数据集生成的频率(Frequency)、URI模板、时间等信息

1
2
3
4
5
6
7
8
<datasets>
<include>[SHARED_DATASETS]</include>
...
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI TEMPLATE]</uri-template>
</dataset>
...
</datasets>
  • input-events和output-events元素

一个Coordinator应用的输入事件指定了要执行一个Coordinator动作必须满足的输入条件,在Oozie当前版本,只支持使用dataset实例。

  • action节点定义需要运行的workflow

Coordinator 动作的状态变迁

转移前状态 转以后状态集合
WAITING READY 、 TIMEDOUT 、 KILLED
READY SUBMITTED 、 KILLED
SUBMITTED RUNNING 、 KILLED 、 FAILED
RUNNING SUCCEEDED 、 KILLED 、 FAILED
  • EL变量
常量表示形式 含义说明
${coord:minutes(int n)} 返回日期时间:从一开始,周期执行n分钟
${coord:hours(int n)} 返回日期时间:从一开始,周期执行n * 60分钟
${coord:days(int n)} 返回日期时间:从一开始,周期执行n * 24 * 60分钟
${coord:current(int n)} 返回日期时间:从一个Coordinator动作(Action)创建时开始计算,第n个dataset实例执行时间
${coord:dataIn(String name)} 在输入事件(input-events)中,解析dataset实例包含的所有的URI
${coord:dataOut(String name)} 在输出事件(output-events)中,解析dataset实例包含的所有的URI
${coord:offset(int n, String timeUnit)} 表示时间偏移,如果一个Coordinator动作创建时间为T,n为正数表示向时刻T之后偏移,n为负数向向时刻T之前偏移,timeUnit表示时间单位(选项有MINUTE、HOUR、DAY、MONTH、YEAR)
${coord:nominalTime()} nominal时间等于Coordinator Job启动时间,加上多个Coordinator Job的频率所得到的日期时间。例如:start=”2009-01-01T24:00Z”,end=”2009-12-31T24:00Z”,frequency=”${coord:days(1)}”,frequency=”${coord:days(1)},则nominal时间为:2009-01-02T00:00Z、2009-01-03T00:00Z、2009-01-04T00:00Z、…、2010-01-01T00:00Z
${coord:actualTime()} Coordinator动作的实际创建时间。例如:start=”2011-05-01T24:00Z”,end=”2011-12-31T24:00Z”,frequency=”${coord:days(1)}”,则实际时间为:2011-05-01,2011-05-02,2011-05-03,…,2011-12-31
${coord:user()} 启动当前Coordinator Job的用户名称
${coord:dateOffset(String baseDate, int instance, String timeUnit)} 计算新的日期时间的公式:newDate = baseDate + instance * timeUnit,如:baseDate=’2009-01-01T00:00Z’,instance=’2′,timeUnit=’MONTH’,则计算得到的新的日期时间为’2009-03-01T00:00Z’。
${coord:formatTime(String timeStamp, String format)} 格式化时间字符串,format指定模式
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2015-2024 RivenZoo
  • Powered by Hexo Theme Ayer
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信