这里不解释 celery, 如果不清楚可以参考下面链接:
http://docs.celeryproject.org/en/latest/getting-started/introduction.html
这里来演示一下在 Django 项目中如何使用 celery:
1. 首先我们需要使用到两个库, 用 pip 安装:
- pip install celery
- pip install django-celery
2. 在 celery 建议使用 rabbitmq 作为消息代理, 当然也支持 Redis 作代理, abbitmq 提供的队列和消息持久化机制确实更加稳定, 所以对于追求稳定性的任务更适合适配 rabbitmq 作为中间件, 这里用 rabbitmq 作为消息代理, 用 Redis 作为存储后端
我的环境是 deepin, 安装 rabbitmq 和 Redis
- sudo apt-get install rabbitmq-server
- sudo apt-gei install Redis
3. 在 django 中使用 celery 的方式和普通 py 文件中的方式略有不同, 下面是一个向通过秒滴平台发送短信验证码的 demo:
普通 py 文件用法:
- # tasks.py
- import os
- from celery import Celery
- App = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://localhost:6379/1')
- @App.task(name="send_verification_code")
- def _send_verification_code(phone_number, verification_code):
- """
- :param phone_number: 目标手机号
- :param verification_code: 验证码
- :return:
- True: 发送成功
- False: 发送失败
- """ API = getConfig('MiaoDi','api')
- accountSid = getConfig('MiaoDi', 'accountSid')
- templateid = getConfig('MiaoDi', 'templateid')
- timeout_s = getConfig('MiaoDi', 'timeout')
- param = '{},{}'.format(verification_code, timeout_s)
- timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
- sign = hash_sign(timestamp)
- data = {
- 'accountSid': accountSid, 'templateid': templateid, 'param': param,
- 'to': phone_number, 'timestamp': timestamp, 'sig': sign
- }
- response = requests.post(url=API, data=data)
- ret_json = response.text
- ret_dict = eval(ret_json)
- if ret_dict.get('respCode') != '00000':
- return False
- else:
- return True
- # view.py
- from tasks import _send_verification_code
- def send_verification_code(phone_number, verification_code):
- task = _send_verification_code.delay(phone_number, verification_code)
- if __name__ == '__main__':
- phone_number = input('请输入手机号:')
- verification_code = input('请输入验证码:')
- send_verification_code(phone_number, verification_code)
来源: http://www.bubuko.com/infodetail-3005647.html