前段时间需要使用 rabbitmq 做写缓存, 一直使用 pika+rabbitmq 的组合, pika 这个模块虽然可以很直观地操作 rabbitmq, 但是官方给的例子太简单, 对其底层原理了解又不是很深, 遇到很多坑, 尤其是需要自己写连接池管理和 channel 池管理. 虽然也有用过 celery, 一直也是 celery+redis 的组合, 涉及很浅; 目前打算深研一下 celery+redis+rabbitmq 的使用.
celery + rabbitmq 初步
我们先不在集成框架如 flask 或 Django 中使用, 而仅仅单独使用.
简单介绍
Celery 是一个异步任务队列. 一个 Celery 安装有三个核心组件:
Celery 客户端: 用于发布后台作业. 当与 Flask 一起工作的时候, 客户端与 Flask 应用一起运行.
Celery workers: 运行后台作业的进程. Celery 支持本地和远程的 workers, 可以在 Flask 服务器上启动一个单独的 worker, 也可以在远程服务器上启动 worker, 需要拷贝代码;
消息代理: 客户端通过消息队列和 workers 进行通信, Celery 支持多种方式来实现这些队列. 最常用的代理就是 RabbitMQ 和 Redis.
安装 rabbitmq 和 redis
rabbitmq 安装和配置参考: http://www.cnblogs.com/cwp-bg/p/8397529.html
redis 的安装和配置参考: http://www.cnblogs.com/cwp-bg/p/8094914.html
redis-py 安装:
sudo pip install redis
redis-py 操作 redis 参考: http://www.cnblogs.com/cwp-bg/p/8274269.html
为了提高性能, 官方推荐使用 librabbitmq, 这是一个连接 rabbitmq 的 C++ 的库;
sudo pip install celery[librabbitmq]
初步使用
使用 redis 做结果存储, 使用 rabbitmq 做任务队列;
- # tasks.py
- from celery import Celery
- app = Celery('tasks', broker='amqp://username:passwd@ip:port/varhost',backend='redis://username:passwd@ip:6390/db')
- @app.task
- def add(x, y):
- return x + y
- if __name__ == '__main__':
- result = add.delay(30, 42)
broker: 任务队列的中间人;
backend: 任务执行结果的存储;
发生了什么事
app.task 装饰后将 add 函数变成一个异步的任务, add.delay 函数将任务序列化发送到 rabbitmq;
该过程创建一个名字为 celery 的 exchange, 类型为 direct(直连交换机); 创建一个名为 celery 的 queue, 队列和交换机使用路由键 celery 绑定;
打开 rabbitmq 管理后台, 可以看到有一条消息已经在 celery 队列中;
记住: 当有多个装饰器的时候, celery.task 一定要在最外层;
扩展
如果使用 redis 作为任务队列中间人, 在 redis 中存在两个键 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名为 celery 的任务队列 (Celery 默认), 而 celery 为默认队列中的任务列表, 使用 list 类型, 可以看看添加进去的任务数据.
开启 worker
在 task.py 同一个目录下执行:
celery -A tasks worker --loglevel=info
task 指的就是该 celery 任务的名字, 注意文件名 tasks.py 和创建 celery 对象的名字必须一致, 否则 wroker 启动失败;
执行完毕后结果存储在 redis 中, 查看 redis 中的数据, 发现存在一个 string 类型的键值对:
celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data
该键值对的失效时间为 24 小时.
分析消息
这是添加到任务队列中的消息数据.
- {"body": "gAJ9cQAoWAQAAAB0YXNrcQFYGAAAAHRlc3RfY2VsZXJ5LmFkZF90b2dldGhlcnECWAIAAABpZHEDWCQAAAA2NmQ1YTg2Yi0xZDM5LTRjODgtYmM5OC0yYzE4YjJjOThhMjFxBFgEAAAAYXJnc3EFSwlLKoZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==", # body 是序列化后使用 base64 编码的信息, 包括具体的任务参数, 其中包括了需要执行的方法, 参数和一些任务基本信息
- "content-encoding": "binary", # 序列化数据的编码方式
- "content-type": "application/x-python-serialize", # 任务数据的序列化方式, 默认使用 python 内置的序列化模块 pickle
- "headers": {},
- "properties":
- {"reply_to": "b7580727-07e5-307b-b1d0-4b731a796652", # 结果的唯一 id
- "correlation_id": "66d5a86b-1d39-4c88-bc98-2c18b2c98a21", # 任务的唯一 id
- "delivery_mode": 2,
- "delivery_info": {"priority": 0, "exchange": "celery", "routing_key": "celery"}, # 指定交换机名称, 路由键, 属性
- "body_encoding": "base64", # body 的编码方式
- "delivery_tag": "bfcfe35d-b65b-4088-bcb5-7a1bb8c9afd9"}}
将序列化消息反序列化
- import pickle
- import base64
- result =
- base64.b64decode('gAJ9cQAoWAQAAAB0YXNrcQFYGAAAAHRlc3RfY2VsZXJ5LmFkZF90b2dldGhlcnECWAIAAABpZHEDWCQAAAA2NmQ1YTg2Yi0xZDM5LTRjODgtYmM5OC0yYzE4YjJjOThhMjFxBFgEAAAAYXJnc3EFSwlLKoZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==')
- print(pickle.loads(result))
- # 结果
- {
- 'task': 'test_celery.add_together', # 需要执行的任务
- 'id': '66d5a86b-1d39-4c88-bc98-2c18b2c98a21', # 任务的唯一 id
- 'args': (9, 42), # 任务的参数
- 'kwargs': {},
- 'retries': 0,
- 'eta': None,
- 'expires': None, # 任务失效时间
- 'utc': True,
- 'callbacks': None, # 完成后的回调
- 'errbacks': None, # 任务失败后的回调
- 'timelimit': (None, None), # 超时时间
- 'taskset': None,
- 'chord': None
- }
常见的数据序列化方式
binary: 二进制序列化方式; python 的 pickle 默认的序列化方法;
json:json 支持多种语言, 可用于跨语言方案, 但好像不支持自定义的类对象;
XML: 类似标签语言;
msgpack: 二进制的类 json 序列化方案, 但比 json 的数据结构更小, 更快;
yaml:yaml 表达能力更强, 支持的数据类型较 json 多, 但是 python 客户端的性能不如 json
经过比较, 为了保持跨语言的兼容性和速度, 采用 msgpack 或 json 方式;
celery 配置
celery 的性能和许多因素有关, 比如序列化的方式, 连接 rabbitmq 的方式, 多进程, 单线程等等;
基本配置项
CELERY_DEFAULT_QUEUE: 默认队列
BROKER_URL : 代理人的网址
CELERY_RESULT_BACKEND: 结果存储地址
CELERY_TASK_SERIALIZER: 任务序列化方式
CELERY_RESULT_SERIALIZER: 任务执行结果序列化方式
CELERY_TASK_RESULT_EXPIRES: 任务过期时间
CELERY_ACCEPT_CONTENT: 指定任务接受的内容序列化类型 (序列化), 一个列表;
采用配置文件的方式执行 celery
- # main.py
- from celery import Celery
- import celeryconfig
- app = Celery(__name__, include=["task"])
- # 引入配置文件
- app.config_from_object(celeryconfig)
- if __name__ == '__main__':
- result = add.delay(30, 42)
- # task.py
- from main import app
- @app.task
- def add(x, y):
- return x + y
- # celeryconfig.py
- BROKER_URL = 'amqp://username:password@localhost:5672/yourvhost'
- CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
- CELERY_TASK_SERIALIZER = 'msgpack'
- CELERY_RESULT_SERIALIZER = 'msgpack'
- CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
- CELERY_ACCEPT_CONTENT = ["msgpack"] # 指定任务接受的内容类型.
一些方法
- r.ready() # 查看任务状态, 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
- r.wait() # 等待任务完成, 返回任务执行结果, 很少使用;
- r.get(timeout=1) # 获取任务执行结果, 可以设置等待时间
- r.result # 任务执行结果.
- r.state # PENDING, START, SUCCESS, 任务当前的状态
- r.status # PENDING, START, SUCCESS, 任务当前的状态
- r.successful # 任务成功返回 true
- r.traceback # 如果任务抛出了一个异常, 你也可以获取原始的回溯信息
celery 的装饰方法 celery.task
- @celery.task()
- def name():
- pass
task() 方法将任务装饰成异步, 参数:
name: 可以显示指定任务的名字;
serializer: 指定序列化的方法;
bind: 一个 bool 值, 设置是否绑定一个 task 的实例, 如果把绑定, task 实例会作为参数传递到任务方法中, 可以访问 task 实例的所有的属性, 即前面反序列化中那些属性
- @task(bind=True) # 第一个参数是 self, 使用 self.request 访问相关的属性
- def add(self, x, y):
- logger.info(self.request.id)
base: 定义任务的基类, 可以以此来定义回调函数
- import celery
- class MyTask(celery.Task):
- # 任务失败时执行
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- print('{0!r} failed: {1!r}'.format(task_id, exc))
- # 任务成功时执行
- def on_success(self, retval, task_id, args, kwargs):
- pass
- # 任务重试时执行
- def on_retry(self, exc, task_id, args, kwargs, einfo):
- pass
- @task(base=MyTask)
- def add(x, y):
- raise KeyError()
exc: 失败时的错误的类型;
task_id: 任务的 id;
args: 任务函数的参数;
kwargs: 参数;
einfo: 失败时的异常详细信息;
retval: 任务成功执行的返回值;
另外还可以指定 exchange 信息等, 不过一般不使用;
调用异步任务的方法
task.delay(): 这是 apply_async 方法的别名, 但接受的参数较为简单;
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
send_task(): 可以发送未被注册的异步任务, 即没有被 celery.task 装饰的任务;
- # tasks.py
- from celery import Celery
- app = Celery()
- def add(x,y):
- return x+y
- app.send_task('tasks.add',args=[3,4]) # 参数基本和 apply_async 函数一样
- # 但是 send_task 在发送的时候是不会检查 tasks.add 函数是否存在的, 即使为空也会发送成功
apply_async 的参数:
countdown : 设置该任务等待一段时间再执行, 单位为 s;
eta : 定义任务的开始时间; eta=time.time()+10;
expires : 设置任务时间, 任务在过期时间后还没有执行则被丢弃;
retry : 如果任务失败后, 是否重试; 使用 true 或 false, 默认为 true
shadow: 重新指定任务的名字 str, 覆盖其在日志中使用的任务名称;
retry_policy : 重试策略.
max_retries : 最大重试次数, 默认为 3 次.
interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .
- add.apply_async((2, 2), retry=True, retry_policy={
- 'max_retries': 3,
- 'interval_start': 0,
- 'interval_step': 0.2,
- 'interval_max': 0.2,
- })
routing_key: 自定义路由键;
queue: 指定发送到哪个队列;
exchange: 指定发送到哪个交换机;
priority: 任务队列的优先级, 0-9 之间;
serializer: 任务序列化方法; 通常不设置;
compression: 压缩方案, 通常有 zlib, bzip2
headers: 为任务添加额外的消息;
link: 任务成功执行后的回调方法; 是一个 signature 对象; 可以用作关联任务;
link_error: 任务失败后的回调方法, 是一个 signature 对象;
自定义发布者, 交换机, 路由键, 队列, 优先级, 序列方案和压缩方法:
- task.apply_async((2,2),
- compression='zlib',
- serialize='json',
- queue='priority.high',
- routing_key='web.add',
- priority=0,
- exchange='web_exchange')
一份比较常用的配置文件
- # 注意, celery4 版本后, CELERY_BROKER_URL 改为 BROKER_URL
- BROKER_URL = 'amqp://username:passwd@host:port / 虚拟主机名'
- # 指定结果的接受地址
- CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
- # 指定任务序列化方式
- CELERY_TASK_SERIALIZER = 'msgpack'
- # 指定结果序列化方式
- CELERY_RESULT_SERIALIZER = 'msgpack'
- # 任务过期时间, celery 任务执行结果的超时时间
- CELERY_TASK_RESULT_EXPIRES = 60 * 20
- # 指定任务接受的序列化类型.
- CELERY_ACCEPT_CONTENT = ["msgpack"]
- # 任务发送完成是否需要确认, 这一项对性能有一点影响
- CELERY_ACKS_LATE = True
- # 压缩方案选择, 可以是 zlib, bzip2, 默认是发送没有压缩的数据
- CELERY_MESSAGE_COMPRESSION = 'zlib'
- # 规定完成任务的时间
- CELERYD_TASK_TIME_LIMIT = 5 # 在 5s 内完成任务, 否则执行该任务的 worker 将被杀死, 任务移交给父进程
- # celery worker 的并发数, 默认是服务器的内核数目, 也是命令行 - c 参数指定的数目
- CELERYD_CONCURRENCY = 4
- # celery worker 每次去 rabbitmq 预取任务的数量
- CELERYD_PREFETCH_MULTIPLIER = 4
- # 每个 worker 执行了多少任务就会死掉, 默认是无限的
- CELERYD_MAX_TASKS_PER_CHILD = 40
- # 设置默认的队列名称, 如果一个消息不符合其他的队列就会放在默认队列里面, 如果什么都不设置的话, 数据都会发送到默认的队列中
- CELERY_DEFAULT_QUEUE = "default"
- # 设置详细的队列
- CELERY_QUEUES = {
- "default": { # 这是上面指定的默认队列
- "exchange": "default",
- "exchange_type": "direct",
- "routing_key": "default"
- },
- "topicqueue": { # 这是一个 topic 队列 凡是 topictest 开头的 routing key 都会被放到这个队列
- "routing_key": "topic.#",
- "exchange": "topic_exchange",
- "exchange_type": "topic",
- },
- "task_eeg": { # 设置扇形交换机
- "exchange": "tasks",
- "exchange_type": "fanout",
- "binding_key": "tasks",
- },
- }
- 参考:
- http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-options
- http://docs.jinkan.org/docs/flask/patterns/celery.html
- http://www.pythondoc.com/flask-celery/first.html
- https://blog.csdn.net/kk123a/article/details/74549117
- https://blog.csdn.net/preyta/article/details/54288870
来源: https://www.cnblogs.com/cwp-bg/p/8759638.html