我们先看一下源码(我已经进行过整理的源码,主要方面大家去理解里面的实现逻辑,若想看完整的源码建议大家可以自行查看本机安装的 tornado 版本中的源代码),在源码中我做了一些批注,这样有利于大家更好的去结合代码来深入了解 其内部的运作。
代码语言:javascript复制# -*- encoding: utf-8 -*-
# !/usr/bin/python
"""
@File : learn_Gen.py
@Time : 2020/09/05 13:20
@Author : haishiniu
@Software: PyCharm
"""
# gen 模块是 "协程" 的实现。
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
# 1 将first_yielded转换成Future,并保存到self.future
# 2 如果self.future已经完成,那么调用self.run
# 3如果self.future尚未完成,那么将self.future添加到IOLoop
# (把一个Future添加到IOLoop意味着,当Future完成时,会将设定的回调函数添加到IOLoop,在下次事件循环时,就会执行这个回调函数)
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def handle_yield(self, yielded):
if 1:
pass
else:
try:
# 将协程yield的值转换成Future对象。
# 并保存到self.future
self.future = convert_yielded(yielded)
except BadYieldError:
# 如果转换失败,则将self.future指向一个新的Future对象,并将转换失败的异常信息,保存到这个Future对象
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
# 如果self.future没有完成,或者self.future是moment,那么当self.future完成时,将self.run(间接地)添加到IOLoop。
# 下次事件循环时,就会执行self.run 并返回False
def inner(f):
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
self.io_loop.add_future(
self.future, inner)
return False
# 如果self.future完成了,那么直接返回True
return True
# 需要特别说明的:
# 在协程中,不仅可以yield Future、直接一个yield(相当于yield moment),
# 还可以 res1, res2 = yield [Future1, Future2]、
# eg: res_dic = yield {key1: Future1, key2: Future2, ...}。
def run(self):
# 如果Runner正在运行或者已经停止,则返回
if self.running or self.finished:
return
try:
self.running = True
while True:
# 如果future尚未完成,则返回
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
# 获取future的结果
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
# 如果future执行失败,则使用异常信息重启生成器
if exc_info is not None:
try:
yielded = self.gen.throw(*exc_info)
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
# 如果future执行成功,则使用future的结果重启生成器
else:
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
# 如果生成器执行过程中出现了gen.Return或StopIteration异常,则:
# 1 设置Runner的停止标记
# 2 从异常对象中提取结果,并保存到代表协程执行结果的Future对象
self.finished = True
self.future = _null_future
...
self.result_future.set_result(_value_from_stopiteration(e))
self.result_future = None
...
return
except Exception:
# 如果生成器执行过程中出现了其他异常,则:
# 1 设置Runner的停止标记
# 2 将异常信息保存到代表协程执行结果的Future对象
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return
# 如果生成器成功yield,那么,
# 1 将yield的值转换成Future对象,并保存到self.future
# 2 如果self.future已经完成,那么继续下一次循环
# 3 如果self.future尚未完成,那么将self.future添加到IOLoop,并退出
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False
def convert_yielded(yielded):
# Lists and dicts containing YieldPoints were handled earlier.
# 如果 yielded是None,那么返回moment
if yielded is None:
return moment
# 如果yielded是list类型的,那么要求每个元素都是Future对象;
# 如果yielded是dict类型的,那么要求每个value都是Future对象;返回一个新的Future对象,当yielded中所有的Future对象,都完成时,
# 这个Future对象才完成。当yielded中的某些Future对象,出现异常时,
# 会将第一个异常信息,保存到这个Future对象中,其他的打印出来
elif isinstance(yielded, (list, dict)):
return multi(yielded)
# 如果yielded是Future类型的,那么直接返回
elif is_future(yielded):
return yielded
else:
# 否则,引发BadYieledError异常
raise BadYieldError("yielded unknown object %r" % (yielded,))
def multi(children, quiet_exceptions=()):
return multi_future(children, quiet_exceptions=quiet_exceptions)
def multi_future(children, quiet_exceptions=()):
if isinstance(children, dict):
keys = list(children.keys())
children = children.values()
else:
keys = None
children = list(map(convert_yielded, children))
assert all(is_future(i) for i in children)
unfinished_children = set(children)
future = Future()
if not children:
future.set_result({} if keys is not None else [])
def callback(f):
unfinished_children.remove(f)
if not unfinished_children:
result_list = []
for f in children:
try:
result_list.append(f.result())
except Exception as e:
if future.done():
if not isinstance(e, quiet_exceptions):
app_log.error("Multiple exceptions in yield list",
exc_info=True)
else:
future.set_exc_info(sys.exc_info())
if not future.done():
if keys is not None:
future.set_result(dict(zip(keys, result_list)))
else:
future.set_result(result_list)
listening = set()
for f in children:
if f not in listening:
listening.add(f)
f.add_done_callback(callback)
return future
class Return(Exception):
def __init__(self, value=None):
super(Return, self).__init__()
self.value = value
self.args = (value,)
以上就是tornado 中关于协程运行的核心逻辑,我们可以简单的总结一下:
1.gen.coroutine修饰的函数,就是一个“协程”,调用(或者叫启动)一个协程,会返回一个Future对象。
2.在协程中,可以通过抛出StopIteration或gen.Return异常,来终止协程的执行,并返回结果。
3.通常情况下,gen.coroutine修饰的函数都是生成函数,每次调用启动生成器,都应该: yield一个Future对象,当Future对象完成时,gen会使用Future对象的结果,重启生成器
4.抛出StopIteration或gen.Return异常,结束协程,并返回。
好,本期的分享到此结束,接下来我们会继续分享有关Tornado 的相关内容。