作者: Loris Cro
翻译: Kevin (公众号: 中间件小哥)
近年来, 许多编程语言都在努力改进它们的并发原语. Go 语言有 goroutines,Ruby 有 fibers, 当然, 还有 Node.JS 帮助普及的 async/await, 这是当今使用最为广泛的并发操作类型. 在本文中, 我将以 python 为例讨论 async/await 的基础知识. 我选择 python 语言, 是因为这个功能在 python 3 中比较新, 很多用户可能对它还不是很熟悉. 使用 async/await 的主要原因是通过减少 I/O 执行时的空闲时间来提高程序的吞吐量. 使用这个操作符的程序通过隐式地使用一个称为事件循环的抽象来同时处理多个执行路径. 在某些方面, 这些事件循环类似于多线程编程, 但是事件循环通常存在于单个线程中, 因此, 它不能同时执行多个计算. 正因为如此, 单独的事件循环不能提高计算密集型应用程序的性能. 但是, 对于进行大量网络通信的程序, 比如连接到 Redis 数据库的应用程序, 它可以极大地提高性能. 每次程序向 Redis 发送一个命令时, 它都会等待 Redis 的响应, 如果 Redis 部署在另一台机器上, 就会出现网络延迟. 而一个不使用事件循环的单线程应用程序在等待响应时处于空闲状态, 会占用大量的 CPU 周期. 需要注意的是, 网络延迟是以毫秒为单位的, 而 CPU 指令需要纳秒来执行, 这两者相差六个数量级. 这里举个例子, 下面的代码样例是用来跟踪一个游戏的获胜排行榜. 每个流条目都包含获胜者的名字, 我们的程序会更新一个 Redis 的有序集合(Sorted Set), 这个有序集合用来作为排行榜. 这里我们主要关注的是阻塞代码和非阻塞代码的性能.
- import Redis
- # The operation to perform for each event
- def add_new_win(conn, winner):
- conn.zincrby('wins_counter', 1, winner)
- conn.incr('total_games_played')
- def main():
- # Connect to Redis
- conn = Redis.Redis()
- # Tail the event stream
- last_id = '$'
- while True:
- events = conn.xread({'wins_stream': last_id}, block=0, count=10)
- # Process each event by calling `add_new_win`
- for _, e in events:
- winner = e['winner']
- add_new_win(conn, winner)
- last_id = e['id']
- if __name__ == '__main__':
- main()
我们使用 aio-libs/aioredis 实现与上面代码有相同效果的异步版本. aio-libs 社区正在重写许多 Python 网络库, 以包括对 asyncio 的支持, asyncio 是 Python 事件循环的标准库实现. 下面是上面代码的非阻塞版本:
- import asyncio
- import aioredis
- async def add_new_win(pool, winner):
- await pool.zincrby('wins_counter', 1, winner)
- await pool.incr('total_games_played')
- async def main():
- # Connect to Redis
- pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8')
- # Tail the event stream
- last_id = '$'
- while True:
- events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
- # Process each event by calling `add_new_win`
- for _, e_id, e in events:
- winner = e['winner']
- await add_new_win(pool, winner)
- last_id = e_id
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
这段代码与上面那段代码相比, 除了多了一些 await 关键字之外, 其他的几乎是相同的. 最大的不同之处在最后两行. 在 Node.JS 中, 环境会默认加载事件循环, 而在 Python 中, 必须显示地开启.
重写之后, 我们可能会认为这么做就可以提高性能了. 不幸的是, 我们代码的非阻塞版本还没有提高性能. 这里的问题在于我们编写代码的细节, 而不仅仅是使用 async / await 的一般思想.
Await 使用的限制
我们重写代码后的主要问题是我们过度使用了 await. 当我们在异步调用前面加上 await 时, 我们做了以下两件事:
1. 为执行做相应的调度
2. 等待完成
有时候, 这样做是对的. 例如, 在完成对第 15 行流的读取之前, 我们不能对每个事件进行迭代. 在这种情况下, await 关键字是有意义的, 但是看看 add_new_win 方法:
- async def add_new_win(pool, winner):
- await pool.zincrby('wins_counter', 1, winner)
- await pool.incr('total_games_played')
在这个函数中, 第二个操作并不依赖于第一个操作. 我们可以将第二个命令与第一个命令一起发送, 但是当我们发送第一个命令时, await 将阻塞执行流. 我们其实更想要一种能立即执行这两个操作的方法. 为此, 我们需要一个不同的同步原语.
- async def add_new_win(pool, winner):
- task1 = pool.zincrby('wins_counter', 1, winner)
- task2 = pool.incr('total_games_played')
- await asyncio.gather(task1, task2)
首先, 调用一个异步函数不会执行其中的任何代码, 而是会先实例化一个 "任务". 根据选择的语言, 这可能被称为 coroutine, promise 或 future 等等. 对我们来说, 任务是一个对象, 它表示一个值, 该值只有在使用了 await 或其他同步原语 (如 asyncio.gather) 之后才可用. 在 Python 的官方文档中, 你可以找到更多关于 asyncio.gather 的信息. 简而言之, 它允许我们在同一时间执行多个任务. 我们需要等待它的结果, 因为一旦所有的输入任务完成, 它就会创建一个新的任务. Python 的 asyncio.gather 相当于 JavaScript 的 Promise.all,C# 的 Task.WhenAll, Kotlin 的 awaitAll 等等.
改进我们的主循环代码
我们对 add_new_win 所做的事情也可以用于主流事件处理循环. 这是我所指的代码:
- last_id = '$'
- while True:
- events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
- for _, e_id, e in events:
- winner = e['winner']
- await add_new_win(pool, winner)
- last_id = e_id
到目前为止, 你会注意到我们是顺序地处理每个事件. 因为在第 6 行中, 使用 await 既可以执行又可以等待 add_new_win 的完成. 有时这正是你希望发生的情况, 因为如果你不按顺序执行, 程序逻辑就会中断. 在我们的例子中, 我们并不真正关心排序, 因为我们只是更新计数器.
- last_id = '$'
- while True:
- events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
- tasks = []
- for _, e_id, e in events:
- winner = e['winner']
- tasks.append(add_new_win(pool, winner))
- last_id = e_id
- await asyncio.gather(*tasks)
我们现在也在并发地处理每一批事件, 并且对代码的改动是最小的. 最后要记住, 有时即使不使用 asyncio.gather, 程序也可以是高性能的. 特别是, 当你为 web 服务器编写代码并使用像 Sanic 这样的异步框架时, 该框架将以并发的方式调用你的请求处理程序, 即使你在等待每个异步函数调用, 也能确保巨大的吞吐量.
总结
下面是我们进行上面两个更改之后的完整代码示例:
- import asyncio
- import aioredis
- async def add_new_win(pool, winner):
- # Creating tasks doesn't schedule them
- # so you can create multiple and then
- # schedule them all in one go using `gather`
- task1 = pool.zincrby('wins_counter', 1, winner)
- task2 = pool.incr('total_games_played')
- await asyncio.gather(task1, task2)
- async def main():
- # Connect to Redis
- pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8')
- # Tail the event stream
- last_id = '$'
- while True:
- events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
- tasks = []
- for _, e_id, e in events:
- winner = e['winner']
- # Again we don't actually schedule any task,
- # and instead just prepare them
- tasks.append(add_new_win(pool, winner))
- last_id = e_id
- # Notice the spread operator (`*tasks`), it
- # allows using a single list as multiple arguments
- # to a function call.
- await asyncio.gather(*tasks)
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
为了利用非阻塞 I/O, 你需要重新考虑如何处理网络操作. 值得高兴的是这并不是很困难, 你只需要知道顺序性什么时候重要, 什么时候不重要. 尝试使用 aioredis 或等效的异步 Redis 客户端, 看看可以在多大程度上提高应用程序的吞吐量.
更多优质中间件技术资讯 / 原创 / 翻译文章 / 资料 / 干货, 请关注 "中间件小哥" 公众号!
来源: https://www.cnblogs.com/middleware/p/11996731.html