先来看一下一个简单的例子
例 1:
- async def foo():
- print('enter foo ...')
- await bar()
- print('exit foo ...')
- async def bar():
- print('enter bar ...')
- print('exit bar ...')
- f = foo()
- try:
- f.send(None)
- except StopIteration as e:
- print(e.value)
例 2:
- async def foo():
- print('enter foo ...')
- try:
- bar().send(None)
- except StopIteration as e:
- pass
- print('exit foo ...')
- async def bar():
- print('enter bar ...')
- print('exit bar ...')
- f = foo()
- try:
- f.send(None)
- except StopIteration as e:
- print(e.value)
也就是说 await bar() 等价于这个
- try:
- bar().send(None)
- except StopIteration as e:
- pass
更进一步来讲, await 协程的嵌套就跟函数调用一样, 没什么两样.
- def foo():
- print('enter foo ...')
- bar()
- print('exit foo ...')
- def bar():
- print('enter bar ...')
- print('exit bar ...')
- foo()
理解了跟函数调用一样就可以看看成是这样:
执行 f.send(None) 时其实就是执行
- print('enter foo ...')
- print('enter bar ...')
- print('exit bar ...')
- print('exit foo ...')
例 3:
- class Future:
- def __iter__(self):
- print('enter Future ...')
- yield self
- print('foo 恢复执行')
- print('exit Future ...')
- __await__ = __iter__
- async def foo():
- print('enter foo ...')
- await bar()
- print('exit foo ...')
- async def bar():
- future = Future()
- print('enter bar ...')
- await future
- print('exit bar ...')
- f = foo()
- try:
- f.send(None)
- print('foo 挂起在 yield 处')
- print('--'*10)
- f.send(None)
- except StopIteration as e:
- print(e.value)
执行结果:
- enter foo ...
- enter bar ...
- enter Future ...
foo 挂起在 yield 处
--------------------
foo 恢复执行
- exit Future ...
- exit bar ...
- exit foo ...
- None
Future 是一个 Awaitable 对象, 实现了__await__方法, await future 实际上是会进入到 future.__await__方法中也就是 future.__iter__方法中的逻辑, 执行到 yield self 处 foo 协程才真正被挂起, 返回 future 对象本身, f.send(None) 才真正的执行完毕,
第一次调用 f.send(None), 执行:
- print('enter foo ...')
- print('enter bar ...')
- print('enter Future ...')
被挂起
第二次调用 f.send(None), 执行:
- print('exit Future ...')
- print('exit bar ...')
- print('exit foo ...')
也就是说这样一个 foo 协程完整的调用过程就是如下过程:
- - foo print('enter foo ...')
- - bar print('enter bar ...')
- - future print('enter Future ...') # 以上是第一次 f.send(None) 执行的逻辑, 命名为 part1
- - future yield self ---------------------------------------------------------------
- - print('exit Future ...') # 一下是第二次 f.send(None) 执行的逻辑, 命名为 part2
- - bar print('exit bar ...')
- - foo print('exit foo ...')
加入我们把这两次 f.send(None) 调用的逻辑分别命名成 part1 和 part2, 那也就是说, 通过 future 这个对象, 准确的说是 yield 关键字, 真正的把 foo 协程要执行的完整逻辑分成了两部分 part1 和 patr2. 并且 foo 的协程状态会被挂起在 yield 处, 这样就要调用两次 f.send(None) 才能, 执行完 foo 协程, 而不是在例 2 中, 直接只调用一次 f.send(None) 就执行完了 foo 协程. 这就是 Future 对象的作用.
这里小结一下 Future 的作用
yield 起到了挂起协程的作用.
通过 yield 把 foo 协程的执行逻辑真正的分成了 part1 和 part2 两部分.
例 4:
- class Future:
- def __iter__(self):
- print('enter Future ...')
- print('foo 挂起在 yield 处')
- yield self
- print('foo 恢复执行')
- print('enter Future ...')
- return 'future'
- __await__ = __iter__
- class Task:
- def __init__(self, cor):
- self.cor = cor
- def _step(self):
- cor = self.cor
- try:
- result = cor.send(None)
- except Exception as e:
- pass
- async def foo():
- print('enter foo ...')
- await bar()
- print('exit foo ...')
- async def bar():
- future = Future()
- print('enter bar ...')
- await future
- print('exit bar ...')
- f = foo()
- task = Task(f)
- task._step()
- print('--' * 10)
- task._step()
执行结果:
- enter foo ...
- enter bar ...
- enter Future ...
foo 挂起在 yield 处
--------------------
foo 恢复执行
- exit Future ...
- exit bar ...
- exit foo ...
这个例子与例 3 不同在于, 现在有一个 Task 类, 我们把 f.send(None)d 的操作, 封装在了 Task 的_step 方法中, 调用 task._step() 等于是执行 part1 中的逻辑, 再次调用 task._step() 等于是执行 part2 中的逻辑. 现在不想手动的 task._step() 这样, 在看下面的例子
例 5:
- class Future:
- def __iter__(self):
- print('enter Future ...')
- print('foo 挂起在 yield 处')
- yield self
- print('foo 恢复执行')
- print('enter Future ...')
- return 'future'
- __await__ = __iter__
- class Task:
- def __init__(self, cor, *, loop=None):
- self.cor = cor
- self._loop = loop
- def _step(self):
- cor = self.cor
- try:
- result = cor.send(None)
- except StopIteration as e:
- self._loop.close()
- except Exception as e:
- pass
- class Loop:
- def __init__(self):
- self._stop = False
- def create_task(self, cor):
- task = Task(cor, loop = self)
- return task
- def run_until_complete(self, task):
- while not self._stop:
- task._step()
- def close(self):
- self._stop = True
- async def foo():
- print('enter foo ...')
- await bar()
- print('exit foo ...')
- async def bar():
- future = Future()
- print('enter bar ...')
- await future
- print('exit bar ...')
- if __name__ == '__main__':
- f = foo()
- loop = Loop()
- task = loop.create_task(f)
- loop.run_until_complete(task)
执行结果:
- enter foo ...
- enter bar ...
- enter Future ...
foo 挂起在 yield 处
foo 恢复执行
- exit Future ...
- exit bar ...
- exit foo ...
例 4 中我们实现了一个简单 Loop 类, 在 while 循环中调用 task._step 方法.
例 5:
- class Future:
- def __init__(self, *, loop=None):
- self._result = None
- self._callbacks = []
- def set_result(self, result):
- self._result = result
- callbacks = self._callbacks[:]
- self._callbacks = []
- for callback in callbacks:
- loop._ready.append(callback)
- def add_callback(self, callback):
- self._callbacks.append(callback)
- def __iter__(self):
- print('enter Future ...')
- print('foo 挂起在 yield 处')
- yield self
- print('foo 恢复执行')
- print('enter Future ...')
- return 'future'
- __await__ = __iter__
- class Task:
- def __init__(self, cor, *, loop=None):
- self.cor = cor
- self._loop = loop
- def _step(self):
- cor = self.cor
- try:
- result = cor.send(None)
- # 1. cor 协程执行完毕时, 会抛出 StopIteration, 说明 cor 执行完毕了, 这是关闭 loop
- except StopIteration as e:
- self._loop.close()
- # 2. 有异常时
- except Exception as e:
- """处理异常逻辑"""
- # 3. result 为 Future 对象时
- else:
- if isinstance(result, Future):
- result.add_callback(self._wakeup)
- # 立即调用, 让下一 loop 轮循环中立马执行 self._wakeup
- result.set_result(None)
- def _wakeup(self):
- self._step()
- class Loop:
- def __init__(self):
- self._stop = False
- self._ready = []
- def create_task(self, cor):
- task = Task(cor, loop = self)
- self._ready.append(task._step)
- return task
- def run_until_complete(self, task):
- assert isinstance(task, Task)
- while not self._stop:
- n = len(self._ready)
- for i in range(n):
- step = self._ready.pop()
- step()
- def close(self):
- self._stop = True
- async def foo():
- print('enter foo ...')
- await bar()
- print('exit foo ...')
- async def bar():
- future = Future(loop=loop)
- print('enter bar ...')
- await future
- print('exit bar ...')
- if __name__ == '__main__':
- f = foo()
- loop = Loop()
- task = loop.create_task(f)
- loop.run_until_complete(task)
执行结果:
- enter foo ...
- enter bar ...
- enter Future ...
foo 挂起在 yield 处
foo 恢复执行
- exit Future ...
- exit bar ...
- exit foo ...
到此为止, 我们构建了三个稍微复杂点的 Loop 类, Task, Future 类, 这 3 个类在整个协程执行流程的调度过程中有很强的相互作用关系.
Future
挂起协程的执行流程, 把协程的逻辑分为 part1 和 part2 两部分.
Task
把协程的 part1 和 part2 逻辑封装到 task._step 和 task._wakeup 方法中, 在不同的时机分别把它们注册到 loop 对象中, task._step 是创建 task 实例的时候就注册到了 loop 中, task._wakeup 则是在 task._setp 执行完挂在 yield future 处, 由于有 await future 语句的存在, 必然是返回一个 future 对象, 判断确实是一个 future 对象, 就把 task._wakeup 注册到 future 中, future.set_result() 则会在合适的时机被调用, 一旦它被调用, 就会把 future 中注册的 task._wakeup 注册到 loop 中, 然后就会在 loop 循环中调用 task._wakeup, 协程的 part2 的逻辑才得以执行, 最后抛出 StopIteration 异常.
Loop
在一个死循环中执行注册到 loop 中的 task._step 和 task._wakeup 方法, 完成对协程完整逻辑的执行.
虽然我们自己构建的这三个类的实现很简单, 但是这体现 asyncio 实现事件循环的核心原理, 我们实现 loop 中并没有模拟耗时等待以及对真正 IO 事件的监听, 对应于 asyncio 来说, 它也是构建了 Future, Task, Loop 这 3 个类, 只是功能要比我们自己构建的要复杂得多, loop 对象的 while 中通过 select(timeout) 函数的调用实现模拟耗时操作和实现了对网络 IO 事件的监听, 这样我们只要在写了一个执行一个 IO 操作时, 都会有一个 future 对象 await future, 通过 future 来挂起当前的协程, 比如想进行一个 socket 连接, 协程的伪代码如下:
- future = Future
- # 非阻塞调用, 需要 try...except...
- socket.connect((host, port))
- # 注册一个回调函数到 write_callbackselect 中, 只要 socket 发生可写事件, 就执行回调
- add_writer(write_callback, future)
- await future
- ...
当我们在调用 socket.connect((host, port)), 因为是非阻塞 socket, 会立马返回, 然后把这个 write_callback, future 注册成 select 的可写事件的回调函数, 这个回调函数什么时候被执行呢, 就是在 loop 循环的 select(timeout) 返回了可写事件时才会触发, 回调函数中会调用 future.set_result(), 也就是说 future.set_result 的触发时机是在 socket 连接成功时, select(timeout) 返回了可写事件时, future.set_result 的作用就是把协程的 part2 部分注册到 loop, 然后在下一轮的循环中立即调用, 使得协程的 await future 下面的语句得以继续执行.
这里这个例子举的很抽象, 小伙伴们可以对照着 asyncio 的源码来 debug, 这样再来理解这里说的这个例子就比较容易了.
下一篇会接收 asyncio 的 sleep 实现机制.
来源: https://www.cnblogs.com/naralv/p/11152613.html