一个协程的生命周期是这样的,
------- ----------- | START |----------------------->| SUSPENDED | ------- ----------- | ^ V | ------------ completion invoked ----------- | RUNNING |------------------->| COMPLETED | ------------ -----------
而协程的重点是可以在挂起和运行两个状态中切换。实现这个能力的关键在于协程实现了continuation接口。
可重入性
之前的分析里说过continuation接口,这篇着重分析它的设计逻辑,
代码语言:javascript复制public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
resumeWith是协程的重点,每次切出去到suspend状态,再进入running状态都是通过resumeWith接口。我们所写的每一个coroutine,都会continuation接口。
有意思的是,它是什么时候实现的,怎么实现的?
在launch{}的源码里可以看到有个block参数,这个block就是我们所写的协程代码,
代码语言:javascript复制public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit //我们写的代码
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
block具体是个什么东西,得看编译后的字节码。从协程源码里猜测,它肯定是一个实现了continuation的类,因此它才能有可重入性。
把编译后的字节码用jd-gui打开可以看到,我们所写的协程会给编译器插入代码,实现SuspendLambda接类,
在编译后的字节码里见不到resumeWith()函数,说明这个函数必定是在SuspendLambda中有实现。
而SuspendLabmda是在基础包里的,ContinuationImpl.kt文件下,
代码语言:javascript复制internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}
如果从这里往上追进去,会在 BaseContinuationImpl 类下面发现 resumeWith()接口。
代码语言:javascript复制internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param) //调用class里的invokeSuspend
而且最终会调用 invokeSuspend。这个调用链可以看出我们所写的协程的可重入性是怎么实现的了。
从block到coroutine
上面的分析,只展现了一个block目前所具有的特点。虽然它具有了可重入性,但它还没有可被拦截的能力,也就是Intercept。什么时候变成可以拦截呢。在Cancellable.kt里,可以看到,
代码语言:javascript复制internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
createCoroutineUnintercepted创建了一个coroutine。顾名思义,这时候才是一个可以拦截的coroutine。但这个函数又再次是 kotlin 标准库里的。它在 IntrinsicsNative.kt 中,
代码语言:javascript复制@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion) //调用create函数
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
现在它会去调用create函数。而create函数又在哪呢?
再一次,kotlin编译器在编译过程帮我们插入了这段代码。从设计上来看,create其实是做了一次封装,把需要的对象通过参数传进去。
现在的block距离真正意义上的coroutine,还差一个可派发性。虽然它已经具有了可重入,可拦截,还差一点。
拦截器-Interceptor
block的可派发性是在 Cancellable.kt 的 intercept() 函数赋予的。上面创建完的block,是一个继承了SuspendLambda的block,而继承树上和intercept()有关的是ContinuationImpl,
代码语言:javascript复制internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
最后这里,是真正给block赋予可派发性的地方。context[ContinuationInterceptor]获取的是最开始给上下文设定的Dispatcher,不管是 DefaultScheduler,还是EventLoop,他们都有个公有的父类CoroutineDispatcher。而interceptContinuation的唯一实现就在这个类里。
代码语言:javascript复制public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
这里可以看到它做了一次代理模式,把Dispatcher封装了进去,作为 DispatchedContinuation的一个成员。
至此一个block就完成了它的整个创建过程。从一个block,到支持可重入,到支持可拦截,最后支持可派发。
所以会看到虽然协程的外部概念很清晰,只是一个 coroutine,但在协程内部,实际上支撑它的还有Continuation,Dispatch,Intercept。还是挺复杂的。