# celery task
前言
讨论一个定时任务, 一般而言, 需要的功能如下:
封装成对象, 独立执行;
对象有一些接口, 便于了解它的状态;
定时调用;
行为控制, 包括重试, 成功 / 失败回调等;
下面分别介绍 celery 的这些功能实现.
1.task basic
celery 的 task 基础类是 tasks.Task()
1.1 bound tasks
绑定代表第一个参数默认是 self
- logger = get_task_logger(__name__)
- @task(bind=True)
- def add(self, x, y):
- logger.info(self.request.id)
1.2 Task 类继承
需要注意的是声明的位置, 是在把方法修饰成 Task 类时声明.
- @App.task(base=MyTask)
- def add(x, y):
- #raise KeyError
- return x + y
- 1.3 names
每个 task 实例都有一个非重复的名字, 譬如下例:
- @App.task(name='tasks.mul')
- def mul(x, y):
一般不必要使用这一功能, 特别是在 task 方法放在单独的 module 中时, 默认 name 就是 module name + 方法名 (celery_tasks.mul).
尽量不要把任务模块命名为 tasks.py, 命名为 celery_1.py 更好一些.
1.4 其它属性
Task.max_retries 最大重试次数
Task.default_retry_delay 默认重试等待时间.
Task.ignore_result 抛弃结果, 意味着不能通过 AsyncResult 查看结果.
2.task 自定义任务行为
文档: http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes
主要有四种, 包括失败 / 成功 / 重试 / 完成
- on_failure on_success on_retry after_return
- # celery_tasks.py
- class MyTask(Task):
- def on_success(self, retval, task_id, args, kwargs):
- print 'task done: {0}'.format(retval)
- return super(MyTask, self).on_success(retval, task_id, args, kwargs)
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- print 'task fail, reason: {0}'.format(exc)
- return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
- @App.task(base=MyTask)
- def add(x, y):
- return x + y
- 2.1 retry
App.Task.retry() 是实现重试的方法.
- # an example of using retry:
- @App.task(bind=True)
- def send_twitter_status(self, OAuth, tweet):
- try:
- Twitter = Twitter(OAuth)
- Twitter.update_status(tweet)
- except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
- raise self.retry(exc=exc)
可以指定重试间隔时间, 默认为 180 秒, 下面案例指定为 1800 秒.
- @App.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
- def add(self, x, y):
关于最大重试次数等参数是在 task 实例中指定.
3. 定时执行 periodic task
文档: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
celery beat is a scheduler.
默认的参数源自 beat_schedule.
3.1 时区
调度器默认使用 UTC 时区, 当然需要修改:
timezone = 'Europe/London'
或 App.conf.timezone = 'Europe/London'
3.2 entries
有两种添加定时任务的方式, 装饰器和配置文件.
常用配置文件方式.
3.2.1 从配置文件中读取定时任务
- celery_config.py # 配置文件
- !/usr/bin/env python
- coding:utf-8
- """
- celery configure
- """author ='sss'
- from future import absolute_import
- from celery.schedules import crontab
- from datetime import timedelta
使用 Redis 存储任务队列及结果
- broker_url = 'redis://:[email protected]:6379/0'
- result_backend = 'redis://:[email protected]:6379/1'
- task_serializer = 'json'
- result_serializer = 'json'
- accept_content = ['json']
时区
timezone = 'Asia/Shanghai'
celery 默认开启自己的日志
False 表示不关闭
worker_hijack_root_logger = False
存储结果过期时间, 过期后自动删除
单位为秒
result_expires = 60 * 60 * 24
导入任务所在文件
- imports = [
- 'celery_tasks',]
定时任务配置
- beat_schedule = {
- 'test1': {
- # 具体需要执行的函数
- # 该函数必须要使用 @App.task 装饰
- 'task': 'celery_tasks.test1_run',
- # 定时时间
- # 每分钟执行一次, 不能为小数
- 'schedule': crontab(minute='/1'),
- # 或者这么写, 每小时执行一次
- # "schedule": crontab(minute=0, hour="/1")
- # 执行的函数需要的参数
- 'args': ()
- },
- 'test2': {
- 'task': 'celery_tasks.test2_run',
- # 设置定时的时间, 10 秒一次
- 'schedule': timedelta(seconds=10),
- 'args': ()
- }
- }
- celery_workder.py # 主文件
- from future import absolute_import
拒绝隐式引入, 如果 celery.py 和 celery 模块名字一样, 避免冲突, 需要加上这条语句
该代码中, 名字是不一样的, 最好也要不一样
- celery test -- worker
- from celery import Celery
- def create_worker():
- # App = Celery('tasks', broker=di)
- '''app = Celery('tasks',
- #backend=di_backend,
- broker=di_broker,
- include=['celery_tasks'])
- '''
- App = Celery()
- #App.conf.update(result_expires=3600,)
- App.config_from_object('celery_config')
- return App
- App = create_worker()
- celery_tasks.py # task 方法文件
- from celery_worker import App
- from celery.task import Task
- import time
tasks.py
- class MyTask(Task):
- def on_success(self, retval, task_id, args, kwargs):
- print('task done: {0}'.format(retval))
- return super(MyTask, self).on_success(retval, task_id, args, kwargs)
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- print('task fail, reason: {0}'.format(exc))
- return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
- @App.task(base=MyTask)
- def add(x, y):
- #raise KeyError
- print('wwww')
- return x + y
- @App.task
- def mul(x, y):
- return x * y
- @App.task
- def xsum(numbers):
- return sum(numbers)
- @App.task
- def test(arg):
- print(arg)
- def test11():
- time.sleep(1)
- print('test11')
.... 略
运行 - 发布任务
celery -A celery_worker beat
运行 - 执行任务
celery -A celery_worker -l info -P eventlet
3.2.2 装饰器添加任务 - 动态添加
在上一章节中给出的案例是在配置文件中写入参数 beat_schedule;
有时这样不太方便, 需要更灵活的添加定时任务;
- def setup_periodic_tasks(sender, **kwargs):
- # 每 5s 调用 test('hello')
- sender.add_periodic_task(5.0, test.s('hello'), name='add every 5')
- # 每 20s 调用 test('world')
- sender.add_periodic_task(10.0, test.s('world'), expires=7)
- # 每周一早上 7:30 执行 test('Happy Mondays!')
- sender.add_periodic_task(
- crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改
- test.s('Happy Mondays!'),
- )
- setup_periodic_tasks(App)
- print('定时任务列表', App.conf.beat_schedule)
执行命令 celery -A celery_test beat -l debug
可以比较一下定时任务列表的输出.
没有添加任务:
(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定时任务列表 {}
celery beat v4.3.0 (rhubarb) is starting.
添加任务之后:
(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定时任务列表 # 已格式化
- {
- "add every 5": {
- "schedule": 5.0,
- "task": "celery_tasks.test",
- "args": [
- "hello"
- ],
- "kwargs": {
- },
- "options": {
- }
- },
- "celery_tasks.test('world')": {
- "schedule": 10.0,
- "task": "celery_tasks.test",
- "args": [
- "world"
- ],
- "kwargs": {
- },
- "options": {
- "expires": 7
- }
- },
- "celery_tasks.test('Happy Mondays!')": {
- "schedule": "",
- "task": "celery_tasks.test",
- "args": [
- "HappyMondays!"
- ],
- "kwargs": {
- },
- "options": {
- }
- }
- }
相关启动命令:
- celery -A celery_worker worker -l info -P eventlet --logfile=c.log
- celery -A celery_test beat -l debug
4. celery 相关命令
发布任务
celery -A celery_task beat
执行任务
celery -A celery_task worker -l info -P eventlet
后台启动 celery worker 进程
celery multi start work_1 -A appcelery
停止 worker 进程, 如果无法停止, 加上 - A
celery multi stop WORKNAME
重启 worker 进程
celery multi restart WORKNAME
查看进程数
celery status -A celery_task
5. 指定时间格式
复杂的定时功能可以使用 crontab 功能, 它跟 Linux 自带的 crontab 所支持的格式是一样的, 非常方便定制任务执行时间.
- from celery.schedules import crontab
- App.conf.beat_schedule = {
- # Executes every Monday morning at 7:30 a.m.
- 'add-every-monday-morning': {
- 'task': 'tasks.add',
- 'schedule': crontab(hour=7, minute=30, day_of_week=1),
- 'args': (16, 16),
- },
- }
上面的案例是在每个周一的 7:30 执行 tasks.add 任务.
来源: http://www.bubuko.com/infodetail-3216396.html