在一个应用服务中, 对于时效性要求没那么高的业务场景, 我们没必要等到所有任务执行完才返回结果, 例如用户注册场景中, 保存了用户账号密码之后, 就可以立即返回, 后续的账号激活邮件, 可以用一种异步的形式去处理, 这种异步操作可以用队列服务来实现. 否则, 如果等到邮件发送成功可能几秒过去了.
Celery 是什么?
Celery 是 Python 语言实现的分布式队列服务, 除了支持即时任务, 还支持定时任务, Celery 有 5 个核心角色.
Task
任务 (Task) 就是你要做的事情, 例如一个注册流程里面有很多任务, 给用户发验证邮件就是一个任务, 这种耗时的任务就可以交给 Celery 去处理, 还有一种任务是定时任务, 比如每天定时统计网站的注册人数, 这个也可以交给 Celery 周期性的处理.
Broker
Broker 的中文意思是经纪人, 指为市场上买卖双方提供中介服务的人. 在 Celery 中这个角色相当于数据结构中的队列, 介于生产者和消费者之间经纪人. 例如一个 web 系统中, 生产者是主程序, 它生产任务, 将任务发送给 Broker, 消费者是 Worker, 是专门用于执行任务的后台服务. Celery 本身不提供队列服务, 一般用 Redis 或者 RabbitMQ 来实现队列服务.
Worker
Worker 就是那个一直在后台执行任务的人, 也成为任务的消费者, 它会实时地监控队列中有没有任务, 如果有就立即取出来执行.
Beat
Beat 是一个定时任务调度器, 它会根据配置定时将任务发送给 Broker, 等待 Worker 来消费.
Backend
Backend 用于保存任务的执行结果, 每个任务都有返回值, 比如发送邮件的服务会告诉我们有没有发送成功, 这个结果就是存在 Backend 中, 当然我们并不总是要关心任务的执行结果.
记住这 5 个角色后面理解 Celery 就轻松了.
快速入门
接触任何新东西, 没有什么比实际动手学得更快了. 假设我们选择 Redis 作为 broker, 你需要安装 Redis 并且已经启动了 Redis 服务(这个步骤请自行借用搜索引擎解决)
pip install -U "celery[Redis]"
1, 创建 Celery 实例
- # tasks.py
- from celery import Celery
- App = Celery('tasks', broker='Redis://localhost:6379/0')
2, 创建任务
假设这个发送邮件的任务需要 5 秒钟才能执行完
- # tasks.py
- @App.task
- def send_mail(email):
- print("send mail to", email)
- import time
- time.sleep(5)
- return "success"
在没有 Celery 的情况下, 程序顺序执行, 每个步骤都需要等上一步执行完成.
1. 插入记录到数据库
2. 发邮件
3. 注册成功
我们可以把 2 放在一个任务中交给 celery 去执行, 这样我们就不需要等待发邮件完成, 你只需要安排 celery 去处理帮我去完成就好了. 代码就变成了
1. 插入记录到数据库
2. celery 帮我去发邮件
3. 注册成功
第二步是非常快的, 它只需要把任务放进队列里面去, 并不会等任务真正执行完. 这跟生活是完全贴切的, 例如我们很多事情都不是自己亲历其为去做, 而是将一个不太重要或即时性没那么高的事情转交给别人处理.
3, 启动 Worker
启动 Worker, 监听 Broker 中是否有任务, 命令: celery worker, 你可能需要指定参数
celery -A tasks worker --loglevel=info
-A: 指定 celery 实例所在哪个模块中, 例子中, celery 实例在 tasks.py 文件中, 启动成功后, 能看到信息
函数用 App.task 装饰器修饰之后, 就会成为 Celery 中的一个 Task.
4, 调用任务
在主程序中调用任务, 掉任务发送给 Broker, 而不是真正执行该任务
- # user.py
- from tasks import send_mail
- def register():
- import time
- start = time.time()
- print("1. 插入记录到数据库")
- print("2. celery 帮我发邮件")
- send_mail.delay("xx@gmail.com")
- print("3. 告诉用户注册成功")
- print("耗时:%s 秒" % (time.time() - start))
- if __name__ == '__main__':
- register()
在主程序中, 调用函数的. delay 方法
目录结构:
── celery_test
├── tasks.py
└── user.py
运行 python http://user.py , 启动应用程序
1. 插入记录到数据库
2. celery 帮我发邮件
3. 告诉用户注册成功
耗时: 0.22688984870910645 秒
程序花了不到 0.23 秒就执行完成, 如果按照正常的同步逻辑去执行, 至少需要 5 秒钟, 因为发邮件的任务就花了 5 秒.
在 worker 服务窗口看日志信息
注意:
1,celery worker 启动时, 如果是 root 用户, 需要设置环境变量:
$ export C_FORCE_ROOT='true'
2, Celery4.x 开始不再支持 Windows 平台, 如果需要在 Windows 开发, 请使用 3.x 的版本.
3, 使用 RabbitMQ 或 Redis 作为 Broker, 生产环境永远不要使用关系数据库
4, 不要使用复杂对象作为任务函数的参数
- # Good
- @App.task
- def my_task(user_id):
- user = User.objects.get(id=user_id)
- print(user.name)
- # ...
- # Bad
- @App.task
- def my_task(user):
- print(user.name)
- # ...
小结
学习 Celery, 首先需要知道它的应用场景, 然后是 Celery 中的常见角色, 最后按照步骤感受一下 Celery 是如何跑起来的.
参考链接:
- http://funhacks.NET/2016/12/13/celery/
- http://celerytaskschecklist.com/
来源: https://juejin.im/post/5bbe18255188255c81391385