celery 是一个分布式的任务调度模块,那么 celery 是如何和分布式挂钩呢?
celery 可以支持多台不同的计算机执行不同的任务或者相同的任务。
如果要说 celery 的分布式应用的话,就要提到 celery 的消息路由机制,提到 AMQP 协议。
具体可以查看 AMQP 文档详细了解。
简单理解:
可以有多个 "消息队列"(message Queue),不同的消息可以指定发送给不同的 Message Queue,
而这是通过 Exchange 来实现的,发送消息到 "消息队列" 中时,可以指定 routiing_key,Exchange 通过 routing_key 来吧消息路由(routes)到不同的 "消息队列" 中去。
如图:
exchange 对应 一个消息队列 (queue),即:通过 "消息路由" 的机制使 exchange 对应 queue,每个 queue 对应每个 worker
写个例子:
vim demon3.py
- from celery import Celery
- app = Celery()
- app.config_from_object("celeryconfig")
- @app.task
- def taskA(x, y):
- return x * y
- @app.task
- def taskB(x, y, z):
- return x + y + z
- @app.task
- def add(x, y):
- return x + y
vim celeryconfig.py
- from kombu import Queue BORKER_URL = "redis://192.168.48.131:6379/1"#1库CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"#2库CELERY_QUEUES = { Queue("default", Exchange("default"), routing_key = "default"),
- Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"),
- Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B")
- }#路由CELERY_ROUTES = { "demon3.taskA": {
- "queue": "for_task_A",
- "routing_key": "for_task_A"
- },
- "demon3.taskB": {
- "queue": "for_task_B",
- "routing_key": "for_task_B"
- }
- }
下面把两个脚本导入服务器:
指定 taskA 启动一个 worker:
- # celery - A demon3 worker - l info - n workerA. % h - Q for_task_A
同理:
- # celery - A demon3 worker - l info - n workerB. % h - Q for_task_B
下面远程客户端调用:新文件
vim remote.py
- from demon3 import *
- r1 = taskA.delay(10, 20)
- print (r1.result)
- print (r1.status)
- r2 = taskB.delay(10, 20, 30)
- time.sleep(1)
- prnit (r2.result)
- print (r2.status)
- #print (dir(r2))
- r3 = add.delay(100, 200)
- print (r3.result)
- print (r3.status) #PENDING
看到状态是 PENDING,表示没有执行,这个是因为没有 celeryconfig.py 文件中指定改 route 到哪一个 Queue 中,所以会被发动到默认的名字 celery 的 Queue 中,但是我们还没有启动 worker 执行 celery 中的任务。
下面,我们来启动一个 worker 来执行 celery 队列中的任务
- # celery - A tasks worker - l info - n worker. % h - Q celery ##默认的
可以看到这行的结果为 success
print(re3.status) #SUCCESS
定时任务:
Celery 与 定时任务
在 celery 中执行定时任务非常简单,只需要设置 celery 对象中的 CELERYBEAT_SCHEDULE 属性即可。
下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:
- CELERY_TIMEZONE = 'UTC'
- CELERYBEAT_SCHEDULE = {
- 'taskA_schedule' : {
- 'task':'tasks.taskA',
- 'schedule':20,
- 'args':(5,6)
- },
- 'taskB_scheduler' : {
- 'task':"tasks.taskB",
- "schedule":200,
- "args":(10,20,30)
- },
- 'add_schedule': {
- "task":"tasks.add",
- "schedule":10,
- "args":(1,2)
- }
- }
注意格式,否则会有问题
启动:
celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery
celery -A demon3 beat
原文:http://blog.51cto.com/286577399/2052690
来源: http://www.bubuko.com/infodetail-2435258.html