The above example is overly simple, the background job is started and then the application forgets about it. Most Celery tutorials for web development end right there,
事实上对于很多应用来说,有必要监控它的后台任务并且从中获取结果。
接下来使用第二个例子,一个虚构的耗时任务来扩展上面的应用,用户可以通过点击一个按钮启动一个或更多这些长时间运行任务。运行在你的浏览器上的网页通过ajax轮训你的服务获取这些任务的状态更新。对于每个任务,网页会展示图形状态栏,一个完成百分比,一个状态消息,当任务完成时,结果值会被展示。
有状态更新的后台任务
Background Tasks with Status Updates
Let me start by showing you the background task that I’m using for this second example:
- @celery.task(bind=True)
- def long_task(self):
- """Background task that runs a long function with progress reports."""
- verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
- adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
- noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
- message = ''
- total = random.randint(10, 50)
- for i in range(total):
- if not message or random.random() < 0.25:
- message = '{0} {1} {2}...'.format(random.choice(verb),
- random.choice(adjective),
- random.choice(noun))
- self.update_state(state='PROGRESS',
- meta={'current': i, 'total': total,
- 'status': message})
- time.sleep(1)
- return {'current': 100, 'total': 100, 'status': 'Task completed!',
- 'result': 42}
这个路由生成一个JSON响应,它包含任务状态和我在update_state()调用中作为元参数设置的所有值,客户端可以用它构建一个进程条。不幸的是这个函数需要检查一些edge条件,所以最后有点长。为了检查任务数据我再创了一个任务对象,一个AsyncResult类的实例,使用URL中给定的任务id。
首先如果if块是当任务还没有开始(悬起PENDING状态)。这种情况下没有状态信息,所以我制造一些数据。跟着elif块从后台任务中返回状态信息。这里由任务提供的信息作为task.info是可访问的。如果数据包含了一个结果键,就意味着这是最终结果并且任务结束,所以我把结果也添加到response。else块覆盖了一个错误的可能性,Celery会通过设置任务状态为“FAILURE”来报告,这种情况下task.info会包含被引发的异常,为了处理错误我设置了异常文本作为一个状态消息。
以上都是发生在服务器端,剩下的需要客户端实现,在本例中就是一个包含JavaScript脚本的网页
Client-Side Javascript
解析本例的JavaScript部分不是本文真正的关注点,但如果你感兴趣的话,下面有一些信息。
图形进度条部分我使用了nanobar.js,通过CDN包含进来。同时也包含了,简化了ajax调用:
- <script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
- <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>
启动后台工作的按钮被关联到下面的Javascript上:
- function start_long_task() {
- // add task status elements
- div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
- $('#progress').append(div);
- // create a progress bar
- var nanobar = new Nanobar({
- bg: '#44f',
- target: div[0].childNodes[0]
- });
- // send ajax POST request to start background job
- $.ajax({
- type: 'POST',
- url: '/longtask',
- success: function(data, status, request) {
- status_url = request.getResponseHeader('Location');
- update_progress(status_url, nanobar, div[0]);
- },
- error: function() {
- alert('Unexpected error');
- }
- });
- }
这个函数起始于添加一些用来展示新的后台任务进度条和状态的html元素,这是动态的因为用户可以添加任何数量任务,每个任务需要获取它自己的HTML元素集合。
To help you understand this better, here is the structure of the added elements for a task, with comments to indicate what each div is used for:
- <div class="progress">
- <div></div> <-- Progress bar
- <div>0%</div> <-- Percentage
- <div>...</div> <-- Status message
- <div> </div> <-- Result
- </div>
- <hr>
然后 start_long_task()函数按照nanobar的文档实例化这个进度条 ,最后发送ajax POST请求到 /longtask,在服务端初始化Celery 后台任务。
当 POST ajax 调用返回, 回调函数包含了the callback function obtains the value of the Location header,这就像你在前面部分看到的一样是为了客户端状态更新。然后使用这个状态URL,进度条对象,为任务创建的根div的子树,调用另一个函数update_progress() 。下面你可以看到这个update_progress() 函数,它发送状态请求然后用它返回的信息更新UI元素:
- function update_progress(status_url, nanobar, status_div) {
- // send GET request to status URL
- $.getJSON(status_url,
- function(data) {
- // update UI
- percent = parseInt(data['current'] * 100 / data['total']);
- nanobar.go(percent);
- $(status_div.childNodes[1]).text(percent + '%');
- $(status_div.childNodes[2]).text(data['status']);
- if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
- if ('result' in data) {
- // show result
- $(status_div.childNodes[3]).text('Result: ' + data['result']);
- } else {
- // something unexpected happened
- $(status_div.childNodes[3]).text('Result: ' + data['state']);
- }
- } else {
- // rerun in 2 seconds
- setTimeout(function() {
- update_progress(status_url, nanobar, status_div);
- },
- 2000);
- }
- });
- }
这个函数发送GET请求到状态URL,当一个响应被接收后它为任务更新不同的HTML元素。如果后台任务结束并且结果可用那么它就被添加到页面上。如果没有结果就意味着任务由于错误而结束,所以任务的状态,将会是FAILURE,就像结果所展示的。
当服务端正在执行任务我需要继续轮训任务状态并且更新UI。为实现这个我设置了一个定时器在两秒内来再次调用这个函数。这持续到Celery任务结束。
一个worker运行尽可能多的并发任务,按照默认CPU数。所以当你弄这个例子时确保开启大量任务来查看Celery如何保持工作为PENDING状态知道worker能够处理它。A Celery worker runs as many concurrent jobs as there are CPUs by default
If you made it all the way here without running the example application, then it is now time for you to try all this Celery goodness. Go ahead and clone the Github repository, create a virtual environment, and populate it:
- $ git clone https://github.com/miguelgrinberg/flask-celery-example.git
- $ cd flask-celery-example
- $ virtualenv venv
- $ source venv/bin/activate
- (venv) $ pip install -r requirements.txt
Note that the requirements.txt file included with this repository contains Flask, Flask-Mail, Celery and the Redis client, along with all their dependencies.
Now you need to run the three processes required by this application, so the easiest way is to open three terminal windows. On the first terminal run Redis. You can just install Redis according to the download instructions for your operating system, but if you are on a Linux or OS X machine, I have included a small script that downloads, compiles and runs Redis as a private server:
- $. / run - redis.sh
Note that for the above script to work you need to have gcc installed. Also note that the above command is blocking, Redis will start in the foreground.
On the second terminal run a Celery worker. This is done with the celery command, which is installed in your virtual environment. Since this is the process that will be sending out emails, the MAIL_USERNAME and MAIL_PASSWORD environment variables must be set to a valid Gmail account before starting the worker:
- $ export MAIL_USERNAME=<your-gmail-username>
- $ export MAIL_PASSWORD=<your-gmail-password>
- $ source venv/bin/activate
- (venv) $ celery worker -A app.celery --loglevel=info
The -A option gives Celery the application module and the Celery instance, and –loglevel=info makes the logging more verbose, which can sometimes be useful in diagnosing problems.
Finally, on the third terminal window run the Flask application, also from the virtual environment:
- $ source venv/bin/activate
- (venv) $ python app.py
Now you can navigate to http://localhost:5000/ in your web browser and try the examples!
来源: http://blog.csdn.net/zhangfh1990/article/details/77942296