airflow 1.10.0
Official site: http://airflow.apache.org/
I Introduction
Airflow is a platform to programmatically author, schedule and monitor workflows.
Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows in Python code; a workflow is a DAG (directed acyclic graph) of tasks.
1 Cluster roles
- webserver
- The web server hosts Airflow's UI for visualizing DAGs, monitoring runs, and troubleshooting.
- scheduler
- The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it spins up a subprocess, which monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.
- worker
The four executors: SequentialExecutor, LocalExecutor, CeleryExecutor, MesosExecutor:
1) Airflow uses a SQLite database, which you should outgrow fairly quickly since no parallelization is possible using this database backend. It works in conjunction with the SequentialExecutor which will only run task instances sequentially.
2) LocalExecutor: tasks will be executed as subprocesses;
3) CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to setup a Celery backend (RabbitMQ, Redis, ...) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.
4) MesosExecutor allows you to schedule airflow tasks on a Mesos cluster.
SequentialExecutor is used together with the SQLite database, LocalExecutor runs tasks as subprocesses, CeleryExecutor depends on a Celery backend (such as RabbitMQ or Redis), and MesosExecutor submits tasks to a Mesos cluster.
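The executor is selected in airflow.cfg. As a rough sketch (the broker and result-backend URLs below are placeholders, not values from this post), switching to CeleryExecutor would look roughly like this:

[core]
executor = CeleryExecutor

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow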
2 Concepts
- DAG
- In Airflow, a DAG - or a Directed Acyclic Graph - is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
A DAG is a collection of tasks organized by their dependencies into a directed acyclic graph; it corresponds to a workflow.
- Operator
- An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don't need to share resources with any other operators. The DAG will make sure that operators run in the correct certain order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.
An operator describes one task in the workflow; it is an abstract concept, essentially an abstract task definition.
- Task
- Once an operator is instantiated, it is referred to as a "task". The instantiation defines specific values when calling the abstract operator, and the parameterized task becomes a node in a DAG.
Once an operator is instantiated (through its constructor) it becomes a task; a task is a concrete concept and is a node of the DAG.
- DAG Run
- A DAG Run is an object representing an instantiation of the DAG in time.
A DAG Run is an instance object of a DAG, i.e. a workflow instance.
- Task Instance
- A task instance represents a specific run of a task and is characterized as the combination of a dag, a task, and a point in time. Task instances also have an indicative state, which could be "running", "success", "failed", "skipped", "up for retry", etc.
Each execution of a task creates a task instance, and every task instance has a state, such as running, success, or failed.
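A minimal sketch of how these concepts map to code (the names below are illustrative, not from this post): BashOperator is an operator class, instantiating it produces a task attached to a DAG, and each scheduled or triggered execution later creates a DAG run with one task instance per task:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# DAG: the workflow definition, a named collection of tasks and their dependencies
demo_dag = DAG('concept_demo', start_date=datetime(2019, 1, 1), schedule_interval=None)

# Operator -> Task: instantiating the abstract operator yields a concrete task (a DAG node)
say_hi = BashOperator(task_id='say_hi', bash_command='echo hi', dag=demo_dag)

# DAG Run / Task Instance: each execution of demo_dag creates one DAG run and, inside it,
# one task instance of say_hi carrying a state (running, success, failed, ...)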
II Installation
Installing via Ambari
See: https://www.cnblogs.com/barneywill/p/10284804.html
Manual installation
1 Check Python
# python --version
2 Install pip
- # curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
- # python get-pip.py
- pip is already installed if you are using Python 2>=2.7.9 or Python 3>=3.4 downloaded from python.org
3 Install airflow
# pip install apache-airflow
1) If you see this error:
- Complete output from command python setup.py egg_info:
- Traceback (most recent call last):
- File "<string>", line 1, in <module>
- File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 394, in <module>
- do_setup()
- File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 259, in do_setup
- verify_gpl_dependency()
- File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 49, in verify_gpl_dependency
- raise RuntimeError("By default one of Airflow's dependencies installs a GPL "
- RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE
- ----------------------------------------
- Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-xR3O9b/apache-airflow/
Set the environment variable:
# export SLUGIFY_USES_TEXT_UNIDECODE=yes
2) If you see this error:
- psutil/_psutil_linux.c:12:20: fatal error: Python.h: No such file or directory
- #include <Python.h>
- ^
- compilation terminated.
- error: command 'gcc' failed with exit status 1
- ----------------------------------------
- Command "/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-install-v4aq0G/psutil/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-record-2jrZ_B/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-install-v4aq0G/psutil/
Install the Python development headers:
# yum install python-devel
4 Set environment variables
# export AIRFLOW_HOME=/path/to/airflow
5 Verify
- # whereis airflow
- airflow: /usr/bin/airflow
- # airflow version
- (Airflow ASCII-art logo banner)
- v1.10.1
Running an airflow command automatically creates $AIRFLOW_HOME/airflow.cfg.
6 Configure the metadata database
$AIRFLOW_HOME/airflow.cfg
Edit the following setting:
- # The SqlAlchemy connection string to the metadata database.
- # SqlAlchemy supports many different database engine, more information
- # their website
- sql_alchemy_conn = sqlite:////export/App/airflow//airflow.db
Change it to a MySQL or Postgres connection string, e.g.:
mysql://airflow:airflow@localhost:3306/airflow
7 Initialize the database
# airflow initdb
8 Common commands
# airflow -h
If you see this error:
- No handlers could be found for logger "airflow.logging_config"
- Traceback (most recent call last):
- File "/usr/bin/airflow", line 21, in <module>
- from airflow import configuration
- File "/usr/lib/python2.7/site-packages/airflow/__init__.py", line 36, in <module>
- from airflow import settings
- File "/usr/lib/python2.7/site-packages/airflow/settings.py", line 229, in <module>
- configure_logging()
- File "/usr/lib/python2.7/site-packages/airflow/logging_config.py", line 71, in configure_logging
- raise e
- ValueError: Unable to configure handler 'task': Cannot resolve 'airflow.utils.log.file_task_handler.FileTaskHandler': cannot import name UnrewindableBodyError
Reinstall urllib3:
- # pip uninstall urllib3
- # pip install urllib3
If the problem persists, also reinstall chardet, idna, and urllib3.
III Usage
1 DAG
Example DAG:
from datetime import timedelta, datetime

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'www',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 25),
    'email': ['test@cdp.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'hello_dag',
    default_args=default_args,
    description='hello world DAG',
    schedule_interval='*/5 * * * *'
)

start_operator = DummyOperator(task_id='start_task', dag=dag)

sh_hello_operator = BashOperator(
    task_id='sh_hello_task',
    depends_on_past=False,
    bash_command='echo "hello {{ params.p }} : "`date` >> /tmp/test.txt',
    params={'p': 'world'},
    dag=dag
)

def print_hello():
    return 'Hello world!'

py_hello_operator = PythonOperator(
    task_id='py_hello_task',
    python_callable=print_hello,
    dag=dag)

start_operator >> sh_hello_operator
sh_hello_operator >> py_hello_operator
The example DAG contains the commonly used BashOperator and PythonOperator as well as the dependencies between tasks.
In the web UI the DAG appears as a graph of these three tasks (screenshot omitted).
Airflow Python script is really just a configuration file specifying the DAG's structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks.
People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script's purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.
The Airflow Python script only defines the DAG structure as code; at execution time each task runs on a different worker and in a different context, so do not pass variables between tasks in the script or put actual business logic there; the scheduler executes the script periodically to pick up any changes to the DAG.
Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.
Jinja templates are supported in DAG scripts; see http://jinja.pocoo.org/docs/dev/api/ for details on Jinja.
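As an illustration (assuming the dag object and BashOperator import from the example above; {{ ds }} is a built-in macro for the execution date), a templated task could look roughly like this:

templated_operator = BashOperator(
    task_id='templated_task',
    # Jinja is rendered before execution: {{ ds }} becomes the execution date,
    # {{ params.name }} becomes the value supplied via params below
    bash_command='echo "run date {{ ds }}, hello {{ params.name }}" >> /tmp/test.txt',
    params={'name': 'world'},
    dag=dag
)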
Reference: http://airflow.apache.org/tutorial.html#it-s-a-dag-definition-file
2 Testing DAGs and tasks locally
- Time to run some tests. First let's make sure that the pipeline parses. Let's assume we're saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.
- # test your code without syntax error
- # python ~/airflow/dags/$dag.py
- # print the list of active DAGs
- # airflow list_dags
- # prints the list of tasks in the dag_id
- # airflow list_tasks $dag_id
- # prints the hierarchy of tasks in the DAG
- # airflow list_tasks $dag_id --tree
- # test your task instance
- # airflow test $dag_id $task_id 2015-01-01
- # run your task instance
- # airflow run $dag_id $task_id 2015-01-01
- # get the status of task
- # airflow task_state $dag_id $task_id 2015-01-01
- # trigger a dag run
- # airflow trigger_dag $dag_id 2015-01-01
- # get the status of dag
- # airflow dag_state $dag_id 2015-01-01
- # run a backfill over 2 days
- # airflow backfill $dag_id -s 2015-01-01 -e 2015-01-02
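For example, with the hello_dag defined earlier, a local check might look like this (the date is just a sample execution date):

- # airflow list_tasks hello_dag --tree
- # airflow test hello_dag sh_hello_task 2019-01-25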
Both airflow run and airflow test can execute a task; the difference is that run performs many checks first, for example:
- dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).
- dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
After a task runs, its logs are located under ~/airflow/logs/$dag_id/$task_id/.
3 Start the services
- # start the Web server, default port is 8080
- airflow webserver -p 8080
- # start the scheduler
- airflow scheduler
- # visit localhost:8080 in the browser and enable the example dag in the home page
Copy the .py files that define DAGs into the $AIRFLOW_HOME/dags/ directory; the scheduler will automatically discover and load them.
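The directory the scheduler scans is controlled by dags_folder in airflow.cfg; by default it points at $AIRFLOW_HOME/dags (the path below is a placeholder):

[core]
dags_folder = /path/to/airflow/dags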
[Original] Big Data Basics: Airflow (1) Introduction, Installation, Usage
Source: http://www.bubuko.com/infodetail-2934752.html