抽丝剥茧分析asyncio事件调度的核心原理

2023-06-07,,

先来看一下一个简单的例子

例1:

async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
print('enter bar ...')
print('exit bar ...') f = foo()
try:
f.send(None)
except StopIteration as e:
print(e.value)

例2:

async def foo():
print('enter foo ...')
try:
bar().send(None)
except StopIteration as e:
pass
print('exit foo ...') async def bar():
print('enter bar ...')
print('exit bar ...') f = foo()
try:
f.send(None)
except StopIteration as e:
print(e.value)

也就是说 await bar() 等价于这个

try:
bar().send(None)
except StopIteration as e:
pass

更进一步来讲,await 协程的嵌套就跟函数调用一样,没什么两样。

def foo():
print('enter foo ...')
bar()
print('exit foo ...') def bar():
print('enter bar ...')
print('exit bar ...') foo()

理解了跟函数调用一样就可以看看成是这样:

执行f.send(None)时其实就是执行

print('enter foo ...')		

    print('enter bar ...')
print('exit bar ...') print('exit foo ...')

例3:

class Future:

    def __iter__(self):
print('enter Future ...')
yield self
print('foo 恢复执行')
print('exit Future ...') __await__ = __iter__ async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') f = foo()
try:
f.send(None)
print('foo 挂起在yield处 ')
print('--'*10)
f.send(None)
except StopIteration as e:
print(e.value)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
--------------------
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...
None

Future是一个Awaitable对象,实现了__await__方法,await future 实际上是会进入到future.__await__方法中也就是future.__iter__方法中的逻辑,执行到 yield self 处foo协程才真正被挂起,返回future对象本身,f.send(None)才真正的执行完毕,

第一次调用f.send(None),执行:

  print('enter foo ...')
print('enter bar ...')
print('enter Future ...')

被挂起

第二次调用f.send(None),执行:

  print('exit Future ...')
print('exit bar ...')
print('exit foo ...')

也就是说这样一个foo协程完整的调用过程就是如下过程:

- foo print('enter foo ...')
- bar print('enter bar ...')
- future print('enter Future ...')   # 以上是第一次f.send(None)执行的逻辑,命名为part1
- future yield self ---------------------------------------------------------------
- future print('exit Future ...')   # 以下是第二次f.send(None)执行的逻辑,命名为part2
- bar print('exit bar ...')
- foo print('exit foo ...')

加入我们把这两次f.send(None)调用的逻辑分别命名成part1和part2,那也就是说,通过future这个对象,准确的说是yield关键字,真正的把foo协程要执行的完整逻辑分成了两部分part1和patr2。并且foo的协程状态会被挂起在yield处,这样就要调用两次f.send(None)才能,执行完foo协程,而不是在例2中,直接只调用一次f.send(None)就执行完了foo协程。这就是Future对象的作用。

重点:没有 await future 的协程是没有灵魂的协程,并不会被真正的挂起,只需一次 send(None) 调用即可执行完毕,只有有 await future

的协程才是真正可以被挂起的协程,需要执行两次 send(None) 才能执行完该协程的完整逻辑。

这里小结一下Future的作用

    yield 起到了挂起协程的作用。

    通过 yield 把 foo 协程的执行逻辑真正的分成了 part1 和 part2 两部分。

例4:

class Future:

    def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor):
self.cor = cor def _step(self):
cor = self.cor
try:
result = cor.send(None)
except Exception as e:
pass async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') f = foo() task = Task(f)
task._step()
print('--' * 10)
task._step()

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
--------------------
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

这个例4与例3不同在于,现在有一个Task类,我们把f.send(None)d的操作,封装在了 Task 的 _step 方法中,调用 task._step() 等于是执行 part1 中的逻辑,再次调用

task._step() 等于是执行part2中的逻辑。现在不想手动的 两次调用task._step() 方法,我们写一个简单的Loop类来帮忙完成对task._step的多次调用,请看下面这个例子。

例5:

class Future:

    def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop def _step(self):
cor = self.cor
try:
result = cor.send(None)
except StopIteration as e:
self._loop.close()
except Exception as e:
pass class Loop: def __init__(self):
self._stop = False def create_task(self, cor):
task = Task(cor, loop = self)
return task def run_until_complete(self, task):
while not self._stop:
task._step() def close(self):
self._stop = True async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') if __name__ == '__main__': f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

