Kotlin协程-协程的内部概念Continuation

2021-05-17 12:22:53 浏览数 (1)

一个协程的生命周期是这样的,

-------             ----------- | START |----------------------->| SUSPENDED | -------             -----------                  |  ^                  V  |                 ------------ completion invoked -----------                | RUNNING |------------------->| COMPLETED |                 ------------            -----------

而协程的重点是可以在挂起和运行两个状态中切换。实现这个能力的关键在于协程实现了continuation接口。

可重入性

之前的分析里说过continuation接口,这篇着重分析它的设计逻辑,

代码语言:javascript复制
public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

resumeWith是协程的重点,每次切出去到suspend状态,再进入running状态都是通过resumeWith接口。我们所写的每一个coroutine,都会continuation接口。

有意思的是,它是什么时候实现的,怎么实现的?

在launch{}的源码里可以看到有个block参数,这个block就是我们所写的协程代码,

代码语言:javascript复制
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit //我们写的代码
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

block具体是个什么东西,得看编译后的字节码。从协程源码里猜测,它肯定是一个实现了continuation的类,因此它才能有可重入性。

把编译后的字节码用jd-gui打开可以看到,我们所写的协程会给编译器插入代码,实现SuspendLambda接类,

在编译后的字节码里见不到resumeWith()函数,说明这个函数必定是在SuspendLambda中有实现。

而SuspendLabmda是在基础包里的,ContinuationImpl.kt文件下,

代码语言:javascript复制
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}

如果从这里往上追进去,会在 BaseContinuationImpl 类下面发现 resumeWith()接口。

代码语言:javascript复制
internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) //调用class里的invokeSuspend

而且最终会调用 invokeSuspend。这个调用链可以看出我们所写的协程的可重入性是怎么实现的了。

从block到coroutine

上面的分析,只展现了一个block目前所具有的特点。虽然它具有了可重入性,但它还没有可被拦截的能力,也就是Intercept。什么时候变成可以拦截呢。在Cancellable.kt里,可以看到,

代码语言:javascript复制
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

createCoroutineUnintercepted创建了一个coroutine。顾名思义,这时候才是一个可以拦截的coroutine。但这个函数又再次是 kotlin 标准库里的。它在 IntrinsicsNative.kt 中,

代码语言:javascript复制
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
        receiver: R,
        completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion) //调用create函数
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

现在它会去调用create函数。而create函数又在哪呢?

再一次,kotlin编译器在编译过程帮我们插入了这段代码。从设计上来看,create其实是做了一次封装,把需要的对象通过参数传进去。

现在的block距离真正意义上的coroutine,还差一个可派发性。虽然它已经具有了可重入,可拦截,还差一点。

拦截器-Interceptor

block的可派发性是在 Cancellable.kt 的 intercept() 函数赋予的。上面创建完的block,是一个继承了SuspendLambda的block,而继承树上和intercept()有关的是ContinuationImpl,

代码语言:javascript复制
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

最后这里,是真正给block赋予可派发性的地方。context[ContinuationInterceptor]获取的是最开始给上下文设定的Dispatcher,不管是 DefaultScheduler,还是EventLoop,他们都有个公有的父类CoroutineDispatcher。而interceptContinuation的唯一实现就在这个类里。

代码语言:javascript复制
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

这里可以看到它做了一次代理模式,把Dispatcher封装了进去,作为 DispatchedContinuation的一个成员。

至此一个block就完成了它的整个创建过程。从一个block,到支持可重入,到支持可拦截,最后支持可派发。

所以会看到虽然协程的外部概念很清晰,只是一个 coroutine,但在协程内部,实际上支撑它的还有Continuation,Dispatch,Intercept。还是挺复杂的。

0 人点赞