Using coroutines on Android or on the Kotlin platform is easy. Here is about the simplest example, coroutine code that does not depend on Android:
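```kotlin
import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        delay(1000L) // non-blocking wait for 1 second (the default time unit is milliseconds)
        println("World!") // printed after the delay
    }
    Thread.sleep(100L) // delay() is a suspend function and cannot be called from a plain main(), so block briefly instead
    println("Hello,") // the main thread keeps going while the coroutine is waiting
    Thread.sleep(2000L) // block the main thread for 2 seconds to keep the JVM alive
    println("out launch done")
}
```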
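This starts an ordinary coroutine. A coroutine started with GlobalScope.launch is dispatched, according to its coroutine context, to an underlying thread pool for execution. Its lifecycle runs create -> intercept -> suspend -> resume -> suspend -> resume -> complete.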
```flow
st=>start: Create
p=>operation: Suspend
r=>operation: Resume
e=>end: End
con=>condition: Done?
st->p->r->con
con(yes)->e
con(no)->p
```
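The coroutine lifecycle is implemented by a chain of logic, backed by the Context-Dispatcher-Scheduler machinery. None of this code relies on deep techniques, only on common software design ideas. Working through it took me about two days. The main thing was to keep two threads clear: the lifecycle of the coroutine, and the concepts behind each stage of that lifecycle.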
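The lifecycle in the chart above can be observed with nothing but the Kotlin standard library, without kotlinx.coroutines. The sketch below is an illustration under that assumption, not kotlinx code. pause(), parked, lifecycleLog and runLifecycleDemo() are hypothetical names. createCoroutine builds the coroutine without running it. The first resume runs the body until it suspends, and the second runs it to completion, at which point the completion continuation receives the result.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Records each lifecycle step so it can be inspected afterwards.
val lifecycleLog = mutableListOf<String>()

// Hypothetical holder for the continuation while the coroutine is suspended.
var parked: Continuation<Unit>? = null

// Hypothetical suspension point: stores the continuation and does not resume it,
// so the coroutine stays suspended until someone calls parked.resume(Unit).
suspend fun pause() = suspendCoroutine<Unit> { cont ->
    lifecycleLog += "suspend"
    parked = cont
}

fun runLifecycleDemo() {
    val block: suspend () -> String = {
        lifecycleLog += "run"
        pause()
        lifecycleLog += "run again"
        "done"
    }
    // Create: builds the coroutine's Continuation but does not execute the body.
    val coroutine = block.createCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        lifecycleLog += "complete: ${result.getOrNull()}"  // called once the body returns
    })
    lifecycleLog += "created"
    coroutine.resume(Unit)  // first resume: runs the body until pause() suspends it
    parked!!.resume(Unit)   // second resume: runs the rest of the body to completion
}
```

After runLifecycleDemo(), lifecycleLog reads created, run, suspend, run again, complete: done.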
Creating a coroutine
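launch/async
Coroutines are usually created with one of two builders, launch and async, whose internals are nearly identical. Taking launch as the example, its source is in Builders.common.kt: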
```kotlin
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT, // by default, start immediately
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context) // create the context
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true) // a coroutine that starts immediately
    coroutine.start(start, coroutine, block)
    return coroutine
}
```
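launch returns a Job, which you can use to track, join, or cancel the coroutine. Unlike a Future, a Job carries no result value. To get a result back from a coroutine, use async, which returns a Deferred (a Job with a result).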
Before returning, launch calls coroutine.start():
```kotlin
coroutine.start(start, coroutine, block)
```
This call is what eventually puts the coroutine into a queue. It calls the start method in AbstractCoroutine.kt:
```kotlin
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}
```
The dispatch flow begins at start(block, receiver, this). By this point the coroutine already has its coroutine context, along with the default dispatcher and scheduler.
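CoroutineStart is an enum class. So what does start do, and how can an enum value be called like a function? The answer is a Kotlin language feature, operator overloading. CoroutineStart overloads the invoke operator, so the value start can be called directly. The overload lives in CoroutineStart: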
```kotlin
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
    when (this) {
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion) // this is what start runs by default
        CoroutineStart.ATOMIC -> block.startCoroutine(completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
```
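The operator-overloading trick can be reproduced in a few lines. MiniStart below is a hypothetical, cut-down analogue of CoroutineStart. It has only DEFAULT and LAZY, and DEFAULT uses the standard library's startCoroutine rather than kotlinx's cancellable variant. It only illustrates how an enum value becomes callable.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical analogue of CoroutineStart: overloading invoke() makes
// an enum value such as MiniStart.DEFAULT callable like a function.
enum class MiniStart {
    DEFAULT, LAZY;

    operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
        when (this) {
            DEFAULT -> block.startCoroutine(completion) // create and start right away
            LAZY -> Unit                                // do nothing until started explicitly
        }
}

// Hypothetical helper: starts a tiny coroutine with the given mode and returns what happened.
fun startWith(start: MiniStart): MutableList<String> {
    val events = mutableListOf<String>()
    val completion = Continuation<String>(EmptyCoroutineContext) { events += "completed: ${it.getOrNull()}" }
    start({ events += "body ran"; "ok" }, completion) // same call shape as start(block, receiver, this)
    return events
}
```

startWith(MiniStart.DEFAULT) runs the body and the completion. startWith(MiniStart.LAZY) returns an empty list.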
Dispatching
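Once created, the coroutine enters the dispatch flow, where a Dispatcher puts it into the appropriate queue according to its rules. We will come back to when the Dispatcher is created and what exactly it is. For now, just remember that there is one.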
```kotlin
block.startCoroutineCancellable(completion)
```
Following startCoroutineCancellable leads to Cancellable.kt, and the code there is short:
```kotlin
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
```
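The three steps of startCoroutineCancellable (create unintercepted, intercept, resume) can be tried directly with the standard-library intrinsics createCoroutineUnintercepted and intercepted from kotlin.coroutines.intrinsics. This sketch leaves out kotlinx's cancellation handling, and threeStepStart is a hypothetical name. With an empty context there is no interceptor, so intercepted() hands back the same continuation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.resume

// Hypothetical helper mirroring create -> intercepted() -> resume with stdlib calls only.
fun threeStepStart(log: MutableList<String>) {
    val block: suspend () -> Int = { log += "body"; 42 }
    val completion = Continuation<Int>(EmptyCoroutineContext) { log += "result=${it.getOrThrow()}" }

    val created = block.createCoroutineUnintercepted(completion) // the body has NOT run yet
    log += "created"
    val intercepted = created.intercepted() // no ContinuationInterceptor in the context: same object back
    log += "intercepted same=${intercepted === created}"
    intercepted.resume(Unit)                // the first resume runs the body
}
```

Calling threeStepStart on an empty list leaves it as created, intercepted same=true, body, result=42.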
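createCoroutineUnintercepted is an expect function, which means it has actual implementations. You won't find them in the kotlinx.coroutines sources, though. They live in the Kotlin standard library, and we will analyze that code later.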
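For now, just remember that createCoroutineUnintercepted eventually calls the coroutine's create method.
Where does create come from?
The coroutine code we write is compiled by kotlinc, and create is generated by the compiler at compile time.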
Calling create gives createCoroutineUnintercepted a Continuation implementation. As mentioned at the start of this series, Continuation is an interface with a resumeWith() method:
```kotlin
public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}
```
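Since the interface only asks for a context and resumeWith, a completion can be written by hand. RecordingCompletion and runToCompletion are hypothetical names used for illustration. The coroutine is started with the standard library's startCoroutine.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// A hand-written Continuation used as a completion: it just stores the final Result.
class RecordingCompletion<T> : Continuation<T> {
    var outcome: Result<T>? = null
        private set

    override val context: CoroutineContext = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        outcome = result
    }
}

// If the block never suspends, the completion is resumed before startCoroutine returns;
// if it does suspend, outcome is still null at this point.
fun runToCompletion(block: suspend () -> Int): Result<Int>? {
    val completion = RecordingCompletion<Int>()
    block.startCoroutine(completion)
    return completion.outcome
}
```

A block that throws shows up as a failed Result rather than as an exception escaping startCoroutine.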
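So what create returns is really a ContinuationImpl, because the compiler-generated class extends it. Its code is in ContinuationImpl.kt. ContinuationImpl is unusual in that it lives in the kotlin-stdlib standard library rather than in the kotlinx project.
Kotlin's coroutine architecture is honestly a bit of a pain here. With some pieces in the standard library and others in kotlinx, it is hard to follow.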
```kotlin
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    ...
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    ...
}
```
Some code has been omitted above. Once we have the ContinuationImpl, its intercepted() method is called. The key part is this expression:
```kotlin
context[ContinuationInterceptor]
```
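It fetches the dispatcher from the current coroutine context. By default this is Dispatchers.Default, a CoroutineDispatcher. Where does it come from? Go back to the launch function at the top. Its first line calls newCoroutineContext, and that is where the dispatcher is put into the context.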
Next, interceptContinuation() is called on the CoroutineDispatcher, and it returns a DispatchedContinuation object:
```kotlin
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)
```
As the name suggests, DispatchedContinuation represents the current coroutine. It implements the Continuation interface and also holds the dispatcher, to which it delegates dispatching.
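To see context[ContinuationInterceptor] and interceptContinuation at work without kotlinx, here is a sketch with two stand-ins. A hypothetical LoggingInterceptor plays the role of CoroutineDispatcher. A hypothetical WrappedContinuation plays the role of DispatchedContinuation: it implements Continuation, keeps a reference to its interceptor, and delegates to the original continuation. Both are illustrations, not the library classes.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical analogue of DispatchedContinuation: wraps and delegates to the real continuation.
class WrappedContinuation<T>(
    val interceptor: LoggingInterceptor,
    val continuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = continuation.context
    override fun resumeWith(result: Result<T>) {
        interceptor.log += "resumeWith via wrapper"
        continuation.resumeWith(result)
    }
}

// Hypothetical interceptor, registered under the ContinuationInterceptor key like a dispatcher.
class LoggingInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    val log = mutableListOf<String>()
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        log += "interceptContinuation"
        return WrappedContinuation(this, continuation)
    }
}

fun interceptDemo(): List<String> {
    val interceptor = LoggingInterceptor()
    val block: suspend () -> Unit = { interceptor.log += "body" }
    // startCoroutine calls intercepted(), which finds the interceptor via context[ContinuationInterceptor].
    block.startCoroutine(Continuation<Unit>(interceptor) { interceptor.log += "completed" })
    return interceptor.log
}
```

interceptDemo() returns interceptContinuation, resumeWith via wrapper, body, completed.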
Back in Cancellable. After intercepted(), the coroutine has been created but has not run yet, which is the state coroutines call suspended. Next, resumeCancellableWith() is called.
The current continuation is a DispatchedContinuation, so resumeCancellableWith is implemented in its code:
```kotlin
inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this) // enter the dispatch flow
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatchedWith(result)
            }
        }
    }
}
```
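The dispatcher here is the Dispatcher passed in when the DispatchedContinuation was created. Through this delegation its dispatch function is called, and from there we are in the dispatch flow.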
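The branch in resumeCancellableWith can be modelled with a single-threaded, hypothetical QueueDispatcher. If a dispatch is needed, the resumption is queued; otherwise it runs in place ("unconfined"). Its drain() stands in for the scheduler's threads. Cancellation and real thread pools are left out.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical single-threaded dispatcher used only to illustrate the dispatch decision.
class QueueDispatcher(private val dispatchNeeded: Boolean) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    val queue = ArrayDeque<Runnable>()

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                if (dispatchNeeded) queue.addLast(Runnable { continuation.resumeWith(result) }) // dispatch later
                else continuation.resumeWith(result)                                            // resume in place
            }
        }

    // Stands in for worker threads: runs queued resumptions on the caller's thread.
    fun drain() {
        while (true) {
            val next = queue.removeFirstOrNull() ?: break
            next.run()
        }
    }
}
```

With dispatchNeeded = true, starting a coroutine only enqueues it, and nothing runs until drain() is called. With false, the body runs immediately inside startCoroutine.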
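Kotlin coroutines have two commonly used dispatchers, EventLoop and DefaultScheduler. We will cover EventLoop later. It is special because it is designed to block the current thread while it runs a series of coroutines.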
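DefaultScheduler is implemented in Dispatcher.kt. The related files are Dispatchers.common.kt and Dispatchers.kt. Dispatchers.common.kt is the common file: it declares the public interface that coroutines on every platform must implement. Each platform (JVM, JS, Native) provides its own implementation, always named Dispatchers.kt, in a separate package.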
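Dispatchers (note the extra s) defines several dispatcher types, as mentioned earlier: Default, Main, Unconfined, and IO. We focus on Default. The logic of the other three can be understood by analogy with it.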
The Dispatcher is obtained in newCoroutineContext(), the first line of launch. It is implemented in CoroutineContext.kt (in the jvm package):
```kotlin
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

/**
 * Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
 * [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
 *
 * See [DEBUG_PROPERTY_NAME] for description of debugging facilities on JVM.
 */
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { // create the context
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
```
Creating the context uses Dispatchers.Default, which is itself initialized by the createDefaultDispatcher() call above. That yields the DefaultScheduler singleton.
On the JVM, Dispatchers.Default is defined like this:
```kotlin
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
```
createDefaultDispatcher() was shown just above.
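The rule in newCoroutineContext can be sketched with standard-library context types. It merges the scope's context with the caller's, then adds a default dispatcher only if no ContinuationInterceptor is present. Label, NoOpInterceptor and newContextSketch are hypothetical. The debug CoroutineId element and the Dispatchers.Default identity check from the real code are omitted.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical context element, similar in spirit to CoroutineName.
data class Label(val value: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

// Hypothetical interceptor that leaves continuations unchanged.
class NoOpInterceptor(val name: String) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = continuation
}

fun newContextSketch(
    scopeContext: CoroutineContext,
    context: CoroutineContext,
    defaultInterceptor: ContinuationInterceptor
): CoroutineContext {
    val combined = scopeContext + context // on a key clash, the right-hand element wins
    return if (combined[ContinuationInterceptor] == null) combined + defaultInterceptor else combined
}
```

An explicitly supplied interceptor is kept. Only a context without one receives the default.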
Now look inside the Dispatcher to see which function is invoked when DispatchedContinuation calls dispatcher.dispatch():
```kotlin
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
    try {
        coroutineScheduler.dispatch(block)
    } catch (e: RejectedExecutionException) {
        DefaultExecutor.dispatch(context, block)
    }
```
coroutineScheduler is the scheduler discussed next. The coroutine is still suspended at this point, and next it enters the scheduling logic.
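The try/catch in dispatch is a plain fallback pattern: if the primary executor rejects the task, the task goes to a backup (in the real code, DefaultExecutor). Here is a sketch with java.util.concurrent executors, where dispatchWithFallback is a hypothetical name:

```kotlin
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException

// Hypothetical helper: try the primary executor, fall back if it refuses the task.
fun dispatchWithFallback(primary: Executor, fallback: Executor, block: Runnable) {
    try {
        primary.execute(block)
    } catch (e: RejectedExecutionException) {
        fallback.execute(block) // e.g. the primary pool has been shut down
    }
}
```

A ThreadPoolExecutor that has been shut down rejects new tasks with RejectedExecutionException under its default policy, which is exactly the case this catch handles.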
Scheduling
The default scheduler implementation is CoroutineScheduler, in CoroutineScheduler.kt. Here is its dispatch() function:
```kotlin
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask() // this is needed for virtual time support
    val task = createTask(block, taskContext) // wrap the Runnable into a Task
    // try to submit the task to the local queue and act depending on the result
    val currentWorker = currentWorker() // the current thread as a Worker, or null if it is not one
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) // try the worker's local queue
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) { // fall back to the global queue
            // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    // Checking 'task' instead of 'notAdded' is completely okay
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        signalCpuWork() // wake up or create a worker for CPU-bound (non-blocking) tasks
    } else {
        // Increment blocking tasks anyway
        signalBlockingWork(skipUnpark = skipUnpark) // wake up or create a worker for blocking tasks
    }
}
```
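The scheduler introduces two new concepts: Worker and Queue. A Worker is simply a thread, since it extends the same java.lang.Thread you already know. A Queue is a task queue, and there are two kinds: each Worker's own localQueue, and the scheduler's globalQueue. The global queue is actually split into a CPU queue and a blocking queue. A task goes there when it cannot be put into a local queue, for example when dispatch() is called from a thread that is not a worker, and it lands in the blocking or CPU queue according to the task's mode.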
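The choice between the local and the global queue in dispatch() can be modelled as follows. MiniScheduler is a hypothetical, heavily simplified illustration. Tasks dispatched from one of its own workers go to that worker's local queue, and everything else goes to the global queue. The real CoroutineScheduler also falls back to the global queue when a local queue overflows and separates CPU tasks from blocking ones, which this sketch ignores.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical model of the two queue levels in CoroutineScheduler.
class MiniScheduler {
    val globalQueue = ConcurrentLinkedQueue<Runnable>()

    inner class Worker(id: Int, private val body: () -> Unit) : Thread("mini-worker-$id") {
        val localQueue = ArrayDeque<Runnable>() // only touched by this worker's own thread
        val owner: MiniScheduler get() = this@MiniScheduler
        override fun run() = body()
    }

    // Like currentWorker(): non-null only when called on one of this scheduler's workers.
    private fun currentWorker(): Worker? =
        (Thread.currentThread() as? Worker)?.takeIf { it.owner === this }

    // Returns which queue received the task (for illustration only).
    fun dispatch(task: Runnable): String {
        val worker = currentWorker()
        return if (worker != null) {
            worker.localQueue.addLast(task)
            "local"
        } else {
            globalQueue.add(task)
            "global"
        }
    }
}
```

Dispatching from the main thread returns "global". Dispatching from inside a Worker's body returns "local".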
Back to Worker. It has a member called localQueue:
```kotlin
internal inner class Worker private constructor() : Thread() {
    init {
        isDaemon = true
    }

    // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
    @Volatile // volatile for push/pop operation into parkedWorkersStack
    var indexInArray = 0
        set(index) {
            name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
            field = index
        }

    constructor(index: Int) : this() {
        indexInArray = index
    }

    inline val scheduler get() = this@CoroutineScheduler

    @JvmField
    val localQueue: WorkQueue = WorkQueue() // the worker's local queue
    // ...
}
```
Every worker has its own localQueue, so however many threads are started, each one holds a queue of its own. Once created, a Worker keeps running until its state is set to terminated.
```kotlin
private fun createNewWorker(): Int {
    synchronized(workers) {
        ...
        val worker = Worker(newIndex)
        workers[newIndex] = worker
        require(newIndex == incrementCreatedWorkers())
        worker.start()
        return cpuWorkers + 1
    }
}
```
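Part of the code is omitted. After a worker is created, it is stored in an array owned by the scheduler, and then start() is called. The worker checks whether there is a task it can run. If there is, it takes it and runs it; otherwise it parks. Parking (via java.util.concurrent.locks.LockSupport) is a less familiar thread-scheduling primitive that is worth studying separately.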
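The "run a task if there is one, otherwise park" loop can be sketched with java.util.concurrent.locks.LockSupport. ParkingWorker is a hypothetical class. It is not kotlinx's Worker, which also steals work from other workers and parks with timeouts.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.LockSupport

// Hypothetical worker loop in the spirit of runWorker().
class ParkingWorker : Thread("parking-worker") {
    private val tasks = ConcurrentLinkedQueue<Runnable>()
    @Volatile private var terminated = false

    init {
        isDaemon = true // like the scheduler's workers, do not keep the JVM alive
    }

    override fun run() {
        while (!terminated) {
            val task = tasks.poll()
            // park() may also return spuriously; the loop simply checks the queue again
            if (task != null) task.run() else LockSupport.park(this)
        }
    }

    fun submit(task: Runnable) {
        tasks.add(task)
        LockSupport.unpark(this) // a permit granted before park() makes the next park() return at once
    }

    fun shutdown() {
        terminated = true
        LockSupport.unpark(this)
    }
}
```

Calling unpark after adding a task avoids a lost wake-up. If the worker has not parked yet, the permit makes its next park() return immediately, and the loop picks up the task.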
Next comes the execution logic.
Execution
Worker.run() calls runWorker():
```kotlin
private fun runWorker() {
    var rescanned = false
    while (!isTerminated && state != WorkerState.TERMINATED) {
        val task = findTask(mayHaveLocalTasks)
        // Task found. Execute and repeat
        if (task != null) {
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            executeTask(task) // run the task
        }
        // ... (no task found: try stealing, otherwise park; omitted)
    }
    // ...
}
```
Jump to executeTask():
```kotlin
private fun executeTask(task: Task) {
    val taskMode = task.mode
    idleReset(taskMode)
    beforeTask(taskMode)
    runSafely(task)
    afterTask(taskMode)
}
```
idleReset, beforeTask, and afterTask handle state updates and callbacks. The actual execution happens in runSafely():
```kotlin
fun runSafely(task: Task) {
    try {
        task.run() // the actual execution
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        unTrackTask()
    }
}
```
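The point of runSafely() is that an exception thrown by one task must not kill the worker's loop. Instead, it is reported to the thread's uncaught-exception handler. Here is a sketch of just that part; the real function also calls unTrackTask() in finally. runSafelySketch is a hypothetical name.

```kotlin
// Hypothetical helper: runs a task and routes any exception to the current thread's handler.
fun runSafelySketch(task: Runnable) {
    try {
        task.run()
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    }
}
```

After a failing task, the calling code (in the scheduler, the worker loop) simply continues with the next task.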
What is task? The DispatchedContinuation returned earlier by intercepted() extends DispatchedTask, and that is the task here. From DispatchedTask.kt:
```kotlin
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    internal abstract val delegate: Continuation<T>

    internal abstract fun takeState(): Any?

    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T

    internal fun getExceptionalResult(state: Any?): Throwable? =
        (state as? CompletedExceptionally)?.cause

    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            withCoroutineContext(context, delegate.countOrElement) {
                val exception = getExceptionalResult(state)
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                if (exception == null && job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) continuation.resumeWithException(exception)
                    else continuation.resume(getSuccessfulResult(state)) // resume the continuation
                }
            }
        } catch (e: Throwable) {
            fatalException = e
        } finally {
            // ... (post-task bookkeeping and fatal-exception handling omitted)
        }
    }
}
```
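The core decision in DispatchedTask.run() goes like this. Take the saved state. If the coroutine was resumed normally but its Job is no longer active, resume it with a cancellation exception instead. If it was resumed with an exception, that exception takes precedence. Below is a hypothetical, simplified MiniDispatchedTask that illustrates only that decision. A plain isCancelled callback stands in for context[Job].

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.cancellation.CancellationException
import kotlin.coroutines.resumeWithException

// Hypothetical Runnable that carries a continuation plus the state to resume it with.
class MiniDispatchedTask<T>(
    private val continuation: Continuation<T>,
    private val state: Result<T>,
    private val isCancelled: () -> Boolean // stands in for context[Job]?.isActive == false
) : Runnable {
    override fun run() {
        if (state.isSuccess && isCancelled()) {
            // normal resumption of a cancelled coroutine becomes a cancellation
            continuation.resumeWithException(CancellationException("job was cancelled"))
        } else {
            // an original exception dominates cancellation; otherwise resume normally
            continuation.resumeWith(state)
        }
    }
}
```

Running the task on a worker thread is what finally hands control back to the coroutine's own continuation.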
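The final continuation.resume(getSuccessfulResult(state)) call in run() is where the continuation is resumed. This continuation is the one passed in when the DispatchedContinuation was first created, and it is actually a BaseContinuationImpl object: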
```kotlin
internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) // this is where the code we wrote actually runs
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
```
The invokeSuspend() call in resumeWith is where the code we wrote in the coroutine actually runs. This is the real execution path.
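The whole path is very convoluted. Some of the code is in the standard library and some in the coroutines library, and it twists and turns like a mountain road.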
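The while (true) loop in resumeWith replaces the recursive call completion.resumeWith(outcome) with a loop. That way, resuming a deep chain of suspended frames does not grow the Java stack. The hypothetical Frame and resumeChain below model only that idea, without suspension or debug probes.

```kotlin
// Hypothetical frame: computes a value from its input and hands it to its completion (the caller's frame).
class Frame(val completion: Frame?, val step: (Int) -> Int)

// Walks the completion chain iteratively instead of recursing into each completion.
fun resumeChain(start: Frame, initial: Int): Int {
    var current: Frame? = start
    var param = initial
    while (current != null) {
        param = current.step(param)  // like invokeSuspend(param)
        current = current.completion // like "current = completion; param = outcome"
    }
    return param // the top-level completion's result
}
```

A chain of 100,000 frames resumes without a StackOverflowError, because each step only reassigns current and param.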
invokeSuspend() is generated at compile time. Take the following code, for example:
```kotlin
import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        println("Hello!")
        delay(100L) // non-blocking wait for 100 ms
        println("World!") // printed after the delay
    }
    Thread.sleep(400L) // block the main thread for 400 ms to keep the JVM alive
    println("out launch done")
}
```
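It is very simple, with only one coroutine. After compilation, the lambda body becomes a generated class whose invokeSuspend() holds this logic.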
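That class is really a state machine. Every suspension and resumption switches its state (a label field), and each state runs a different case.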
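The shape of such a state machine can be approximated by hand. The sketch below is an illustration, not actual compiler output. LaunchBodyStateMachine, fakeDelay and pendingResumes are hypothetical. println is replaced by appending to a list, and delay by a stand-in that only stores the continuation. In generated code the class extends SuspendLambda, and BaseContinuationImpl.resumeWith drives invokeSuspend rather than resumeWith calling it directly.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Hypothetical stand-in for delay(): no timer, it just remembers who to resume.
val pendingResumes = mutableListOf<Continuation<Unit>>()

fun fakeDelay(continuation: Continuation<Unit>): Any {
    pendingResumes += continuation
    return COROUTINE_SUSPENDED
}

// Hand-written approximation of the body { println("Hello!"); delay(100L); println("World!") }.
class LaunchBodyStateMachine(private val output: MutableList<String>) : Continuation<Unit> {
    var label = 0 // which suspension point to continue from
    override val context: CoroutineContext = EmptyCoroutineContext

    // Resuming re-enters the state machine.
    override fun resumeWith(result: Result<Unit>) {
        invokeSuspend(result)
    }

    fun invokeSuspend(result: Result<Unit>): Any? {
        when (label) {
            0 -> {
                result.getOrThrow()
                output += "Hello!"
                label = 1 // next time, continue after the delay
                if (fakeDelay(this) === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            }
            1 -> result.getOrThrow() // rethrows if resumed with an exception
            else -> throw IllegalStateException("unexpected label $label")
        }
        output += "World!"
        return Unit // not COROUTINE_SUSPENDED: the coroutine is done
    }
}
```

The first invokeSuspend call prints "Hello!" and returns COROUTINE_SUSPENDED with label = 1. Resuming the stored continuation runs the label 1 case and finishes with "World!".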
Completion
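A coroutine finishes when its code returns something other than COROUTINE_SUSPENDED. In the invokeSuspend() cases, a call to a suspending function that actually suspends returns COROUTINE_SUSPENDED. When resumeWith in BaseContinuationImpl receives that value, it simply returns: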
```kotlin
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return // suspended: return immediately
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
```
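So in the last case, invokeSuspend() returns Unit.INSTANCE (Unit as it appears in decompiled code), which is not COROUTINE_SUSPENDED. The result is passed on to the completion, and at that point the coroutine has truly finished.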