例5中我们实现了一个简单 Loop 类,在while循环中调用task._step方法。

例6:

class Future:

    def __init__(self, *, loop=None):
self._result = None
self._callbacks = [] def set_result(self, result):
self._result = result
callbacks = self._callbacks[:]
self._callbacks = []
for callback in callbacks:
loop._ready.append(callback) def add_callback(self, callback):
self._callbacks.append(callback) def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop def _step(self):
cor = self.cor
try:
result = cor.send(None)
# 1. cor 协程执行完毕时,会抛出StopIteration,说明cor执行完毕了,这是关闭loop
except StopIteration as e:
self._loop.close()
# 2. 有异常时
except Exception as e:
"""处理异常逻辑"""
# 3. result为Future对象时
else:
if isinstance(result, Future):
result.add_callback(self._wakeup)
# 立即调用,让下一loop轮循环中立马执行self._wakeup
result.set_result(None) def _wakeup(self):
self._step() class Loop: def __init__(self):
self._stop = False
self._ready = []
def create_task(self, cor):
task = Task(cor, loop = self)
self._ready.append(task._step)
return task def run_until_complete(self, task): assert isinstance(task, Task) while not self._stop:
n = len(self._ready)
for i in range(n):
step = self._ready.pop()
step()
def close(self):
self._stop = True async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future(loop=loop)
print('enter bar ...')
await future
print('exit bar ...') if __name__ == '__main__': f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

在例6中,我们构建了3个稍微复杂点类,Loop类,Task, Future类,这3个类在整个协程执行流程的调度过程中有很强的相互作用关系。

Future

挂起协程的执行流程,把协程的逻辑分为part1和part2两部分。

Task

把协程的part1和part2逻辑封装到task._step和task._wakeup方法中,在不同的时机分别把它们注册到loop对象中,task._step是创建task实例的时候就注册到了loop中,task._wakeup则是在task._setp执行完挂在yield future处,由于有await future语句的存在,必然是返回一个future对象,判断确实是一个future对象,就把task._wakeup注册到future中,future.set_result()则会在合适的时机被调用,一旦它被调用,就会把future中注册的task._wakeup注册到loop中,然后就会在loop循环中调用task._wakeup,协程的part2的逻辑才得以执行,最后抛出StopIteration异常。

Loop

在一个死循环中执行注册到loop中的task._step和task._wakeup方法,完成对协程完整逻辑的执行。

虽然我们自己构建的这三个类的实现很简单,但是这体现asyncio实现事件循环的核心原理,我们实现loop中并没有模拟耗时等待以及对真正IO事件的监听,对应于asyncio来说,它也是构建了Future, Task, Loop这3个类,只是功能要比我们自己构建的要复杂得多,loop对象的while中通过select(timeout)函数的调用实现模拟耗时操作和实现了对网络IO事件的监听,这样我们只要在写了一个执行一个IO操作时,都会有一个future对象 await future,通过future来挂起当前的协程,比如想进行一个socket连接,协程的伪代码如下:

future = Future
# 非阻塞调用,需要try...except...
socket.connect((host, port))
# 注册一个回调函数到write_callbackselect中,只要socket发生可写事件,就执行回调
add_writer(write_callback, future)
await future
...

当我们在调用socket.connect((host, port)),因为是非阻塞socket,会立马返回,然后把这个write_callback, future注册成select的可写事件的回调函数,这个回调函数什么时候被执行呢,就是在loop循环的select(timeout)返回了可写事件时才会触发,回调函数中会调用future.set_result(),也就是说future.set_result的触发时机是在socket连接成功时,select(timeout)返回了可写事件时,future.set_result的作用就是把协程的part2部分注册到loop,然后在下一轮的循环中立即调用,使得协程的await future下面的语句得以继续执行。

由于我这里没有贴asyncio的loop,task,future对象的源码,所以这个例子看起来会很抽象,在上一篇asyncio中贴了这几个类的源码,想详细了解的可以查看我的上一篇文章《asyncio系列之简单协程的基本执行流程分析》。小伙伴们也可以对照着asyncio的源码来debug,这样再来理解这里说的这个例子就比较容易了。

下一篇将介绍asyncio.sleep()的实现机制。

抽丝剥茧分析asyncio事件调度的核心原理的相关教程结束。

《抽丝剥茧分析asyncio事件调度的核心原理.doc》

下载本文的Word格式文档,以方便收藏与打印。