Coroutines and Tasks 属于 High-level APIs, 也就是高级层的 API.
本节概述用于协程和任务的高级异步 API.
Coroutines
Coroutines 翻译过来意思是协程,
使用 async/await 语法声明的协程是编写 asyncio 应用程序的首选方法.
- import asyncio
- async def main():
- print("hello")
- await asyncio.sleep(1)
- print("world")
- if __name__ == '__main__':
- # asyncio.run(main()) # 3.7 的用法
- # 阻塞直到 hello world()协程结束时返回
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
第一个异步函数是通过创建 loop 循环去调用, 其他异步函数之间通过 await 进行调用.
像下面的一个例子
- import asyncio
- import time
- async def say_after(delay, what):
- await asyncio.sleep(delay)
- print(what)
- async def main():
- print(f"started at {time.strftime('%X')}")
- await say_after(1, 'hello')
- await say_after(2, 'world')
- print(f"finished at {time.strftime('%X')}")
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- # 阻塞直到 hello world()协程结束时返回
- loop.run_until_complete(main())
- loop.close()
或者我们可以通过 asyncio.create_task()将协程 say_after 封装任务去调用就像下面这样.
- async def main():
- task1 = asyncio.create_task(
- say_after(1, 'hello'))
- task2 = asyncio.create_task(
- say_after(2, 'world'))
- print(f"started at {time.strftime('%X')}")
- # 等待两个子任务完成
- await task1
- await task2
- print(f"finished at {time.strftime('%X')}")
- Awaitables
我们说, 如果一个对象可以用在 await 表达式中, 那么它就是 Awaitables 的对象.
可等待对象主要有三种类型: coroutines, Tasks, and Futures.
Coroutines
前面的代码中演示了协程的运作方式, 这里主要强调两点.
协程函数: asyc def 定义的函数;
协程对象: 通过调用协程函数返回的对象.
Tasks
任务对协程进一步封装, 其中包含任务的各种状态.
协程对象不能直接运行, 在注册事件循环的时候, 其实是 run_until_complete 方法将协程包装成为了一个任务 (task) 对象.
- import asyncio
- async def nested():
- await asyncio.sleep(2)
- print("等待 2s")
- async def main():
- # 将协程包装成任务含有状态
- # task = asyncio.create_task(nested())
- task = asyncio.ensure_future(nested())
- print(task)
- # "task" can now be used to cancel "nested()", or
- # can simply be awaited to wait until it is complete:
- await task
- print(task)
- print(task.done())
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- try:
- loop.run_until_complete(main())
- except KeyboardInterrupt as e:
- for task in asyncio.Task.all_tasks():
- print(task)
- task.cancel()
- print(task)
- loop.run_forever() # restart loop
- finally:
- loop.close()
可以看到
<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>>
等待 2s
- <Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>
- result=None> True
创建 task 后, task 在加入事件循环之前是 pending 状态然后调用 nested 函数等待 2s 之后打印 task 为 finished 状态. asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个 task,python3.7 增加了 asyncio.create_task(coro). 其中 task 是 Future 的一个子类
Future
future: 代表将来执行或没有执行的任务的结果. 它和 task 上没有本质的区别
通常不需要在应用程序级别代码中创建 Future 对象.
future 对象有几个状态:
- Pending
- Running
- Done
- Cancelled
通过上面的代码可以知道创建 future 的时候, task 为 pending, 事件循环调用执行的时候是 running, 调用完毕自然就是 done 于是调用 task.done()打印了 true.
如果在命令行中运行上述代码, ctrl+c 后会发现
输出以下内容
- <Task pending coro=<nested() running at 1-2-1.py:9>
- > ^C
- <Task pending coro=<main() running at 1-2-1.py:21>
- wait_for=
- <Task pending coro=<nested() running at 1-2-1.py:10>
- wait_for=
- <Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>
- ()]> cb=[
- <TaskWakeupMethWrapper object at 0x10d342918>
- ()]>>
- <Task pending coro=<main() running at 1-2-1.py:21>
- wait_for=
- <Task pending coro=<nested() running at 1-2-1.py:10>
- wait_for=
- <Future cancelled>
- cb=[
- <TaskWakeupMethWrapper object at 0x10d342918>
- ()]>>
- <Task pending coro=<nested() running at 1-2-1.py:10>
- wait_for=
- <Future cancelled>
- cb=[
- <TaskWakeupMethWrapper object at 0x10d342918>
- ()]>
- <Task cancelling coro=<nested() running at 1-2-1.py:10>
- wait_for=
- <Future cancelled>
- cb=[
- <TaskWakeupMethWrapper object at 0x10d342918>
- ()]>
因为我们调用了 task.cancel() 所以可以看到此时的任务状态为取消状态.
并发的执行任务
通过使用 await+asyncio.gather 可以完成并发的操作.
asyncio.gather 用法如下.
**asyncio.gather(*AWS, loop=None, return_exceptions=False)AWS 是一系列协程, 协程都成功完成, 就返回值一个结果列表. 结果值的顺序与 AWS 中添加协程的顺序相对应.
return_exceptions=False, 其实就是如果有一个任务失败了, 就直接抛出异常. 如果等于 True 就把错误信息作为结果返回回来.
首先来一个正常情况不出错的例子:
- import asyncio
- async def factorial(name, number):
- f = 1
- for i in range(2, number + 1):
- print(f"Task {name}: Compute factorial({i})...")
- if number == 2:
- 1 / 0
- await asyncio.sleep(1)
- f *= i
- print(f"Task {name}: factorial({number}) = {f}")
- async def main():
- # Schedule three calls *concurrently*:
- res = await asyncio.gather(
- *[factorial("A", 2),
- factorial("B", 3),
- factorial("C", 4)]
- , return_exceptions=True)
- for item in res:
- print(item)
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- try:
- loop.run_until_complete(main())
- except KeyboardInterrupt as e:
- for task in asyncio.Task.all_tasks():
- print(task)
- task.cancel()
- print(task)
- loop.run_forever() # restart loop
- finally:
- loop.close()
输入以下内容:
- Task A: Compute factorial(2)...
- Task B: Compute factorial(2)...
- Task C: Compute factorial(2)...
- Task B: Compute factorial(3)...
- Task C: Compute factorial(3)...
- Task B: factorial(3) = 6
- Task C: Compute factorial(4)...
- Task C: factorial(4) = 24
- division by zero
- None
- None
可以发现 async.gather 最后会返回一系列的结果, 如果出现了错误就把错误信息作为返回结果, 这里我当数字为 2 时人为加了异常操作 1/0, 于是返回了结果 division by zero, 对于其他的任务因为没有返回值所以是 None. 这里 return_exceptions=True 来保证了如果其中一个任务出现异常, 其他任务不会受其影响会执行到结束.
- asyncio.wait
- coroutine asyncio.wait(AWS, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
asyncio.wait 和 async.gather 用法差不多只是 async.wait 接收的是个列表.
第三个参数和 async.gather 有点区别.
参数名 | 含义 |
---|---|
FIRST_COMPLETED | 任何一个 future 完成或取消时返回 |
FIRST_EXCEPTION | 任何一个 future 出现错误将返回,如果出现异常等价于 ALL_COMPLETED |
ALL_COMPLETED | 当所有任务完成或者被取消时返回结果,默认值。 |
Timeouts
通过使用 asyncio.wait_for 来完成一个超时函数回调操作, 如果函数规定时间内未完成则报错.
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw 代表一个协程, timeout 单位秒.
- async def eternity():
- # Sleep for one hour
- await asyncio.sleep(3600)
- print('yay!')
- async def main():
- # Wait for at most 1 second
- try:
- await asyncio.wait_for(eternity(), timeout=1.0)
- except asyncio.TimeoutError:
- print('timeout!')
- asyncio.run(main())
- # Expected output:
- #
- # timeout!
1 秒内 eternity 没有完成就报错了.
python3.7 中发生更改: 当 aw 由于超时而被取消时, 不再显示异常而是等待 aw 被取消.
说到 timeout 的, 如果仅仅是对一个代码块做 timeout 操作而不是等待某个协程此时推荐第三方模块 async_timeout
async_timeout
安装
pip installa async_timeout
使用方法很简单如下
- async with async_timeout.timeout(1.5) as cm:
- await inner()
- print(cm.expired)
如果 1.5s 可以运行完打印 true, 否则打印 false, 表示超时.
- asyncio.as_completed
- **asyncio.as_completed(AWS, *, loop=None, timeout=None)**
使用 as_completed 会返回一个可以迭代的 future 对象, 同样可以获取协程的运行结果, 使用方法如下:
- async def main():
- coroutine1 = do_some_work(1)
- coroutine2 = do_some_work(2)
- coroutine3 = do_some_work(4)
- tasks = [
- asyncio.ensure_future(coroutine1),
- asyncio.ensure_future(coroutine2),
- asyncio.ensure_future(coroutine3)
- ]
- for task in asyncio.as_completed(tasks):
- result = await task
- print('Task ret: {}'.format(result))
- start = now()
- loop = asyncio.get_event_loop()
- done = loop.run_until_complete(main())
- print('TIME:', now() - start)
协程嵌套
使用 async 可以定义协程, 协程用于耗时的 io 操作, 我们也可以封装更多的 io 操作过程, 这样就实现了嵌套的协程, 即一个协程中 await 了另外一个协程, 如此连接起来
官网实例:
图解:
1,run_until_complete 运行, 会注册 task(协程: print_sum)并开启事件循环 →
2,print_sum 协程中嵌套了子协程, 此时 print_sum 协程暂停 (类似委托生成器), 转到子协程(协程: compute) 中运行代码, 期间子协程需 sleep1 秒钟, 直接将结果反馈到 event loop 中, 即将控制权转回调用方, 而中间的 print_sum 暂停不操作 →
3,1 秒后, 调用方将控制权给到子协程(调用方与子协程直接通信), 子协程执行接下来的代码, 直到再遇到 wait(此实例没有)→
4, 最后执行到 return 语句, 子协程向上级协程(print_sum 抛出异常: StopIteration), 同时将 return 返回的值返回给上级协程(print_sum 中的 result 接收值),print_sum 继续执行暂时时后续的代码, 直到遇到 return 语句 →
5, 向 event loop 抛出 StopIteration 异常, 此时协程任务都已经执行完毕, 事件循环执行完成(event loop :the loop is stopped),close 事件循环.
调度线程
asyncio.run_coroutine_threadsafe(coro, loop)
等待其他线程返回一个 concurrent.futures.Future 对象, 这是一个线程安全的方法.
这个函数应该从不同的 OS 线程调用, 而不是从事件循环所在的线程调用.
- def start_loop(loop):
- asyncio.set_event_loop(loop)
- loop.run_forever()
- async def do_some_work(x):
- print('Waiting {}'.format(x))
- await asyncio.sleep(x)
- print('Done after {}s'.format(x))
- def more_work(x):
- print('More work {}'.format(x))
- time.sleep(x)
- print('Finished more work {}'.format(x))
- start = now()
- new_loop = asyncio.new_event_loop()
- t = Thread(target=start_loop, args=(new_loop,))
- t.start()
- print('TIME: {}'.format(time.time() - start))
- asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
- asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子, 主线程中创建一个 new_loop, 然后在另外的子线程中开启一个无限事件循环. 主线程通过 run_coroutine_threadsafe 新注册协程对象. 这样就能在子线程中进行事件循环的并发操作, 同时主线程又不会被 block. 一共执行的时间大概在 6s 左右.
- run_in_executor
- import time
- import asyncio
- async def main():
- print(f'{time.ctime()} Hello')
- await asyncio.sleep(1.0)
- print(f'{time.ctime()} Goodbye')
- loop.stop()
- def blocking(): # 1
- time.sleep(0.5) # 2
- print(f'{time.ctime()} Hello from a thread!')
- loop = asyncio.get_event_loop()
- loop.create_task(main())
- loop.run_in_executor(None, blocking) # 3
- loop.run_forever()
- pending = asyncio.Task.all_tasks(loop=loop) # 4
- group = asyncio.gather(*pending)
- loop.run_until_complete(group)
- loop.close()
输出
- Fri Jan 4 15:32:03 2019 Hello
- Fri Jan 4 15:32:04 2019 Hello from a thread!
- Fri Jan 4 15:32:04 2019 Goodbye
下面对上面的函数的序号进行讲解:
1 这个函数调用了常规的 sleep(), 这会阻塞主线程并阻止 loop 运行, 我们不能使这个函数变成协程, 更糟糕的是不能在主线程运行 loop 时调用它, 解决办法是用一个 executor 来运行它;
2 注意一点, 这个 sleep 运行时间比协程中的 sleep 运行时间要短, 后文再讨论如果长的话会发生什么;
3 该方法帮助我们在事件 loop 里用额外的线程或进程执行函数, 这个方法的返回值是一个 Future 对象, 意味着可以用 await 来切换它;
4 挂起的 task 中不包含前面的阻塞函数, 并且这个方法只返回 task 对象, 绝对不会返回 Future 对象.
绑定回调
绑定回调, 在 task 执行完毕的时候可以获取执行的结果, 回调的最后一个参数是 future 对象, 通过该对象可以获取协程返回值. 如果回调需要多个参数, 可以通过偏函数导入
- import time
- import asyncio
- now = lambda : time.time()
- async def do_some_work(x):
- print('Waiting:', x)
- return 'Done after {}s'.format(x)
- def callback(future): # 回调函数
- print('Callback:', future.result())
- start = now()
- coroutine = do_some_work(2)
- loop = asyncio.get_event_loop()
- get_future = asyncio.ensure_future(coroutine)
- task.add_done_callback(callback) # 添加回调函数
- loop.run_until_complete(get_future)
- print('TIME:', now() - start)
回调函数需要多个参数时, future 参数要放最后. 执行完成, 我们可以通过参数 future 获取协程的执行结果: future.result()
- import functools # functools.partial: 偏函数, 能将带参数的函数包装成一个新的函数
- def callback(t, future): # 回调函数 ,future 放最后
- print('Callback:', t, future.result())
- task.add_done_callback(functools.partial(callback, 2)
- asyncio.iscoroutine(obj)
- Return True if obj is a coroutine object.
判断是否为 coroutine 对象, 如果是返回 True
asyncio.iscoroutinefunction(func)
判断是否为 coroutine 函数, 如果是返回 True
参考资料
https://www.jianshu.com/p/b5e347b3a17c
来源: https://www.cnblogs.com/c-x-a/p/10220398.html