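The workflow scheduler traditionally used to manage Hadoop jobs is Apache Oozie. Like many other Hadoop products, Oozie is written in Java. It is a server-based web application that runs workflow jobs executing Hadoop MapReduce and Pig. An Oozie workflow is a collection of actions arranged in a control-dependency directed acyclic graph (DAG) that is specified in an XML document. Although Oozie has a lot of support in the Hadoop community, configuring workflows and jobs through XML attributes has a steep learning curve.
Luigi is a Python alternative, created by Spotify, for building and configuring complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, and more. It also has a large community, supports many Hadoop technologies, and has more than 10,000 stars on GitHub.
This chapter covers installing Luigi and describes its workflows in detail.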
Installation
pip install luigi
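Workflows
In Luigi, a workflow consists of a series of actions called tasks. Luigi tasks are nonspecific, that is, they can be anything that can be written in Python. The locations of a task's input and output data are called targets. Targets typically correspond to file locations on disk, on HDFS, or in a database. In addition to tasks and targets, Luigi uses parameters to customize how tasks are executed.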
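Tasks
Tasks are the sequences of actions that make up a Luigi workflow. Each task declares its dependencies on targets created by other tasks. This lets Luigi build dependency chains.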
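To make the idea of a dependency chain concrete, here is a minimal plain-Python sketch. It does not use Luigi, and it is not how Luigi's scheduler is implemented: `ToyTask` and `resolve` are hypothetical names introduced only for illustration, and the sketch assumes the dependencies contain no cycles.

```python
class ToyTask:
    """Hypothetical stand-in for a task; this is not luigi.Task."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)  # tasks whose outputs this one needs


def resolve(task, done=None):
    """Return task names ordered so every dependency precedes its dependents.

    Assumes the dependency graph is acyclic (no cycle detection here).
    """
    if done is None:
        done = []
    # Visit dependencies first, depth-first.
    for dep in task.requires:
        resolve(dep, done)
    # Schedule the task itself once, after everything it depends on.
    if task.name not in done:
        done.append(task.name)
    return done


download = ToyTask("download")
clean = ToyTask("clean", requires=[download])
report = ToyTask("report", requires=[clean, download])

order = resolve(report)
print(order)  # ['download', 'clean', 'report']
```

Even though `report` lists `download` twice in its chain (directly and through `clean`), it is scheduled only once, and always before the tasks that need it.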
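Targets
Targets are the inputs and outputs of a task. The most common targets are files on disk, files in HDFS, or records in a database. Luigi wraps the underlying filesystem operations to ensure that interactions with targets are atomic. This allows a workflow to be replayed from the point of failure without having to replay any tasks that have already completed successfully.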
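The link between atomic writes and replaying from the point of failure can be sketched with the standard library alone. The code below is not Luigi's implementation. It illustrates the common write-then-rename pattern, with hypothetical helper names `atomic_write` and `is_complete`. It relies on the fact that Luigi's default completeness check is whether a task's outputs exist, so a half-written file must never appear under the final name.

```python
import os
import tempfile


def atomic_write(path, text):
    """Write text to path so that readers never observe a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file in the same directory (same filesystem).
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(text)
        # The rename is the commit point: the target appears all at once.
        os.replace(tmp_path, path)
    except BaseException:
        # On failure, discard the partial file; the target stays absent.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def is_complete(path):
    """Treat a task as complete when its target exists."""
    return os.path.exists(path)


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, "wordcount.tsv")

print(is_complete(target))  # False
atomic_write(target, "jack\t2\n")
print(is_complete(target))  # True
```

If the write fails midway, the target never appears, so a rerun sees that task as incomplete and runs it again, while tasks whose targets already exist are skipped.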
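Parameters
Parameters allow tasks to be customized by letting values be passed into a task from the command line, programmatically, or from another task. For example, the name of a task's output can be determined by a date passed into the task through a parameter.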
The following script, wordcount.py, combines tasks, targets, and parameters in a word-count workflow:
#!/usr/bin/env python
# https://github.com/china-testing/python-api-tesing
import luigi


class InputFile(luigi.Task):
    """
    A task wrapping a Target
    """
    input_file = luigi.Parameter()

    def output(self):
        """
        Return the target for this task
        """
        return luigi.LocalTarget(self.input_file)


class WordCount(luigi.Task):
    """
    A task that counts the number of words in a file
    """
    input_file = luigi.Parameter()
    output_file = luigi.Parameter(default='/tmp/wordcount')

    def requires(self):
        """
        The task's dependencies
        """
        return InputFile(self.input_file)

    def output(self):
        """
        The task's output
        """
        return luigi.LocalTarget(self.output_file)

    def run(self):
        """
        The task's logic
        """
        count = {}
        # Read the upstream target and tally each whitespace-separated word
        with self.input().open('r') as ifp:
            for line in ifp:
                for word in line.strip().split():
                    count[word] = count.get(word, 0) + 1
        # Write one tab-separated word/count pair per line
        with self.output().open('w') as ofp:
            for k, v in count.items():
                ofp.write('{}\t{}\n'.format(k, v))


if __name__ == '__main__':
    luigi.run()
$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status PENDING
INFO: Informed scheduler that task InputFile__home_hduser__in_0eced493f7 has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:
    - 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack 2
be 2
nimble 1
quick 1
Source: http://www.jianshu.com/p/bcf9daa0a012