Celery
Celery 是一个异步任务框架, 是一个独立运行的服务.(内置 socket)
相当于一个生产者消费者模型的任务队列.
拥有高可用, 异步, 简易, 等特点.
celery 是一个独立的 socket
官网
Celery 官网: http://www.celeryproject.org/
Celery 官方文档英文版: http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版: http://docs.jinkan.org/docs/celery/
Celery 架构
Celery 的架构由三部分组成, 消息中间件 (message broker), 任务执行单元(worker) 和 任务执行结果存储 (backend)(task result store) 组成.
RabbitMQ 异步消息队列, 可编程
Redis 数据库
消息中间件
Celery 本身不提供消息服务, 但是可以方便的于第三方提供的消息中间件集成. 包括 RabbitMQ,Redis 等.
任务执行单元
Worker 是 Celery 提供的任务执行的单元. worker 并发的运行在系统节点中.
任务结果存储
Task result store 用来存储 Worker 执行的任务的结果, Celery 支持以不同方式存储任务的结果, 包括 AMQP, Redis 等, 一般需要支持过期时间.
使用场景
异步执行 delay(): 解决耗时任务
- # 添加任务
- # result = add.delay(20, 30)
- # print(result.id)
)创建 Celery 框架对象 App, 配置 broker 和 backend, 得到的 App 就是 worker
)给 worker 对应的 App 添加可处理的任务函数
)启动 celery 服务, 运行 worker
)书写添加任务的脚本, 执行脚本添加任务到 broker,worker 会自己异步从 broker 中拿任务执行, 执行结果放在 backend 中
) 书写获取任务结果的脚本, 明确任务 id 与执行的 App, 获取任务结果
延迟执行: 解决延迟任务
- # 添加延迟任务
- from datetime import datetime, timedelta
- result = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10))
- print(result)
)创建 Celery 框架对象 App, 配置 broker 和 backend, 得到的 App 就是 worker
)给 worker 对应的 App 添加可处理的任务函数, 用 include 配置给 worker 的 App
)启动 celery 服务, 运行 worker
)书写添加任务的脚本, 执行脚本添加任务到 broker,worker 会自己异步从 broker 中拿任务执行, 执行结果放在 backend 中
) 书写获取任务结果的脚本, 明确任务 id 与执行的 App, 获取任务结果
定时执行: 解决周期 (周期) 任务
- # 时区
- App.conf.timezone = 'Asia/Shanghai'
- # 是否使用
- UTCapp.conf.enable_utc = False
- # 定时任务配置
- from datetime import timedelta
- from celery.schedules import crontab
- App.conf.beat_schedule = {
- 'add-task': {
- 'task': 'celery_task.tasks.add',
- 'schedule': timedelta(seconds=3),
- # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
- 'args': (20, 10),
- },
- 'low-task': {
- 'task': 'celery_task.tasks.low',
- 'schedule': timedelta(seconds=6),
- 'args': (20, 10), },
- }
)创建 Celery 框架对象 App, 配置 broker 和 backend, 得到的 App 就是 worker
)给 worker 对应的 App 添加可处理的任务函数, 用 include 配置给 worker 的 App
)完成提供的任务的定时配置 App.conf.beat_schedule
)启动 celery 服务, 运行 worker, 执行任务
)启动 beat 服务, 运行 beat, 添加任务
Celery 安装与配置
安装
pip install celery
消息中间件
RaddbiMQ/Redis
实例化对象
App = Celert('任务名',broker="xxx",backend="xxx")
Celery 执行异步任务
包架构封装
project
├── celery_task # celery 包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery 连接和配置相关文件, 且名字必须交 celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
异步执行
celery.py
- # 1)创建 App + 任务
- # 2)启动 celery(App)服务:
- # 非 Windows
- # 命令: celery worker -A celery_task -l info
- # Windows:
- # pip3 install eventlet
- # celery worker -A celery_task -l info -P eventlet
- # 3)添加任务: 手动添加, 要自定义添加任务的脚本, 右键执行脚本
- # 4)获取结果: 手动获取, 要自定义获取任务的脚本, 右键执行脚本
- from celery import Celery
- #broker
- broker = 'redis://127.0.0.1:6379/1'
- #backend
- backend = 'redis://127.0.0.1:6379/2'
- #worker
- App = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) #include 是任务 tasks
task.py
- from .celery import App
- import time
- @App.task
- def add(n, m):
- print(n)
- print(m)
- time.sleep(10)
- print('n+m 的结果:%s' % (n + m))
- return n + m
- @App.task
- def low(n, m):
- print(n)
- print(m)
- print('n-m 的结果:%s' % (n - m))
- return n - m
add_task.py
- from celery_task import tasks
- # 添加立即执行任务
- t1 = tasks.add.delay(10, 20)
- t2 = tasks.low.delay(100, 50)
- print(t1.id)
- # 添加延迟任务
- from datetime import datetime, timedelta
- eta=datetime.utcnow() + timedelta(seconds=10)
- tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
- from celery_task.celery import App
- from celery.result import AsyncResult
- id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
- if __name__ == '__main__':
- async = AsyncResult(id=id, App=App)
- if async.successful():
- result = async.get()
- print(result)
- elif async.failed():
- print('任务失败')
- elif async.status == 'PENDING':
- print('任务等待中被执行')
- elif async.status == 'RETRY':
- print('任务异常后正在重试')
- elif async.status == 'STARTED':
- print('任务已经开始被执行')
定时任务
celery.py
- # 1)创建 App + 任务
- # 2)启动 celery(App)服务:
- # 非 Windows
- # 命令: celery worker -A celery_task -l info
- # Windows:
- # pip3 install eventlet
- # celery worker -A celery_task -l info -P eventlet
- # 3)添加任务: 自动添加任务, 所以要启动一个添加任务的服务
- # 命令: celery beat -A celery_task -l info
- # 4)获取结果
- from celery import Celery
- broker = 'redis://127.0.0.1:6379/1'
- backend = 'redis://127.0.0.1:6379/2'
- App = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
- # 时区
- App.conf.timezone = 'Asia/Shanghai'
- # 是否使用 UTC
- App.conf.enable_utc = False
- # 任务的定时配置
- from datetime import timedelta
- from celery.schedules import crontab
- App.conf.beat_schedule = {
- 'low-task': {
- 'task': 'celery_task.tasks.low',
- 'schedule': timedelta(seconds=3),
- # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
- 'args': (300, 150),
- }
- }
tasks.py
- from .celery import App
- import time
- @App.task
- def add(n, m):
- print(n)
- print(m)
- time.sleep(10)
- print('n+m 的结果:%s' % (n + m))
- return n + m
- @App.task
- def low(n, m):
- print(n)
- print(m)
- print('n-m 的结果:%s' % (n - m))
- return n - m
get_result.py
- from celery_task.celery import App
- from celery.result import AsyncResult
- id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
- if __name__ == '__main__':
- async = AsyncResult(id=id, App=App)
- if async.successful():
- result = async.get()
- print(result)
- elif async.failed():
- print('任务失败')
- elif async.status == 'PENDING':
- print('任务等待中被执行')
- elif async.status == 'RETRY':
- print('任务异常后正在重试')
- elif async.status == 'STARTED':
- print('任务已经开始被执行')
来源: http://www.bubuko.com/infodetail-3375259.html