一, 介绍
celery 是一个基于 python 开发的分布式异步消息任务队列, 用于处理大量消息, 同时为操作提供维护此类系统所需的工具.
它是一个任务队列, 专注于实时处理, 同时还支持任务调度. 如果你的业务场景中需要用到异步任务, 就可以考虑使用 celery
二, 实例场景
1, 你想对 100 台机器执行一条批量命令, 可能会花很长时间 , 但你不想让你的程序等着结果返回, 而是给你返回 一个任务 ID, 你过一段时间只需要拿着这个任务 id 就可以拿到任务执行结果, 在任务执行 ing 进行时, 你可以继续做其它的事情.
2, 你想做一个定时任务, 比如每天检测一下你们所有客户的资料, 如果发现今天 是客户的生日, 就给他发个短信祝福
三, 优点
1, 简单: 一但熟悉了 celery 的工作流程后, 配置和使用还是比较简单的
2, 高可用: 当任务执行失败或执行过程中发生连接中断, celery 会自动尝试重新执行任务
3, 快速: 一个单进程的 celery 每分钟可处理上百万个任务
4, 灵活: 几乎 celery 的各个组件都可以被扩展及自定制
四, 入门
celery 需要一个解决方案来发送和接受消息, 通常, 这是以称为消息代理的单独服务的形式出现的
有以下几种解决方案, 包括:
一: RabbitMQ(消息队列, 一种程序之间的通信方式)
rabbitmq 功能齐全, 稳定, 耐用且易于安装. 它是生产环境的绝佳选择.
如果您正在使用 Ubuntu 或 Debian, 请执行以下命令安装 RabbitMQ:
$ sudo apt-get install rabbitmq-server
命令完成后, 代理已经在后台运行, 准备为您移动消息:.Starting rabbitmq-server: SUCCESS
二, Redis
Redis 功能齐全, 但在突然中止或者电源故障时更容易丢失数据
五, 安装
$ pip install celery
六, 应用
创建一个 tasks.py 文件
- from celery import Celery
- App = Celery('tasks', broker='pyamqp://[email protected]//')
- @App.task
- def add(x, y):
- return x + y
第一个参数 Celery 是当前模块的名称. 只有在__main__模块中定义任务时才能自动生成名称.
第二个参数是 broker 关键字参数, 指定要使用的消息代理的 URL. 这里使用 RabbitMQ(也是默认选项).
您可以使用 RabbitMQ amqp://localhost, 或者您可以使用 Redis Redis://localhost.
您定义了一个名为 add 的任务, 返回两个数字的总和.
- from __future__ import absolute_import
- import os
- from celery import Celery
- from django.conf import settings
- # set the default Django settings module for the 'celery' program.
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings')
- App = Celery('saruman_server')
- # Using a string here means the worker will not have to
- # pickle the object when using Windows.
- App.config_from_object('django.conf:settings')
- App.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
- @App.task(bind=True)
- def debug_task(self):
- print('Request: {0!r}'.format(self.request))
七, 运行 celery 工作服务器
您现在可以通过使用 worker 参数执行我们的程序来运行 worker :
celery -A tasks worker --loglevel=info
有关可用命令行选项的完整列表, 请执行以下操作:
$ celery worker --help
还有其他几个可用的命令, 也可以提供帮助:
$ celery help
八, 调用任务
要调用我们的任务, 您可以使用该 delay()方法.
apply_async() 可以更好地控制任务执行
- >>> from tasks import add
- >>> add.delay(4, 4)
调用任务会返回一个 AsyncResult 实例. 这可用于检查任务的状态, 等待任务完成, 或获取其返回值(或者如果任务失败, 则获取异常和回溯).
九, 保持结果
如果您想跟踪任务的状态, Celery 需要在某处存储或发送状态. 有几个内置的结果后端可供选择: SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP), 以及 - 或者您可以定义自己的.
在本例中, 我们使用 rpc 结果后端, 它将状态作为瞬态消息发回. 后端通过 backend 参数 指定 Celery
App = Celery('tasks', backend='rpc://', broker='pyamqp://')
或者, 如果您想使用 Redis 作为结果后端, 但仍然使用 RabbitMQ 作为消息代理(一种流行的组合):
App = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
现在配置了结果后端, 让我们再次调用该任务. 这次你将保持 AsyncResult 调用任务时返回的实例:
>>> result = add.delay(4, 4)
该 ready()方法返回任务是否已完成处理:
- >>> result.ready()
- False
十, 配置
与消费类电器一样, celery 不需要太多配置即可运行. 它有一个输入和一个输出. 输入必须连接代理, 输出可以
选择到结果后端.
可以直接在应用程序上或使用专用配置模块设置配置. 例如, 您可以通过更改 task_serializer 设置来配置用于序列化任务有效负载的默认序列化程序:
App.conf.task_serializer = 'json'
如果您一次配置了许多设置, 则可以使用 update:
- App.conf.update(
- task_serializer='json',
- accept_content=['json'], # Ignore other content
- result_serializer='json',
- timezone='Europe/Oslo',
- enable_utc=True,
- )
对于大型项目, 建议使用专用配置模块. 不鼓励硬编码周期性任务间隔和任务路由选项. 将它们保存在集中位置要好得多. 对于库来说尤其如此, 因为它使用户能够控制其任务的行为方式. 集中配置还允许您的 SysAdmin 在发生系统故障时进行简单的更改.
您可以通过调用 App.config_from_object()方法告诉 Celery 实例使用配置模块:
App.config_from_object('celeryconfig')
此模块通常称为 "celeryconfig", 但您可以使用任何模块名称.
在上面的例子中, 一个名为的模块 celeryconfig.py 必须可以从当前目录或 Python 路径加载. 它可能看起来像这样:
celeryconfig.py:
- broker_url = 'pyamqp://'
- result_backend = 'rpc://'
- task_serializer = 'json'
- result_serializer = 'json'
- accept_content = ['json']
- timezone = 'Europe/Oslo'
- enable_utc = True
- from datetime import timedelta
- import djcelery
- djcelery.setup_loader()
- BROKER_URL = 'amqp://[email protected]//' #输入
- CELERY_RESULT_BACKEND = 'amqp://[email protected]//' #返回的结果
- #导入指定的任务模块
- CELERY_IMPORTS = (
- 'fir.app.fir.tasks',
- )
- CELERYBEAT_SCHEDULE = {
- 'receive_mail': {
- "task": "fir.app.fir.tasks.receive_mail",
- "schedule": timedelta(seconds=5),
- "args": (),
- },
- }
- View Code
要验证配置文件是否正常工作且不包含任何语法错误, 您可以尝试导入它:
- ####################################################
- python -m celeryconfig
为了演示配置文件的强大功能, 您可以将行为不当的任务路由到专用队列:
- celeryconfig.py:
- task_routes = {
- 'tasks.add': 'low-priority',
- }
或者不是路由它, 而是可以对任务进行速率限制, 这样在一分钟 (10 / m) 内只能处理 10 种此类任务:
- celeryconfig.py:
- task_annotations = {
- 'tasks.add': {
- 'rate_limit': '10/m'
- }
- }
如果您使用 RabbitMQ 或 Redis 作为代理, 那么您还可以指示工作人员在运行时为任务设置新的速率限制:
- $ celery -A tasks control rate_limit tasks.add 10/m
- [email protected]: OK
- new rate limit set successfully
十一, 在项目中如何使用 celery
1, 可以把 celery 配置成一个应用
2, 目录结构如下:
proj/__init__.py
/celery.py
/tasks.py
3,proj/celery.py 内容
- from __future__ import absolute_import, unicode_literals
- from celery import Celery
- App = Celery('proj',
- broker='amqp://',
- backend='amqp://',
- include=['proj.tasks'])
- # Optional configuration, see the application user guide.
- App.conf.update(
- result_expires=3600,
- )
- if __name__ == '__main__':
- App.start()
4,proj/tasks.py 中的内容
- from __future__ import absolute_import, unicode_literals
- from .celery import App
- @App.task
- def add(x, y):
- return x + y
- @App.task
- def mul(x, y):
- return x * y
- @App.task
- def xsum(numbers):
- return sum(numbers)
5, 启动 worker
$ celery -A proj worker -l info
输出
- -------------- [email protected] v4.0.2 (latentcall)
- ---- **** -----
- --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
- -- * - **** ---
- - ** ---------- [config]
- - ** ---------- .> App: proj:0x103a020f0
- - ** ---------- .> transport: Redis://localhost:6379//
- - ** ---------- .> results: Redis://localhost/
- - *** --- * --- .> concurrency: 8 (prefork)
- -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
- --- ***** -----
- -------------- [queues]
- .> celery exchange=celery(direct) key=celery
django 中使用 celery: 参考链接: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
十二, 监控工具 flower
如果有些任务出现问题, 可以用 flower 工具监控(基于 tornado)
安装: pip install flower
使用:
三种启动方式
- celery flower
- celery flower --broker
- python manage.py celery flower #就能读取到配置里的 broker_url 默认是 rabbitmq
打开运行后的链接
打开 worker
python manage.py celery worker -l info
来源: http://www.bubuko.com/infodetail-3382723.html