Celery
官方
Celery 官网: http://www.celeryproject.org/
Celery 官方文档英文版: http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版: http://docs.jinkan.org/docs/celery/
Celery 架构
Celery 的架构由三部分组成, 消息中间件 (message broker), 任务执行单元(worker) 和 任务执行结果存储 (task result store) 组成.
消息中间件
Celery 本身不提供消息服务, 但是可以方便的和第三方提供的消息中间件集成. 包括, RabbitMQ, Redis 等等
任务执行单元
Worker 是 Celery 提供的任务执行的单元, worker 并发的运行在分布式的系统节点中.
任务结果存储
Task result store 用来存储 Worker 执行的任务的结果, Celery 支持以不同方式存储任务的结果, 包括 AMQP, Redis 等
使用场景
异步任务: 将耗时操作任务提交给 Celery 去异步执行, 比如发送短信 / 邮件, 消息推送, 音视频处理等等
定时任务: 定时执行某件事情, 比如每天数据统计
Celery 的安装配置
pip install celery
消息中间件: RabbitMQ/Redis
App=Celery('任务名', broker='xxx', backend='xxx')
Celery 执行异步任务
包架构封装
project
├── celery_task # celery 包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery 连接和配置相关文件, 且名字必须是 celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
基本使用(添加立即执行任务)
执行流程:
1)创建 App + 任务
2)启动 celery(App)服务:
非 Windows
命令: celery worker -A celery_task -l info
- Windows:
- pip3 install eventlet
- celery worker -A celery_task -l info -P eventlet
3)添加任务: 手动添加, 要自定义添加任务的脚本, 右键执行脚本
4)获取结果: 手动获取, 要自定义获取任务的脚本, 右键执行脚本
celery.py
- from celery import Celery
- # broker: 任务仓库
- broker = 'redis://127.0.0.1:6379/5'
- # backend: 任务结果仓库
- backend = 'redis://127.0.0.1:6379/6'
- # include: 任务 (函数) 所在文件
- App = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
- tasks.py(任务文件)
- from .celery import App
- import time
- @App.task
- def add(n, m):
- print(n)
- print(m)
- time.sleep(10)
- print('n+m 的结果:%s' % (n + m))
- return n + m
- @App.task
- def low(n, m):
- print(n)
- print(m)
- print('n-m 的结果:%s' % (n - m))
- return n - m
- add_task.py(添加要执行的任务)
- # 右键执行该文件, 下面的导入环境是合理的
- from celery_task.tasks import add, low
- # 往 celery 的 Broker 中添加立即任务
- # 先启动 celery: celery worker -A celery_task -l info -P eventlet , 然后右键运行执行
- t1 = add.delay(10, 20)
- t2 = low.delay(50, 10)
- print(t2.id)
- get_result.py(查看任务结果)
- from celery_task.celery import App
- from celery.result import AsyncResult
- # 任务执行的 id, 可从上方任务执行完获取
- id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
- if __name__ == '__main__':
- async = AsyncResult(id=id, App=App)
- if async.successful():
- # 拿到任务执行完的结果
- result = async.get()
- print(result)
- elif async.failed():
- print('任务失败')
- elif async.status == 'PENDING':
- print('任务等待中被执行')
- elif async.status == 'RETRY':
- print('任务异常后正在重试')
- elif async.status == 'STARTED':
- print('任务已经开始被执行')
高级使用(执行延迟任务)
celery.py
- from celery import Celery
- # broker: 任务仓库
- broker = 'redis://127.0.0.1:6379/15'
- # backend: 任务结果仓库
- backend = 'redis://127.0.0.1:6379/15'
- # include: 任务 (函数) 所在文件
- App = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])
tasks.py
- from .celery import App
- @App.task
- def jump(n1, n2):
- res = n1 * n2
- print('n1 * n2 = %s' % res)
- return res
- add_task.py(添加延迟任务)
注:
args 是 jump 任务需要的参数, 没有就设置为空()
eta 是该任务执行的 UTC 格式的时间
- from celery_package.tasks import jump
- # # 直接执行函数
- # jump(10, 20)
- # 添加 celery 立即任务
- # jump.delay(10, 20)
- from datetime import datetime, timedelta
- # 以秒为单位添加延迟时间
- def eta_second(second):
- ctime = datetime.now()
- utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
- time_delay = timedelta(seconds=second)
- return utc_ctime + time_delay
- # 以天为单位添加延迟时间
- def eta_days(days):
- ctime = datetime.now()
- utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
- time_delay = timedelta(days=days)
- return utc_ctime + time_delay
- # apply_async 就是添加延迟任务
- jump.apply_async(args=(200, 50), eta=eta_second(10))
高级使用(自动任务)
执行流程:
1)创建 App + 任务
2)启动 celery(App)服务:
非 Windows
命令: celery worker -A celery_task -l info
- Windows:
- pip3 install eventlet
- celery worker -A celery_task -l info -P eventlet
3)添加任务: 自动添加任务, 所以要启动一个添加任务的服务
命令: celery beat -A celery_task -l info
4)获取结果: 手动获取, 要自定义获取任务的脚本, 右键执行脚本
celery.py
- from celery import Celery
- broker = 'redis://127.0.0.1:6379/15'
- backend = 'redis://127.0.0.1:6379/15'
- App = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
- # 时区
- App.conf.timezone = 'Asia/Shanghai'
- # 是否使用 UTC
- App.conf.enable_utc = False
- # 自动任务的定时配置
- from celery.schedules import crontab
- from datetime import timedelta
- App.conf.beat_schedule = {
- # 定时任务: 任务名自定义
- 'fall_task': {
- 'task': 'celery_task.tasks.fall', # 任务源
- 'args': (30, 10), # 任务参数
- 'schedule': timedelta(seconds=3), # 定时添加任务的时间
- # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
- }
- }
tasks.py
- from .celery import App
- @App.task
- def fall(n1, n2):
- res = n1 / n2
- print('n1 / n2 = %s' % res)
- return res
get_result.py
- from celery_task.celery import App
- from celery.result import AsyncResult
- id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
- if __name__ == '__main__':
- async = AsyncResult(id=id, App=App)
- if async.successful():
- result = async.get()
- print(result)
- elif async.failed():
- print('任务失败')
- elif async.status == 'PENDING':
- print('任务等待中被执行')
- elif async.status == 'RETRY':
- print('任务异常后正在重试')
- elif async.status == 'STARTED':
- print('任务已经开始被执行')
django 中使用
注意点:
添加自动任务时, 需要另外启动一个添加任务的服务, 就是再起一个服务端运行下面的命令.
命令: celery beat -A celery_task -l info
celery.py
- # 加载 django 环境
- import os, django
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
- django.setup()
- from celery import Celery
- # 任务仓库
- broker = 'redis://127.0.0.1:6379/15'
- # 任务结果仓库
- backend = 'redis://127.0.0.1:6379/15'
- # include 任务函数文件的位置
- App = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
- # 时区
- App.conf.timezone = 'Asia/Shanghai'
- # 是否使用 UTC
- App.conf.enable_utc = False
- # 自动任务的定时配置
- from celery.schedules import crontab
- from datetime import timedelta
- App.conf.beat_schedule = {
- # 定时任务: 任务名自定义
- 'update_banner_cache': {
- 'task': 'celery_task.tasks.update_banner_cache', # 任务源
- 'args': (), # 任务参数
- 'schedule': timedelta(seconds=10), # 定时添加任务的时间
- # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
- }
- }
tasks.py
- from .celery import App
- # 获取项目中的模型类
- from API.models import Banner
- @App.task
- def test_django_celery():
- banner_query = Banner.objects.filter(is_delete=False).all()
- print(banner_query)
来源: https://www.cnblogs.com/guapitomjoy/p/11984873.html