11 | Tornado源码分析:Gen 对象(下)

2020-09-09 14:31:38 浏览数 (1)

我们先看一下源码(我已经进行过整理的源码,主要方面大家去理解里面的实现逻辑,若想看完整的源码建议大家可以自行查看本机安装的 tornado 版本中的源代码),在源码中我做了一些批注,这样有利于大家更好的去结合代码来深入了解 其内部的运作。

代码语言:javascript复制
# -*- encoding: utf-8 -*-
# !/usr/bin/python
"""
@File    : learn_Gen.py
@Time    : 2020/09/05 13:20
@Author  : haishiniu
@Software: PyCharm
"""
# gen 模块是 "协程" 的实现。
class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        # 1 将first_yielded转换成Future,并保存到self.future
        # 2 如果self.future已经完成,那么调用self.run
        # 3如果self.future尚未完成,那么将self.future添加到IOLoop
        # (把一个Future添加到IOLoop意味着,当Future完成时,会将设定的回调函数添加到IOLoop,在下次事件循环时,就会执行这个回调函数)
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()

    def handle_yield(self, yielded):
        if 1:
            pass
        else:
            try:
                # 将协程yield的值转换成Future对象。
                # 并保存到self.future
                self.future = convert_yielded(yielded)
            except BadYieldError:
                # 如果转换失败,则将self.future指向一个新的Future对象,并将转换失败的异常信息,保存到这个Future对象
                self.future = TracebackFuture()
                self.future.set_exc_info(sys.exc_info())

        if not self.future.done() or self.future is moment:
            # 如果self.future没有完成,或者self.future是moment,那么当self.future完成时,将self.run(间接地)添加到IOLoop。
            # 下次事件循环时,就会执行self.run 并返回False
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None # noqa
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        # 如果self.future完成了,那么直接返回True
        return True

# 需要特别说明的:
# 在协程中,不仅可以yield Future、直接一个yield(相当于yield moment),
# 还可以 res1, res2 = yield [Future1, Future2]、
# eg: res_dic = yield {key1: Future1, key2: Future2, ...}。

    def run(self):
        # 如果Runner正在运行或者已经停止,则返回
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                # 如果future尚未完成,则返回
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None
                    # 获取future的结果
                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None

                    # 如果future执行失败,则使用异常信息重启生成器
                    if exc_info is not None:
                        try:
                            yielded = self.gen.throw(*exc_info)
                        finally:
                            # Break up a reference to itself
                            # for faster GC on CPython.
                            exc_info = None
                    # 如果future执行成功,则使用future的结果重启生成器
                    else:
                        yielded = self.gen.send(value)

                    if stack_context._state.contexts is not orig_stack_contexts:
                        self.gen.throw(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # 如果生成器执行过程中出现了gen.Return或StopIteration异常,则:
                    # 1 设置Runner的停止标记
                    # 2 从异常对象中提取结果,并保存到代表协程执行结果的Future对象
                    self.finished = True
                    self.future = _null_future
                    ...
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    ...
                    return
                except Exception:
                    # 如果生成器执行过程中出现了其他异常,则:
                    # 1 设置Runner的停止标记
                    # 2 将异常信息保存到代表协程执行结果的Future对象
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_exc_info(sys.exc_info())
                    self.result_future = None
                    self._deactivate_stack_context()
                    return

                # 如果生成器成功yield,那么,
                # 1 将yield的值转换成Future对象,并保存到self.future
                # 2 如果self.future已经完成,那么继续下一次循环
                # 3 如果self.future尚未完成,那么将self.future添加到IOLoop,并退出
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

def convert_yielded(yielded):
    # Lists and dicts containing YieldPoints were handled earlier.

    # 如果 yielded是None,那么返回moment
    if yielded is None:
        return moment

    # 如果yielded是list类型的,那么要求每个元素都是Future对象;
    # 如果yielded是dict类型的,那么要求每个value都是Future对象;返回一个新的Future对象,当yielded中所有的Future对象,都完成时,
    # 这个Future对象才完成。当yielded中的某些Future对象,出现异常时,
    # 会将第一个异常信息,保存到这个Future对象中,其他的打印出来
    elif isinstance(yielded, (list, dict)):
        return multi(yielded)

    # 如果yielded是Future类型的,那么直接返回
    elif is_future(yielded):
        return yielded

    else:
        # 否则,引发BadYieledError异常
        raise BadYieldError("yielded unknown object %r" % (yielded,))

def multi(children, quiet_exceptions=()):
    return multi_future(children, quiet_exceptions=quiet_exceptions)

def multi_future(children, quiet_exceptions=()):
    if isinstance(children, dict):
        keys = list(children.keys())
        children = children.values()
    else:
        keys = None
    children = list(map(convert_yielded, children))
    assert all(is_future(i) for i in children)
    unfinished_children = set(children)

    future = Future()
    if not children:
        future.set_result({} if keys is not None else [])

    def callback(f):
        unfinished_children.remove(f)
        if not unfinished_children:
            result_list = []
            for f in children:
                try:
                    result_list.append(f.result())
                except Exception as e:
                    if future.done():
                        if not isinstance(e, quiet_exceptions):
                            app_log.error("Multiple exceptions in yield list",
                                          exc_info=True)
                    else:
                        future.set_exc_info(sys.exc_info())
            if not future.done():
                if keys is not None:
                    future.set_result(dict(zip(keys, result_list)))
                else:
                    future.set_result(result_list)

    listening = set()
    for f in children:
        if f not in listening:
            listening.add(f)
            f.add_done_callback(callback)
    return future

class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        self.args = (value,)

以上就是tornado 中关于协程运行的核心逻辑,我们可以简单的总结一下:

1.gen.coroutine修饰的函数,就是一个“协程”,调用(或者叫启动)一个协程,会返回一个Future对象。

2.在协程中,可以通过抛出StopIteration或gen.Return异常,来终止协程的执行,并返回结果。

3.通常情况下,gen.coroutine修饰的函数都是生成函数,每次调用启动生成器,都应该: yield一个Future对象,当Future对象完成时,gen会使用Future对象的结果,重启生成器

4.抛出StopIteration或gen.Return异常,结束协程,并返回。

好,本期的分享到此结束,接下来我们会继续分享有关Tornado 的相关内容。

0 人点赞