在使用 Python 的 asyncio 库实现异步编程的过程中,协程与事件循环这两个概念可以说有着千丝万缕的联系,常常是形影不离的出现,如胶似漆般的存在,asyncio 库到底是如何调度协程的? 下面以 Python 3.8 中的 asyncio.sleep
定时器为例研究一手 asyncio 的源码实现。
几个主要的概念
首先需要对 asyncio 中的几个主要函数和模块做一个初步认识:
asyncio.run
是启动事件循环的入口,接收一个协程作为参数。asyncio.BaseEventLoop
就是事件循环基类了,子类常用的是_UnixSelectorEventLoop
,但核心调度逻辑都在基类中,其中最主要的是run_forever
函数用来启动事件循环;另一个主要的函数是create_task
,用来创建一个Task
对象并放到事件循环中,准备在下一次循环时执行。asyncio.events.Handle
和asyncio.events.TimerHandle
是放到loop
中的处理对象,其中_callback
属性保存的是一个回调函数,处理对象执行时调用的就是这个函数,回调函数参数放在_args
属性中。asyncio.futures.Future
作为一个事件在未来完成的占位符,当事件完成后可通过Future.set_result
方法将事件的结果设置进去。asyncio.tasks.Task
是Future
类的子类,可以理解为是对协程的包装,在Future
基础上增加了启动协程和恢复协程的能力,主要逻辑在Task.__step
函数中。
从简单例子开始
先从最简单的一段代码开始
这段代码启动一个 main
协程,协程输出两行内容后完成结束,这里先不加入任何 await
异步操作,主要看一下事件循环是怎样初始化和启动的,只保留了关键代码。
loop 的初始化
首先看 asyncio.run
函数,内容比较简单,初始化一个事件循环 loop
,然后调用 loop.run_until_complete(main)
启动并传入 main
协程。
Task 的初始化
接着来到 asyncio.base_events.BaseEventLoop.run_until_complete
,首先调用了 asyncio.tasks.ensure_future
函数,目的是将传入的 main
协程转换成一个 Task
对象,在创建 Task
的过程中会将 Task
对象加入到 loop
的队列中,之后调用 self.run_forever
启动事件循环。
确切的说应该是将 Task.__step
函数包装到 Handle
对象中,之后加入到 loop
的队列中,稍后会看到这个细节。
再看一下 Task.__init__
,其中 _coro
保存了传入的协程 coro
对象,实际上可以将 Task
视为一个协程的包装,在初始化的后面调用了 loop.call_soon(self.__step, context=self._context)
函数,将 Task
对象自己的 __step
函数加入到 loop
队列,当 loop
启动后便会执行这个函数。
再看一下 loop.call_soon
做了什么,接受一个 callback
参数,在这里就代表 Task.__step
,接着会调用 _call_soon
函数,在 _call_soon
函数中初始化了 events.Handle
对象,然后将 handle
对象加入到 loop._ready
队列中。
在看一眼 Handle
的初始化,主要就是将 callback
保存下来,并且用 args
表示 callback
的参数。
Handle 的一个主要的函数是 _run
,当 loop
启动后会从 loop._ready
队列中取出 Handle
执行,执行的就是 _run
函数,_run
函数中 self._context.run(self._callback, *self._args)
其实就是在原有 context
环境下执行回调函数并传入 args
参数。
到这里先总结一下,通过 asyncio.run(main())
添加了一个协程,然后将协程 main
包装成 Task
,并将 Task.__step
包装成 Handle
放到 loop._ready
队列中,接下来就是真正启动 loop
了。
loop 的启动
asyncio.base_events.BaseEventLoop.run_until_complete
,在封装完 main
协程后会先添加一个回调函数 _run_until_complete_cb
,回调函数会在 main
协程执行完后执行,内容就是将 loop
设置成关闭。
接着的 run_forever
函数就是启动 loop
了。
run_forever
中做了一些初始检查和设置,然后进入 while
循环并在循环中调用 _run_once
,_run_once
就是一次事件循环的核心调度逻辑了。
loop 调度的核心逻辑
核心调度逻辑在 _run_once
中。loop
主要有两个队列存放协程任务对应的 Handle
,一个是 _scheduled
用来存放定时类协程,它是一个最小堆实现的优先队列,例如使用 asyncio.sleep
就会存进去一个 TimerHandle
对象;另一个是 _ready
用来存放准备好执行的协程,而 _scheduled
中有准备好的协程会取出来放入 _ready
中,loop
最终执行 Handle
都是从 _ready
中取出的。
_run_once
中做的事情分四个部分,第一部分是清理 _scheduled
;第二部分是调用多路复用 IO 并处理就绪的描述符;第三部分是将到期的 _scheduled
转移到 _ready
;第四部分遍历 _ready
并逐一启动处理函数 handle._run
;
Handle._run
没啥说的,直接调用 Handle._callback
,并且将 Handle._args
作为参数传进去。
还记得 loop
是怎么启动的吗?将 main
协程包装成 Task
,在创建 Task
时将 Task.__step
作为 callback
生成了一个 Handle
并放到了 loop._ready
中,所以这里 Handle._run
其实执行的就是根据 main
协程生成出来的 Task.__step
。Task.__step
是协程启动和协程暂停恢复的关键
协程的启动
Task._coro
属性保存了协程,通过 result = coro.send(None)
启动协程,由此进入到 main
协程中,打印出 main start
和 main end
。
之后 main
协程结束,抛出 StopIteration
异常,调用 super().set_result(exc.value)
给 Task._result
设置结果并将 _state
标记成 _FINISHED
,之后调用 __schedule_callbacks
触发 Task
上注册的回调函数,在这里 mian
协程注册的就是 _run_until_complete_cb
用来结束 loop
的,将回调函数放在传给 loop.call_soon
等待下一轮事件循环来触发。
到这里就可能看到一个协程是如何传给 loop
并启动的了,也知道了 loop
的大概流程。下面在 main
中加入 asyncio.sleep
看看定时器是如何调度的。
asyncio.sleep 如何定时
main
中加入一个 asyncio.sleep
看看定时是如何实现的
loop
的初始化和启动还是一样的,直接看看 Task.__step
是如何调度的,其中调用 result = coro.send(None)
会启动协程,首先输出 main start
,然后调用 asyncio.sleep(3)
协程的挂起
首先常见一个空的 Future
对象 future
,然后调用的 loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
,然后一路向下调用 loop.call_at
,最后生成了一个 TimerHandle
对象 push 进 loop._scheduled
堆中。
TimerHandle
其实就比 Handle
多了个 _when
属性表示何时可以恢复运行,当时间到了会调用 TimerHandle._run
执行 TimerHandle
的 callback
,也就是 _set_result_unless_cancelled(future, result)
用来给 future
设置结果。
asyncio.sleep
的函数签名是 asyncio.sleep(delay, result=None)
,一般不传第二个参数所以结果是 None
,如果传的话之后会将结果设置到 future
对象里面。
asyncio.sleep
函数的最后将 future
返回并挂起自己,控制权又交还给 Task.__step
中 result = coro.send(None)
的位置,result
接到的就是 future
对象。
result 接到 future 后向下执行到 result.add_done_callback(self.__wakeup, context=self._context)
给 future
设置一个回调函数 Task.__wakeup
,到这里本轮循环就结束了。
到目前为止 loop
的状态是 _scheduled
堆中有一个 TimerHandle
对象,对象的 _when
表示剩余启动的秒数,对象的 _callback
指向的是 futures._set_result_unless_cancelled
参数是一个 future
,这个 future
的 callbacks
回调列表中有一个 main
协程生成的 Task.__wakeup
。
协程的恢复
本轮循环结束,下一轮循环时会检查 loop._scheduled
发现 TimerHandle
已经到期,将其放到 loop._ready
队列中,紧接着就取出执行 TimerHandle._run
,也就是执行 futures._set_result_unless_cancelled(future, None)
,其实就是给 future
设置结果、标记完成、执行 future
的回调函数。
还记得 future
是怎么来的以及 future
里面是啥吗?future
是在 asyncio.sleep
时生成并通过 await
返回的,返回给 Task.__step
后通过 add_done_callback(self.__wakeup)
为其添加了一个回调函数。
所以到此为止干的事儿就是遍历 future
的 callbacks
逐一通过 loop.call_soon()
添加到 loop
中,等待下一轮事件循环执行,这里添加的就是 main
Task
的 __wakeup
函数。
进入下一轮循环,loop._ready
中有一个 Handle
,其内部的 _coro
代表的是 main
Task
的 __wakeup
,取出来执行 Handle._run
实际上就是执行 main
Task.__wakeup
。
__wakeup
也很简单就是确认 future
是已完成状态并调用 __step
,控制权有交给了之前挂起的 main
Task
。
当 Task.__step
再一次执行到 result = coro.send(None)
时,便会恢复之前的 sleep
协程接着执行 return
,回到了 main
函数中,继续执行并输出 main end
最后完成,抛出 StopIteration
异常,被 Task.__step
捕获,整个协程结束,之后事件循环做收尾工作也关闭,事件循环也关闭,到这里整个程序就结束了。
总结
asyncio 中的定时通过 asyncio.sleep
实现,原理是在事件循环中维护一个最小堆实现的优先队列 _scheduled
,其中保存的都是定时任务处理对象 Handle
,越早到期 Handle
就会越早被取出来并加入到 loop._ready
队列,在下一轮循环时取出并从挂起的位置恢复执行。
由于协程代码在执行时会切换控制权导致代码逻辑跳来跳去,有时会被绕晕,借助定时器的调度可以让整个事件循环的逻辑更加清晰。