Airflow1.10.4 介绍与安装
现在是 9102 年, 8 月中旬. airflow 当前版本是 1.10.4.
随着公司调度任务增大, 原有的, 基于 crontab 和 MySQL 的任务调度方案已经不太合适了, 需要寻找一个可以支持分布式扩容的调度系统解决方案.
最初瞄准 azkaban 来着, 想着基于这个的二次开发. 对比功能和社区热度之后, Airflow 比较符合我们寻找的调度系统.
什么是 Airflow
Airflow 是一个以编程方式创作, 安排和监控工作流程的平台. 对比 crontab 来看, 它是一个可以定时调度任务的系统, 只不过, airflow 的调度更容易管理.
airflow 支持任务依赖 pipeline, 这是 crontab 以及 quartz 所不支持的.
airflow 调度系统和业务系统解耦. 业务单独编写流程, 支持任务热加载.
airflow 支持 crontab 定时格式
airflow 通过 Python 来定义 task, 可以实现复杂的逻辑, 支持分支条件等
airflow 有一套完整的 UI 和管理系统
airflow 有强大的插件扩展方式, 各种插件很丰富, 很容易二次开发, 添加新功能
airflow 是分布式设计, 支持水平扩容
airflow 支持 task 实例, 并支持数据业务日期 bizdate, 也叫 execution_date.
airflow 支持任务补录 backfill
airflow 支持任务之间数据传递(这个任务依赖于上个任务的变量)
airflow 支持序列执行(这个周期的任务依赖于上一个周期的执行结果是否成功)
Airflow 于 2014 年 10 月由 Airbnb 的 Maxime Beauchemin 开始. 它是第一次提交的开源, 并在 2015 年 6 月宣布正式加入 Airbnb GitHub.
该项目于 2016 年 3 月加入了 Apache Software Foundation 的孵化计划.
关于 airflow 具体使用细节, 后面再详细介绍, 这里就是一些在调度系统选型过程中查找的资料.
阿里基于 airflow 二次开发了调度平台 Maat:
基于 DAG 的分布式任务调度平台 - Maat https://yq.aliyun.com/articles/609299
阿里如何实现秒级百万 TPS? 搜索离线大数据平台架构解读
有赞基于 airflow 二次开发了大数据任务调度平台:
每日 7 千次的跨部门任务调度, 有赞怎么设计大数据开发平台? https://www.infoq.cn/article/YhzfqgL4bi*LdwG3R3Xq
Google cloud 提供了基于 airflow 的数据分析产品:
微软 Azure 支持 airflow 的运行:
当然, 这些云厂商很可能是为了让你使用他们的数据产品, 比如对象存储, lambda 等.
社区异常活跃, star 破万, 更新频繁, Apache 背书. 据说作者早期在 Facebook 搞过一套调度系统, 到 airbnb 就开源了 airflow. 大公司背书.
Slack 群组也很活跃
虽然是 Python 开发的, 我也没玩过 Python web, 但调研结果就是: 用 Airflow 吧.
Airflow 的安装
官方文档有非常详细的安装教程. 这里不再赘述.
想要记录的是基于 docker 安装 airflow, 以及做了一些特定的修改.
最终 docker 镜像为: https://github.com/Ryan-Miao/docker-airflow
使用方式很简单:
clone 项目
构建 airflow 镜像
make build
启动
docker-compose -f docker-compose-CeleryExecutor.YAML up -d
浏览器访问 localhost:8089 可以查看 dag
浏览器访问 localhost:5555 可以查看 worker
扩容 3 个 worker
docker-compose -f docker-compose-CeleryExecutor.YAML scale worker=3
所做的一些修改
修改时区为 utc+8
Docker 容器的时区
- ENV LANGUAGE zh_CN.UTF-8
- ENV LANG zh_CN.UTF-8
- ENV LC_ALL zh_CN.UTF-8
- ENV LC_CTYPE zh_CN.UTF-8
- ENV LC_MESSAGES zh_CN.UTF-8
- sed -i 's/^# zh_CN.UTF-8 UTF-8$/zh_CN.UTF-8 UTF-8/g' /etc/locale.gen \
- && locale-gen \
- /bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai'>/etc/timezone
Web server ui 显示的时区, 以及任务运行时的 ds 等时区:
参考 https://blog.csdn.net/Crazy__Hope/article/details/83688986,
- airflow.cfg
- default_timezone = Asia/Shanghai
- /usr/local/lib/python3.7/site-packages/airflow/utils/timezone.py
在 utc = pendulum.timezone('UTC') 这行 (第 27 行) 代码下添加,
- from airflow import configuration as conf
- try:
- tz = conf.get("core", "default_timezone")
- if tz == "system":
- utc = pendulum.local_timezone()
- else:
- utc = pendulum.timezone(tz)
- except Exception:
- pass
修改 utcnow()函数 (在第 69 行)
原代码 d = dt.datetime.utcnow()
修改为 d = dt.datetime.now()
/usr/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py
在 utc = pendulum.timezone('UTC') 这行 (第 37 行) 代码下添加
- from airflow import configuration as conf
- try:
- tz = conf.get("core", "default_timezone")
- if tz == "system":
- utc = pendulum.local_timezone()
- else:
- utc = pendulum.timezone(tz)
- except Exception:
- pass
- /usr/local/lib/python3.7/site-packages/airflow/www/templates/admin/master.html
把代码 var UTCseconds = (x.getTime() + x.getTimezoneOffset()*60*1000);
改为 var UTCseconds = x.getTime();
把代码 "timeFormat":"H:i:s %UTC%",
改为 "timeFormat":"H:i:s",
webserver 查看日志, 中文乱码问题
容器编码设置没问题, 进去看日志文件也没问题, 但是 webserver 查看的时候日志中文乱码. 原因是 http 请求的 mime
没设置编码.
/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py
修改 mime
mimetype="application/json;charset=utf-8",
Hive beeline 认证
airflow 支持 beeline, 在 connection 里填写 beeline 的配置后, 使用 HiveOperator 进行 hive 操作. 我们的 hive
没有使用 kerberos, 而是 ldap 的账号密码认证. 需要对后台的 hvie 任务做认证的修改.
修改 hive_hooks.py 的认证部分即可. Dockerfile 注释掉的部分就是.
添加 hive 的支持
GitHub 的 airflow docker 没有 hive 相关的 lib. 我在 Dockerfile 里添加了 hive 的环境, 这个后面再做优化, 针对
不同的 pool, 安装不同的依赖.
ldap 配置
参见 https://www.cnblogs.com/woshimrf/p/ldap.html 配置我们的 ldap 服务.
然后修改 airflow.cfg. 找到 263 行
- authenticate = False
- # 设置为 True 并打开 ldap 即可使用 ldap 配置
- # auth_backend = airflow.contrib.auth.backends.ldap_auth
以及 518 行
- [ldap]
- # set this to ldaps://<your.ldap.server>:<port>
- uri = ldap://192.168.2.2:389
- user_filter = objectClass=inetOrgPerson
- user_name_attr = sn
- group_member_attr = memberOf
- superuser_filter = memberOf=cn=g-admin,ou=group,dc=demo,dc=com
- data_profiler_filter = memberOf=cn=g-users,ou=group,dc=demo,dc=com
- bind_user = cn=admin,dc=demo,dc=com
- bind_password = admin
- basedn = dc=demo,dc=com
- cacert =
- search_scope = SUBTREE
参考
airflow 官方文档: https://airflow.apache.org/
airflow 中文文档: http://airflow.apachecn.org/#/zh/howto/operator
airflow 源码: https://github.com/apache/airflow
airflow docker: https://github.com/Ryan-Miao/docker-airflow
来源: https://www.cnblogs.com/woshimrf/p/airflow-install-with-docker.html