流畅的 Python笔记.
本篇主要讨论 asyncio 包, 这个包使用事件循环驱动的协程实现并发.
1. 前言
本篇主要介绍如果使用 asyncio 包将上一篇中线程版的 "国旗下载" 程序改为协程版本, 通过异步非阻塞来实现并发.
说实话, 我在读这部分内容的时候是懵逼的, 书中阻塞非阻塞, 同步异步的概念和我之前的理解有很大差异. 之前一直以为同步就意味着阻塞, 异步就意味着非阻塞. 但其实, 阻塞非阻塞与同步异步并没有本质的联系.
同步 (Synchronizing) 异步 (Asynchronizing) 是对指令而言的, 也就是程序(理解成 "函数" 会更好一些). 以含有 I/O 操作的函数为例(被调用方), 如果这个函数要等到 I/O 操作结束, 获取了数据, 才返回到调用方, 这就叫同步(绝大部分函数都同步); 反之, 不等 I/O 执行完毕就返回到调用方, 获取的数据以其他方式转给调用方, 这就叫异步.
阻塞 (Blocking) 非阻塞 (Non-Blocking) 是对进程线程而言(为了简洁, 只以 "线程" 为例). 因为某些原因(比如 I/O), 线程被挂起(被移出 CPU), 这就叫阻塞; 反之, 即使因为这些原因, 线程依然不被挂起(不被移出 CPU), 这就叫非阻塞.
可见, 这两组概念一共可以组成四种不同情况: 同步阻塞(常见), 同步非阻塞(不常见), 异步阻塞(不常见), 异步非阻塞(常见).
仍以上述 I/O 函数为例:
如果这个函数的 I/O 请求已发出, 只是单纯地在等服务器发回数据, 线程也只是单纯地在等这个函数返回结果, CPU 将会把这个线程挂起, 这就叫做同步阻塞;
如果这个函数中调用的是一个执行复杂计算的子函数, 此时, 函数依然在等结果没有返回, 但线程并不是没有运行, 不会被 CPU 挂起, 这就叫做同步非阻塞("CPU 以轮询的方式查看 I/O 是否结束" 更能说明这种情况, 但这已是很古老的方式了);
如果这个函数在 I/O 请求没得到结果之前就返回了, 但线程依然在等这个结果(在函数体之外等待使用这个数据), 这就叫异步阻塞;
如果这个函数在没得到结果之前返回了, 线程继续执行其他函数, 这就叫做异步非阻塞. 更具体一点, 这种情况对应的是使用回调实现异步非阻塞的情况; 而 Python 中还有一种情况, 也是本篇要讲的, 就是使用协程实现异步非阻塞: 协程在得到结果前依然不返回, 但线程并没有等待, 而是去执行其他协程. 协程看起来就像同步一样.
由于之前并没有遇到代码世界中的同步非阻塞和异步阻塞这两种情况, 所以我也不确定上述这两种情况的例子是否准确, 欢迎大佬留言指导. 但这四种情况在现实生活中就很常见了, 下面举个在某处 https://www.zhihu.com/question/19732473/answer/23434554 看到的例子:
老张把一普通水壶接上水放火上, 眼睛直勾勾盯着等水开, 不干其他事, 这叫同步阻塞;
老张依然用一普通水壶烧水, 但把水壶放火上后去客厅看电视, 时不时回来看水烧好了没有, 这叫同步非阻塞;
老张用一能响的水壶烧水, 没盯着看, 但也没干其他事, 只是在那儿发愣. 水烧好后, 壶可劲儿的响, 老张一惊, 取走水壶, 这叫异步阻塞;
老张用一能响的水壶烧水, 把壶放火上后去客厅看电视, 等壶响了再去拿壶, 这叫异步非阻塞;
从这四个例子可以看出, 阻不阻塞是对老张而言的, 在计算机中对应的就是进程线程; 同步异步是对水壶而言的, 在计算机中对应的就是函数.
有了上述概念后, 我们接下来将使用 asyncio 包, 将之前下载国旗的程序改为协程版本.
2. 异步
之前 http://www.vpointer.net/articles/Python学习之路36-使用future处理并发/ 我们使用线程实现了并发下载数据, 它是同步阻塞的, 因为一到 I/O 操作, 线程就被阻塞, 然后调入新的线程. 现在, 我们将实现一个异步非阻塞版本. 但从上述介绍知道, 异步有两种方式: 回调和协程. 本文并不会实现回调版本的 "下载国旗", 提出回调只是为了和协程进行比较.
2.1 回调
举个例子说明回调. 在调用函数 A 时除了传入必要的参数外, 还传入一个参数: 函数 B.A 中有一些费时的操作, 比如 I/O,A 在没得到结果之前就返回, 而将等待结果以及进行后续处理的事情交给函数 B. 这个过程就是回调, 函数 B 就称为回调函数.
这种编程方式不太符合人的思维习惯, 代码也不易于理解, 情况一复杂, 就很可能遇到 **"回调地狱"**: 多层嵌套回调. 下面是一个 JavaScript 中使用回调的例子, 它嵌套了 3 层:
- // 代码 2.1
- api_call1(request1, function (response1){ // 多么痛的领悟
- var request2 = step1(response1); // 第一步
- api_call2(request2, function (response2){
- var request3 = step2(response2); // 第二步
- api_call3(request3, function (response3){
- step(response3); // 第三步
- })
- })
- })
复制代码
api_call1,api_call2 和 api_call3 都是库函数, 用于异步获取结果. JavaScript 中常用匿名函数作为回调函数. 下面我们使用 Python 来实现上述代码, 上述三个匿名函数分别命名为 stage1,stage2 和 stage3:
- # 代码 2.2
- def stage1(response1):
- request2 = step1(response1)
- api_call2(request2, stage2)
- def stage2(response2):
- request3 = step2(response2)
- api_call3(request3, stage3)
- def stage3(response3):
- step3(response3)
- api_call1(request1, stage1) # 代码从这里开始执行
复制代码
可见, 即使用 Python 写, 也不容易理解, 这要是再多嵌套几层, 不逼疯已经不错了. 而且, 如果要在 stage2 中使用 request2, 还得使用闭包, 这就又变成了嵌套定义函数的情况. 并且上述代码还没有考虑抛出异常的情况: 在基于回调的 API 中, 这个问题的解决办法是为每个异步调用注册两个回调, 一个用于处理操作成功时返回的结果, 一个用于处理错误. 可以看出, 一旦涉及错误处理, 回调将更可怕.
2.2 协程
现在我们用协程来改写上述代码:
- # 代码 2.3
- import asyncio
- @asyncio.coroutine
- def three_stages(request1):
- response1 = yield from api_call1(request1)
- request2 = step1(response1)
- response2 = yield from api_call2(request2)
- request3 = step2(response2)
- response3 = yield from api_call3(request3)
- step3(response3)
- loop = asyncio.get_event_loop()
- loop.create_task(three_stages(request1))
复制代码
与前面两个版本的回调相比, 这个版本的代码将 3 个步骤依次写在同一函数中, 易于理解, 这样看起来是不是也更像同步函数? 如果要处理异常, 只需要相应的 yield from 语句处添加 try/except 即可.
但也别急着把这称为 "协程天堂", 因为:
不能使用常规函数, 必须使用协程, 而且要习惯 yield from 语句;
不能直接调用协程. 即, 不能像直接调用
api_call1(request1)
那样直接调用
three_stages(request1)
, 必须使用事件循环 (上面的 loop) 来驱动协程.
但不管怎样, 代码读起来和写起来比回调简单多了, 尤其是嵌套回调.
小技巧: 读协程的代码时, 为了便于理解代码的意思, 可以直接将 yield from 关键字忽略掉.
2.3 下载国旗批量版
下面我们开始实现协程版本的 "下载国旗".
为了将其改为协程版本, 我们不能使用之前的 requests 包, 因为它会阻塞线程, 改为使用 aiohttp 包. 为了尽量保持代码的简洁, 这里不处理异常. 下方是完整的代码, 代码中我们使用了新语法. 以下代码的基本思路是: 在一个单线程程序中使用主循环一次激活队列中的协程, 各个协程向前执行几步, 然后把控制权让给主循环, 主循环再激活队列中的下一个协程.
- # 代码 2.4
- import aiohttp, os, sys, time, asyncio # 代码中请勿这么写, 这里只是为了减少行数
- POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
- BASE_URL = "http://flupy.org/data/flags"
- DEST_DIR = "downloads/"
- def save_flag(img, filename):
- path = os.path.join(DEST_DIR, filename)
- with open(path, "wb") as fp:
- fp.write(img)
- def show(text):
- print(text, end=" ")
- sys.stdout.flush()
- async def get_flag(cc): # aiohttp 只支持 TCP 和 UDP 请求
- url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
- async with aiohttp.ClientSession() as session: # <1> 开启一个会话
- async with session.get(url) as resp: # 发送请求
- image = await resp.read() # 读取请求
- return image
- async def download_one(cc):
- image = await get_flag(cc)
- show(cc)
- save_flag(image, cc.lower() + ".gif")
- return cc
- def download_many(cc_list):
- loop = asyncio.get_event_loop() # 获取事件循环
- to_do = [download_one(cc) for cc in sorted(cc_list)] # 生成协程列表
- wait_coro = asyncio.wait(to_do) # 将协程包装成 Task 类, wait_coro 并不是运行结果! 而是协程!
- res, _ = loop.run_until_complete(wait_coro) # 驱动每个协程运行
- loop.close() # 循环结束
- return len(res)
- def main(download_many):
- t0 = time.time()
- count = download_many(POP20_CC)
- elapsed = time.time() - t0
- msg = "\n{} flags downloaded in {:.2f}s"
- print(msg.format(count, elapsed))
- if __name__ == "__main__":
- main(download_many)
- # 结果:
- VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US
- 20 flags downloaded in 1.27s
复制代码
解释:
这里使用了新的语法 async/await. 再 Python3.5 之前, 如果想定义一个协程只能延用函数的定义方式 def, 然后在定义体里面使用 yield 或 yield from. 如果想把一个函数更明确地声明为协程(或者说异步函数), 还可以使用 asyncio 中的 coroutine 装饰器, 但这么做是不是挺麻烦的? 从 Python3.5 起, 可以明确 ** 使用 async 来定义协程(异步函数)** 和异步生成器. 使用 async 则可以省略掉 @asyncio.coroutine 装饰器; 在用 async 修饰的协程的定义体中可以使用 yield 关键字, 但不能使用 yield from, 它必须被替换为 await, 即使 yield from 后面只是一个普通的生成器; 从由 async 修饰的协程或生成器中获取数据时, 必须使用 await.
如果要使用 @asyncio.coroutine 装饰器明确声明协程, 那么在协程定义体内部只能使用 yield from, 不能使用 yield, 因为使用到 yield 的地方已经在 asyncio 中全部封装成了函数或者方法. 最新版的 @asyncio.coroutine 也可以装饰 async 修饰的协程, 这种情况下 coroutine 不做任何事, 只是原封不动的返回被装饰的协程.
- <1 > 处的代码之所以改用 async with(异步上下文管理器), 是因为新版 asyncio 并不支持书中的旧语法
- yield from aiohttp.request("GET", url)
. 关于 async/await,
async with/async for
的相关内容将在后续文章中介绍, 这里只需要知道 async 对应于 @asyncio.coroutine,await 对应于 yield from 即可.
我们将 get_flag 改成了协程版本, 并使用 aiohttp 来实现异步请求; download_one 函数也随之变成了协程版本.
download_many 只是一个普通函数, 它要驱动协程运行. 在这个函数中, 我们通过
asyncio.get_event_loop()
创建事件循环 (实质就是一个线程) 来驱动协程的运行. 接着生成含 20 个 download_one 协程的协程列表 to_do, 随后再调用
asyncio.wait(to_do)
将这个协程列表包装成一个 wait 协程, 取名为 wait_coro.wait 协程会将 to_do 中所有的协程包装成 Task 对象(Future 的子类), 再形成列表. 最后, 我们通过
loop.run_until_complete(wait_coro)
驱动协程 wait_coro 运行. 整个的驱动链是这样的:
loop.run_until_complete
驱动协程 wait_coro,wait_coro 再在内部驱动 20 个协程.
wait 协程最后会返回一个元组, 第一个元素是完成的协程数, 第二个是未完成的协程数,
loop.run_until_complete
返回传入的协程的返回值(实际代码是 Future.result()). 有点绕, 其实就是 wait_coro 最后返回一个元组给 run_until_complete,run_until_complete 再把这个值返回给调用方.
在上一篇 http://www.vpointer.net/articles/Python学习之路36-使用future处理并发/ 中, 我们知道 concurrent.futures 中有一个 Future, 且通过它的 result 方法获取最后运行的结果; 在 asyncio 包中, 不光有 Future, 还有它的子类 Task, 但获取结果通常并不是调用 result 方法, 而是通过 yield from 或 await, 即 yield from future 获取结果. asyncio.Future 类的 result 方法没有参数, 不能设置超时时间; 如果调用 result 时 future 还未运行完毕, 它并不会阻塞去等待结果, 而是抛出
asyncio.InvalidStateError
异常.
2.4 下载国旗改进版
上一篇 http://www.vpointer.net/articles/Python学习之路36-使用future处理并发/ 中, 我们除了使用 Executor.map()批量处理线程之外, 我们还使用了
concurrent.futures.as_completed()
挨个迭代运行完的线程返回的结果. asyncio 也实现了这个方法, 我们将使用这个函数改写上方的代码.
还有一个问题: 我们往往只关注了网络 I/O 请求, 常常忽略本地的 I/O 操作. 线程版本中的 save_flag 函数也是会阻塞线程的, 因为它操作了磁盘. 但由于图片太小, 速度太快, 我们感觉并不明显, 如果换成更高像素的图片, 这种速度差异就会很明显. 我们将会以某种方式使其避免阻塞线程. 下面是改写的代码:
- # 代码 2.5
- import asyncio, os, sys, time, aiohttp
- async def download_one(cc, semaphore):
- async with semaphore:
- image = await get_flag(cc)
- loop = asyncio.get_event_loop()
- loop.run_in_executor(None, save_flag, image, cc + ".gif")
- return cc
- async def download_coro(cc_list, concur_req):
- semaphore = asyncio.Semaphore(concur_req) # 它是一个信号量, 用于控制并发量
- to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
- to_do_iter = asyncio.as_completed(to_do)
- for future in to_do_iter:
- res = await future
- print("Downloaded", res)
- def download_many(cc_list, concur_req): # 变化不大
- loop = asyncio.get_event_loop()
- coro = download_coro(cc_list, concur_req)
- loop.run_until_complete(coro)
- loop.close()
- if __name__ == "__main__":
- t0 = time.time()
- download_many(POP20_CC, 1000) # 第二个参数表示最大并发数
- print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))
- # 结果:
- Downloaded BD
- Downloaded CN
- -- snip --
- Downloaded US
- Done! Time elapsed 1.21s.
复制代码
上述代码有 3 个地方值得关注:
asyncio.as_completed()
以元素为协程的可迭代对象为参数, 但自身并不是协程, 只是一个生成器. 它在内部将传入的协程包装成 Task, 然后返回一个生成器, 产出协程的返回值. 这个生成器按协程完成的顺序生成值(先完成先产出), 而不是按协程在迭代器中的顺序生成值.
asyncio.Semaphore 是个信号量类, 内部维护这一个计数器, 调用它的 acquire 方法(这个方法是个协程), 计数器减一; 对其调用 release 方法(这个方法不是协程), 计数器加一; 当计数器为 0 时, 会阻塞调用这个方法的协程.
我们将 save_flag 函数放到了其他线程中,
loop.run_in_executor()
的第一个参数是 Executor 实例, 如果为 None, 则使用事件循环的默认 ThreadPoolExecutor 实例. 余下的参数是可调用对象, 以及可调用对象的位置参数.
3. 总结
本章开篇介绍了阻塞非阻塞, 同步异步的概念, 然后介绍了异步的两种实现方式: 回调和协程. 并通过代码比较了回调和协程的实现方式. 然后我们使用 asyncio 和 aiohttp 两个库, 将之前线程版本的下载国旗程序改为了协程版本. 可惜我也是刚接触协程不久, 写的内容不一定准确, 尤其是关于 asyncio 的内容, 这个库之前是一点都没接触过. 后面我会专门研究 Python 中的协程, 以及 asyncio 的实现, 争取把这部分内容彻底搞懂.
来源: https://juejin.im/post/5b277decf265da59ab29817e