Airflow 的第一个 DAG
考虑了很久, 要不要记录 airflow 相关的东西, 应该怎么记录. 官方文档已经有比较详细的介绍了, 还有各种博客, 我需要有一份自己的笔记吗?
答案就从本文开始了.
本文将从一个陌生视角开始认知 airflow, 顺带勾勒出应该如何一步步搭建我们的数据调度系统.
现在是 9102 年 9 月上旬, Airflow 最近的一个版本是 1.10.5.
ps. 查资料发现自己好多文章被爬走, 换了作者. 所以, 接下里的内容会随机添加一些防伪标识, 忽略即可.
什么数据调度系统?
中台这个概念最近比较火, 其中就有一个叫做数据中台, 文章数据中台到底是什么 https://zhuanlan.zhihu.com/p/53843604 给出了一个概念.
我粗糙的理解, 大概就是: 收集各个零散的数据, 标准化, 然后服务化, 提供统一数据服务. 而要做到数据整理和处理, 必然涉及数据调度, 也就需要一个调度系统.[本文出自 Ryan Miao]
数据调度系统可以将不同的异构数据互相同步, 可以按照规划去执行数据处理和任务调度. Airflow 就是这样的一个任务调度平台.
前面 Airflow1.10.4 介绍与安装已经
安装好了我们的 airflow, 可以直接使用了. 这是第一个 DAG 任务链.
创建一个任务 Hello World
目标: 每天早上 8 点执行一个任务 -- 打印 Hello World
在 Linux 上, 我们可以在 crontab 插入一条记录:
使用 Springboot, 我们可以使用 @Scheduled(cron="0 0 8 * * ?") 来定时执行一个 method.
使用 quartz, 我们可以创建一个 CronTrigger, 然后去执行对应的 JobDetail.
- CronTrigger trigger = (CronTrigger)TriggerBuilder.newTrigger()
- .withIdentity("trigger1", "group1")
- .withSchedule(CronScheduleBuilder.cronSchedule("0 0 8 * * ?"))
- .build();
使用 Airflow, 也差不多类似.
在 docker-airflow 中, 我们将 dag 挂载成磁盘, 现在只需要在 dag 目录下编写 dag 即可.
- volumes:
- - ./dags:/usr/local/airflow/dags
创建一个 hello.py
- """
- Airflow 的第一个 DAG
- """
- from airflow import DAG
- from airflow.operators.bash_operator import BashOperator
- from datetime import datetime
- default_args = {
- "owner": "ryan.miao",
- "start_date": datetime(2019, 9, 1)
- }
- dag = DAG("Hello-World",
- description="第一个 DAG",
- default_args=default_args,
- schedule_interval='0 8 * * *')
- t1 = BashOperator(task_id="hello", bash_command="echo'Hello World, today is {{ ds }}'", dag=dag)
这是一个 Python 脚本, 主要定义了两个变量.
DAG
表示一个有向无环图, 一个任务链, 其 id 全局唯一. DAG 是 airflow 的核心概念, 任务装载到 dag 中, 封装成任务依赖链条. DAG 决定这些任务的执行规则, 比如执行时间. 这里设置为从 9 月 1 号开始, 每天 8 点执行.
TASK
task 表示具体的一个任务, 其 id 在 dag 内唯一. task 有不同的种类, 通过各种 Operator 插件来区分任务类型. 这里是一个 BashOperator, 来自 airflow 自带的插件, airflow 自带了很多拆箱即用的插件.
ds
airflow 内置的时间变量模板, 在渲染 operator 的时候, 会注入一个当前执行日期的字符串. 后面会专门讲解这个执行日期.
来源: https://www.cnblogs.com/woshimrf/p/airflow-first-dag.html