一旦数据仓库开始使用,就需要不断从源系统给数据仓库提供新数据。为了确保数据流的稳定,需要使用所在平台上可用的任务调度器来调度 ETL 定期执行。调度模块是 ETL 系统必不可少的组成部分,它不但是数据仓库的基本需求,也对项目的成功起着举足轻重的作用。本篇说明如何使用 HDP 中的 Oozie 和 Falcon 服务实现 ETL 执行自动化。
Oozie 是一个管理 Hadoop 作业、可伸缩、可扩展、可靠的工作流调度系统,它内部定义了三种作业:工作流作业、协调器作业和 Bundle 作业。工作流作业是由一系列动作构成的有向无环图(DAGs),协调器作业是按时间频率周期性触发 Oozie 工作流的作业,Bundle 管理协调器作业。Oozie 支持的用户作业类型有 Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop 和 Distcp,及其 Java 程序和 shell 脚本或命令等特定的系统作业。
使用 Oozie 主要基于以下两点原因:
Oozie 的体系结构如图 1 所示。
图 1 Oozie 是一种 Java web 应用程序,它运行在 Java Servlet 容器、即 Tomcat 中,并使用数据库来存储以下内容:
Oozie 工作流是放置在 DAG(有向无环图 Direct Acyclic Graph)中的一组动作,例如,Hadoop 的 Map/Reduce 作业、Pig 作业等。DAG 控制动作的依赖关系,指定了动作执行的顺序。Oozie 使用 hPDL 这种 XML 流程定义语言来描述这个图。 hPDL 是一种很简洁的语言,它只会使用少数流程控制节点和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end 和 fail 节点)以及控制工作流执行路径的机制(decision、fork 和 join 节点)。动作节点是实际执行操作的部分,通过它们工作流会触发执行计算或者处理任务。 所有由动作节点触发的计算和处理任务都不在 Oozie 中运行。它们是由 Hadoop 的 MapReduce 框架执行的。这种低耦合的设计方法让 Oozie 可以有效利用 Hadoop 的负载平衡、灾难恢复等机制。这些任务主要是串行执行的,只有文件系统动作例外,它是并行处理的。这意味着对于大多数工作流动作触发的计算或处理任务类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到前面节点的计算或处理任务结束了之后才能够继续。Oozie 可以通过两种不同的方式来检测计算或处理任务是否完成,这就是回调和轮询。当 Oozie 启动了计算或处理任务时,它会为任务提供唯一的回调 URL,然后任务会在完成的时候发送通知给这个特定的 URL。在任务无法触发回调 URL 的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调 URL 的时候,Oozie 有一种机制,可以对计算或处理任务进行轮询,从而能够判断任务是否完成。 Oozie 工作流可以参数化,例如在工作流定义中使用像 ${inputDir} 之类的变量等。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化,比如使用不同的输出目录,那么多个同样的工作流操作可以并发执行。 一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段、数据可用性或外部事件来运行它们。Oozie 协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie 协调程序让我们可以用谓词的方式对工作流执行触发器进行建模,谓词可以是时间条件、数据条件、内部事件或外部事件。工作流作业会在谓词得到满足的时候启动。不难看出,这里的谓词,其作用和 SQL 语句的 WHERE 子句中的谓词类似,本质上都是在满足某些条件时触发某种事件。 有时,我们还需要连接定时运行、但时间间隔不同的工作流操作。多个以不同频率运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie 协调程序支持创建这样的数据应用管道。
我们的定期 ETL 需要使用 Oozie 中的 FS、Sqoop 和 SSH 三种动作,其中增量数据抽取要用到 Sqoop job。由于 Oozie 在执行这些动作时存在一些特殊要求,因此在定义工作流前先要进行适当的配置。
我的实验环境用的是 HDP2.5.0,在安装之时就已经配置并启动了 Oozie 服务。
缺省时,Sqoop metastore 自动连接存储在~/.sqoop/. 目录下的本地嵌入式数据库。然而要在 Oozie 中执行 Sqoop job 需要 Sqoop 使用共享的元数据存储,否则会报类似如下的错误:ERROR org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage - Cannot restore job。在本例中我使用 hdp2 上的 MySQL 数据库存储 Sqoop 的元数据。(1)记录当前 Sqoop 作业的 last.value 值
- last_value=`sqoop job --show myjob_incremental_import | grep incremental.last.value | awk '{print $3}'`
该值在后面重建作业时会用到。(2)在 MySQL 中创建 Sqoop 的元数据存储数据库
- create database sqoop;
- create user 'sqoop'@'hdp2' identified by 'sqoop';
- grant all privileges on sqoop.* to 'sqoop'@'hdp2';
- flush privileges;
(3)配置 Sqoop 的元数据存储参数
在 Ambari 的 Sqoop -> Configs -> Custom sqoop-site 中添加如图 2 所示的参数
图 2
(4)重启 Sqoop 服务 重启完成后,MySQL 的 sqoop 库中有了一个名为 SQOOP_ROOT 的空表。
- mysql> show tables;
- +-----------------+
- | Tables_in_sqoop |
- +-----------------+
- | SQOOP_ROOT |
- +-----------------+
- 1 row in set (0.00 sec)
(5)预装载 SQOOP 表
- use sqoop;
- insert into SQOOP_ROOT values (NULL, 'sqoop.hsqldb.job.storage.version', '0');
(6)创建初始表
- sqoop job --list
此时并不会返回先前已经创建的 myjob_incremental_import 作业,因为此时 MySQL 中没有元数据信息。该命令执行完成后,MySQL 的 sqoop 库中有了一个名为 SQOOP_SESSIONS 的空表,该表存储 sqoop job 相关信息。
- mysql> show tables;
- +-----------------+
- | Tables_in_sqoop |
- +-----------------+
- | SQOOP_ROOT |
- | SQOOP_SESSIONS |
- +-----------------+
- 2 rows in set (0.00 sec)
(6)将表的存储引擎修改为 MYISAM
- alter table SQOOP_ROOT engine=myisam;
- alter table SQOOP_SESSIONS engine=myisam;
因为每次执行增量抽取后都会更新 last_value 值,如果使用 Innodb 可能引起事务锁超时错误。
- sqoop job --create myjob_incremental_import \
- -- import \
- --connect "jdbc:mysql://172.16.1.127:3306/source?usessl=false&user=dwtest&password=123456" \
- --table sales_order \
- --target-dir /data/ext/sales_order \
- --compress \
- --where "entry_date < current_date()" \
- --incremental append \
- --check-column order_number \
- --last-value $last_value
上面的命令执行后,SQOOP_SESSIONS 表中存储了 Sqoop job 的信息。
- select * from SQOOP_SESSIONS\G
- ...
- *************************** 53. row ***************************
- job_name: myjob_incremental_import
- propname: sqoop.property.set.id
- propval: 0
- propclass: schema
- *************************** 54. row ***************************
- job_name: myjob_incremental_import
- propname: sqoop.tool
- propval: import
- propclass: schema
- *************************** 55. row ***************************
- job_name: myjob_incremental_import
- propname: temporary.dirRoot
- propval: _sqoop
- propclass: SqoopOptions
- *************************** 56. row ***************************
- job_name: myjob_incremental_import
- propname: verbose
- propval: false
- propclass: SqoopOptions
- 56 rows in set (0.00 sec)
此时执行 sqoop job --list 可以看到刚创建的 job。
- sqoop job --list
- ...
- Available jobs:
- myjob_incremental_import
关于使用 MySQL 作为 Sqoop 元数据存储的配置,可以参考 "Using SQOOP with MySQL as metastore"。
Oozie 中执行 Sqoop 时如果缺少 java-json.jar 文件,会报类似如下的错误:
- Failing Oozie Launcher,
- Main class[org.apache.oozie.action.hadoop.SqoopMain],
- main() threw exception,
- org / json / JSONObject
在我的 HDP2.5.0 安装中没有该文件,需要自行下载,然后拷贝到相关目录。
- cp java-json.jar /usr/hdp/current/sqoop-client/lib/
- su - hdfs -c 'hdfs dfs -put /usr/hdp/current/sqoop-client/lib/java-json.jar /user/oozie/share/lib/lib_20170208131207/sqoop/'
实际的数据装载过程是通过 HAWQ 的函数实现的,自然工作流中要执行包含 psql 命令行的本地 shell 脚本文件。这需要明确要调用的 shell 使用的是本地的 shell,可以通过 Oozie 中的 SSH 动作指定本地文件。在使用 SSH 这个动作的时候,可能会遇到 AUTH_FAILED:Not able to perform operation 的问题,解决该问题要对 Oozie 的服务器做免密码登录。(1)修改 / etc/passwd 文件 HDP 缺省运行 Oozie Server 的用户是 Oozie,因此在 / etc/passwd 中更改 Oozie 用户,使得其可登录。我的环境配置是:
- oozie:x:506:504:Oozie user:/home/oozie:/bin/bash
(2)从 Oozie 用户到 root 用户做免密码登录 我是用 root 提交 Oozie 的任务的,所以这里要对从 Oozie 用户到 root 用户做免密码登录。
- su - oozie
- ssh-keygen
- ... 一路回车生成密钥文件 ...
- su -
- # 将oozie的公钥复制到root的authorized_keys文件中
- cat /home/oozie/.ssh/id_rsa.pub >> authorized_keys
完成以上配置后,在 oozie 用户下可以免密码 ssh root@hdp2。关于 oozie 调用本地 shell 脚本可以参考 "OOZIE 调用 shell 脚本做 mr 计算挂死问题分析和解决"。
建立内容如下的 workflow.xml 文件:
- <?xml version="1.0" encoding="UTF-8" ?>
- <workflow-app xmlns="uri:oozie:workflow:0.4" name="RegularETL">
- <start to="hdfsCommands" />
- <action name="hdfsCommands">
- <fs>
- <delete path='${nameNode}/data/ext/sales_order/*' />
- </fs>
- <ok to="fork-node" />
- <error to="fail" />
- </action>
- <fork name="fork-node">
- <path start="sqoop-customer" />
- <path start="sqoop-product" />
- <path start="sqoop-sales_order" />
- </fork>
- <action name="sqoop-customer">
- <sqoop xmlns="uri:oozie:sqoop-action:0.2">
- <job-tracker>
- ${jobTracker}
- </job-tracker>
- <name-node>
- ${nameNode}
- </name-node>
- <arg>
- import
- </arg>
- <arg>
- --connect
- </arg>
- <arg>
- jdbc:mysql://172.16.1.127:3306/source?useSSL=false
- </arg>
- <arg>
- --username
- </arg>
- <arg>
- dwtest
- </arg>
- <arg>
- --password
- </arg>
- <arg>
- 123456
- </arg>
- <arg>
- --table
- </arg>
- <arg>
- customer
- </arg>
- <arg>
- --target-dir
- </arg>
- <arg>
- /data/ext/customer
- </arg>
- <arg>
- --delete-target-dir
- </arg>
- <arg>
- --compress
- </arg>
- </sqoop>
- <ok to="joining" />
- <error to="fail" />
- </action>
- <action name="sqoop-product">
- <sqoop xmlns="uri:oozie:sqoop-action:0.2">
- <job-tracker>
- ${jobTracker}
- </job-tracker>
- <name-node>
- ${nameNode}
- </name-node>
- <arg>
- import
- </arg>
- <arg>
- --connect
- </arg>
- <arg>
- jdbc:mysql://172.16.1.127:3306/source?useSSL=false
- </arg>
- <arg>
- --username
- </arg>
- <arg>
- dwtest
- </arg>
- <arg>
- --password
- </arg>
- <arg>
- 123456
- </arg>
- <arg>
- --table
- </arg>
- <arg>
- product
- </arg>
- <arg>
- --target-dir
- </arg>
- <arg>
- /data/ext/product
- </arg>
- <arg>
- --delete-target-dir
- </arg>
- <arg>
- --compress
- </arg>
- </sqoop>
- <ok to="joining" />
- <error to="fail" />
- </action>
- <action name="sqoop-sales_order">
- <sqoop xmlns="uri:oozie:sqoop-action:0.2">
- <job-tracker>
- ${jobTracker}
- </job-tracker>
- <name-node>
- ${nameNode}
- </name-node>
- <command>
- job --meta-connect jdbc:mysql://hdp2/sqoop?user=sqoop&password=sqoop --exec
- myjob_incremental_im port
- </command>
- <archive>
- /user/oozie/share/lib/lib_20170208131207/sqoop/java-json.jar#java-json.jar
- </archive>
- </sqoop>
- <ok to="joining" />
- <error to="fail" />
- </action>
- <join name="joining" to="psql-node" />
- <action name="psql-node">
- <ssh xmlns="uri:oozie:ssh-action:0.1">
- <host>
- ${focusNodeLogin}
- </host>
- <command>
- ${myScript}
- </command>
- <capture-output/>
- </ssh>
- <ok to="end" />
- <error to="fail" />
- </action>
- <kill name="fail">
- <message>
- Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
- </message>
- </kill>
- <end name="end" />
- </workflow-app>
这个工作流的 DAG 如图 3 所示。
图 3 上面的 XML 文件使用 hPDL 的语法定义了一个名为 RegularETL 的工作流。该工作流包括 10 个节点,其中有 5 个控制节点,5 个动作节点:工作流的起点 start、终点 end、失败处理节点 fail(DAG 图中未显示),两个执行路径控制节点 fork-node 和 joining,一个 FS 动作节点 hdfsCommands 用于删除增量抽取的 HDFS 数据目录;三个并行处理的 Sqoop 动作节点 sqoop-customer、sqoop-product、sqoop-sales_order 用作数据抽取;一个 SSH 动作节点 psql-node 调用本地 shell 脚本,执行 HAWQ 数据装载。 Oozie 的工作流节点分为控制节点和动作节点两类。控制节点控制着工作流的开始、结束和作业的执行路径。动作节点触发计算或处理任务的执行。节点的名字必须符合 [a-zA-Z][\-_a-zA-Z0-0]* 这种正则表达式模式,并且不能超过 20 个字符。为了能让 Falcon 调用 Oozie 工作流,工作流名称不要带下划线等字符。 工作流定义中可以使用形式参数。当工作流被 Oozie 执行时,所有形参都必须提供具体的值。参数定义使用 JSP 2.0 的语法,参数不仅可以是单个变量,还支持函数和复合表达式。参数可以用于指定动作节点和 decision 节点的配置值、XML 属性值和 XML 元素值,但是不能在节点名称、XML 属性名称、XML 元素名称和节点的转向元素中使用参数。上面工作流中的 ${jobTracker} 和 ${nameNode}两个参数,分别指定 YARN 资源管理器的主机 / 端口和 HDFS NameNode 的主机 / 端口(如果配置了 HDFS HA,nameNode 使用 Nameservice ID)。${focusNodeLogin}指定本地 shell 脚本所在主机,${myScript}指定本地 shell 脚本文件全路径。 Oozie 的工作流作业本身还提供了丰富的内建函数,Oozie 将它们统称为表达式语言函数(Expression Language Functions,简称 EL 函数)。通过这些函数可以对动作节点和 decision 节点的谓词进行更复杂的参数化。我的工作流中使用了 wf:errorMessage 和 wf:lastErrorNode 两个内建函数。wf:errorMessage 函数返回特定节点的错误消息,如果没有错误则返回空字符串。错误消息常被用于排错和通知的目的。wf:lastErrorNode 函数返回最后出错的节点名称,如果没有错误则返回空字符串。
这里所说的部署就是把相关文件上传到 HDFS 的对应目录中。我们需要上传工作流定义文件,还要上传 file、archive、script 元素中指定的文件。可以使用 hdfs dfs -put 命令将本地文件上传到 HDFS,-f 参数的作用是,如果目标位置已经存在同名的文件,则用上传的文件覆盖已存在的文件。
- # 上传工作流文件
- hdfs dfs -put -f workflow.xml /user/oozie/
- # 上传MySQL JDBC驱动文件到Oozie的共享库目录中
- hdfs dfs -put /var/lib/ambari-agent/tmp/mysql-connector-java-5.1.38-bin.jar /user/oozie/share/lib/lib_20170208131207/sqoop/
建立内容如下的 / root/regular_etl.sh 文件:
- #!/bin/bash
- # 使用gpadmin用户执行定期装载函数
- su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_regular_load ();"'
该 shell 文件内容很简单,可执行的就一行,调用 psql 执行 HAWQ 定期数据装载函数。
Apache Falcon 是一个面向 Hadoop 的、新的数据处理和管理平台,设计用于数据移动、数据管道协调、生命周期管理和数据发现。它使终端用户可以快速地将他们的数据及其相关的处理和管理任务 "上载(onboard)" 到 Hadoop 集群。 Apache Falcon 解决了大数据领域中一个非常重要和关键的问题。升级为顶级项目是该项目的一个重大进展。Apache Falcon 有一个完善的路线图,可以减少应用程序开发和管理人员编写和管理复杂数据管理和处理应用程序的痛苦。 用户会发现,在 Apache Falcon 中,"基础设施端点(infrastructure endpoint)"、数据集(也称 Feed )、处理规则均是声明式的。这种声明式配置显式定义了实体之间的依赖关系。这也是该平台的一个特点,它本身只维护依赖关系,而并不做任何繁重的工作。所有的功能和工作流状态管理需求都委托给工作流调度程序来完成。
图 4 是 Falcon 的架构图。
图 4 从上图可以看出,Apache Falcon:
而按照开发人员 Michael Miklavcic 的说法,Apache Falcon 使他们的团队逐步构建起一个复杂的管道。该管道包含超过 90 个 Process 和 200 个 Feed。如果单独使用 Apache Oozie,这会是一项重大挑战。
Falcon 选择 Oozie 作为缺省的调度器。Hadoop 上的许多数据处理需要基于数据可用性或时间进行调度,当前 Oozie 本身就支持这些功能。同时 Falcon 系统又是开放的,可以整合其它调度器。Falcon process 调度流程如图 5 所示。
图 5
本示例中,只使用 Falcon 的 process 功能,调用前面定义的 Oozie 工作流定期自动执行。
我的实验环境用的是 HDP2.5.0,在安装之时就已经配置并启动了 Falcon 服务。
(1)建立目录
- hdfs dfs -mkdir /apps/falcon/primaryCluster
- hdfs dfs -mkdir /apps/falcon/primaryCluster/staging
- hdfs dfs -mkdir /apps/falcon/primaryCluster/working
(2)修改属主
- hdfs dfs - chown - R falcon: users / apps / falcon
- /**/
(3)修改权限
- hdfs dfs -chmod -R 777 /apps/falcon/primaryCluster/staging
- hdfs dfs -chmod -R 755 /apps/falcon/primaryCluster/working
Falcon 里的 Cluster 定义集群上各种资源的缺省访问点,还定义 Falcon 作业使用的缺省工作目录。在 Falcon Web UI 中,点击 Create -> Cluster,在界面中填写 Cluster 相关信息,我的定义如下。
其它属性使用缺省值,所有信息确认后保存 Cluster 定义。 创建 Falcon Cluster 可以参考 "CREATE A FALCON CLUSTER"。
在 Falcon Web UI 中,点击 Create -> Process,在界面中填写 Process 相关信息,我的定义如下。
在 Oozie 的 workflow.xml 中使用了 ${jobTracker}、${nameNode}、${focusNodeLogin}、${myScript} 等形式参数。当工作流被 Oozie 执行时,所有形参都必须提供具体的值。这些值在创建 process 时的 ADVANCED OPTIONS -> Properties 指定。我的配置如图 6 所示。所有信息确认后保存 process 定义。
图 6 定义 Falcon Process 可以参考 "DEFINE AND PROCESS DATA PIPELINES IN HADOOP WITH APACHE FALCON"。
首次执行 process 前,先将 Sqoop 的目标数据目录改为完全读写模式,否则可能报权限错误。这是初始化性质的一次性操作,之后不再需要这步。
- su - hdfs -c 'hdfs dfs -chmod -R 777 /data/ext'
等到下午一点开始第一次执行 RegularETL Process,之后每半小时执行一次。Falcon 的执行结果如图 7 所示。
图 7 在 Oozie Web UI 可以看到,Falcon 在 Oozie 中自动创建了 Workflow Job、Coordinator Job 和 Bundle Job,分别如图 8、图 9、图 10 所示。
图 8
图 9
图 10
来源: http://blog.csdn.net/wzy0623/article/details/72476848