Kotlin Primer·第七章·协程

2018-07-20 10:40:25 浏览数 (1)

7 协程

协程,协作代码段。关于定义这里不多讲了,协程提供了一种可以避免线程阻塞的能力,这是他的核心功能。在 kotlin 中使用协程,需要在gradle中引入协程库:

代码语言:javascript复制
//Android 工程使用
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:x.x.x"  
//Java 工程使用
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:x.x.x'

7.1 kotlin 协程使用

首先,你需要明白一点,协程是通过编码实现的一个任务。它和操作系统或者 JVM 没有任何关系,它的存在更类似于虚拟的线程。 以我在 qcon 上分享的一个例子来做介绍:

kotlin的语法会让很多人觉得launch()async()是两个协程方法。其实不然,真正的协程是launch()传入的闭包参数。当launch()调用的时候,会启动一个协程(本质上并不一定是立即启动,下一篇文章解释)。 sync()方法调用的时候又启动了一个协程,此刻外部协程的状态(包括CPU、方法调用、变量信息)会被暂存,进而切换到async()启动的协程执行。 在本例子中,launch()async()这两个方法都显式传入了两个参数:

1、第一个参数是一个协程的上下文,类型是CoroutineContextCoroutineContext不仅可以用于在协程跳转的时刻传递数据,同时最主要的功能,也是在本例中的作用是用于表明协程运行与恢复时的上下文环境。 例如launch()方法中的UI参数,他实际上是一个封装了HandleCoroutineContext对象。

代码语言:javascript复制
val UI = HandlerContext(Handler(Looper.getMainLooper()), "UI")

对应的还有Swing,当然在 Android 中是没有这个对象的,但在 Java 工程中是有的:

代码语言:javascript复制
object Swing : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        SwingContinuation(continuation)
}

2、第二个参数是一个lambda表达式,也就是协程体。kotlin 语法,当一个 lambda是函数的最后一个参数的时候,lambda可以写在圆括号外面。

7.2 suspend

一个协程方法(或闭包)必须被 suspend 修饰,同时 suspend 修饰的方法(或闭包)只能调用被suspend修饰过的方法(或闭包)。 suspend修饰后代码发生了怎样的变化?

我们知道,kotlin的闭包(lambda)在被编译后是转换成了内部类的对象,而一个被suspend修饰的闭包,就是一个特殊的内部类。例如这段例子:

代码语言:javascript复制
fun test(){
    launch {
        val job = async {
            "string"
        }
        println("========${job.await()}")
    }
}

当他被编译以后,launch()传入的闭包会被编译成下面的样子:

代码语言:javascript复制
final class Main$test$1 extends CoroutineImpl implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {

    public final Continuation<Unit> create(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> continuation) {
    }
    
    public final Object invoke(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> continuation) {
    }
    
    public final Object doResume(@Nullable Object obj, @Nullable Throwable th) {
    }
}

而如果是一个普通方法被suspend修饰了以后,则只是会多出一个参数,例如一个普通的test()无参无内容方法用suspend修饰了以后会被编译成这样:

代码语言:javascript复制
public final Object test(Continuation<? super Unit> continuation) {
    return Unit.INSTANCE;
}

可以看到不论怎样,都会具备一个Continuation的对象。而这个Continuation就是真正的kotlin的协程。

7.3 协程挂起与恢复

理解了 suspend 做的事情以后,我们再来看 kotlin 的协程。 上面的代码中涉及到一个协程切换的情况。就是在launch()调用的时候,启动了一个协程就是suspend修饰的闭包参数。在launch()启动的协程内,async()又启动了一个协程。实际上协程的切换,就是一个挂起当前协程,启动新协程的过程。

7.3.1 协程的启动流程

挂起是指什么意思?首先要知道协程的启动流程。 launch()的源码是这样的:

代码语言:javascript复制
public fun launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //省略一些列的初始化后
    start(block, coroutine, coroutine)
    return coroutine
}

我们看到声明,start 是一个枚举对象,默认值是DEFAULT,这里实际上是调用了枚举的invoke()方法。 最终会调到createCoroutineUnchecked,这是一个扩展方法,他的声明如下:

代码语言:javascript复制
public fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
        receiver: R, completion: Continuation<T>
): Continuation<Unit> =
    if (this !is CoroutineImpl)
        buildContinuationByInvokeCall(completion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
        }
    else
        (this.create(receiver, completion) as CoroutineImpl).facade

看到这段代码中,通过判断this是不是CoroutineImpl来做不同的操作。而this是什么,是一个有suspend修饰的闭包R.() -> T,也就是上面launch()的参数传入的闭包。

还记得suspend修饰的闭包在编译后会变成什么吗?刚好是一个CoroutineImpl类的对象。 因此这里是调用了闭包的create()方法,最终将闭包创建成了Continuation对象并返回。 这也就验证了前面我讲的:Continuation就是真正的kotlin的协程 最后在创建好协程对象后,又会调用协程Continuationresume()方法(代码在上面提到的扩展方法里,就不列出了)而协程的resume()方法又会调用回编译后suspend闭包转换成的那个类里面的doResume方法(后面讲协程恢复的时候还会讲这里)。 所以绕了一圈又回来了。

7.3.2 协程的挂起

明白了启动流程以后,再来看挂起就清晰多了。我们看到代码(删去了一些判空和初始化):

代码语言:javascript复制
    public final Object doResume(Object obj, Throwable th) {
        StringBuilder append;
        Object await;
        Deferred job;
        switch (this.label) {
            case 0:
                job = DeferredKt.async$default(null, null, (Function2) new 1(null), 3, null);
                append = new StringBuilder().append("========");
                this.L$0 = job;
                this.L$1 = append;
                this.label = 1;
                await = job.await(this);
                 if (await == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                StringBuilder stringBuilder = (StringBuilder) this.L$1;
                job = (Deferred) this.L$0;
                if (th == null) {
                    append = stringBuilder;
                    await = obj;
                    break;
                }
                throw th;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println(append.append((String) await).toString());
        return Unit.INSTANCE;
    }

switch()内的label变量,他就是标识符,典型的状态机设计。当前在执行的协程有多少个可能的状态,就会有多少位。 当前例子中只有两种状态:要么初始状态,要么因为async()的调用被挂起。

首先看到label为0时的代码

代码语言:javascript复制
job = DeferredKt.async$default(null, null, (Function2) new 1(null), 3, null);
append = new StringBuilder().append("========");
this.L$0 = job;
this.L$1 = append;
this.label = 1;
await = job.await(this);
 if (await == coroutine_suspended) {
    return coroutine_suspended;
}

首先是job的声明并返回job对象,它对应的kotlin代码是上面的

代码语言:javascript复制
val job = async {
    "string"
}

接着是StringBuilderappend(),这个应该不用说了。 之后我们看到,有两个临时的全局变量L$0L$1,他们是用来存储当前协程的临时变量所生成的对象,由语法分析后判断当前协程只需要两个临时变量就能保存所有变量的信息了,所以就只生成了两个。 再之后label状态被设置为1,表示即将进入下一个协程了。 job.await()启动了一个协程,这个方法返回了一个Objectcoroutine_suspended表示协程还在执行还没执行完。 因此,这里的逻辑就是启动一个协程,如果这个协程是可以立刻执行完的,那就返回结果;否则直接return结束当前方法,等待下一次状态改变被触发,而这个结束当前方法,处于等待的时刻,就是被挂起的时候。

7.3.3 内部协程的切换

在协程方法async()返回的是Deferred接口类型的对象,这个接口也继承了Job接口,是它的子类。 在本例中,async()返回的实际对象是DeferredCoroutine这个类的对象,它实现了Deferred接口,更重要的是,它实现了await()接口方法。 还是看代码:

代码语言:javascript复制
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T> {
    suspend override fun await(): T = awaitInternal() 
}

await()其实是awaitInternal()的代理,它通过一个lock-free循环,保证一定等到异常或者一个叫startInternal()的方法执行完成才会返回。startInternal()方法的作用是在启动类型start=LAZY时,保证协程初始化完成,所以在本例中是没有意义的。在本例中有意义的是紧跟着这个方法后面调用的awaitSuspend()

代码语言:javascript复制
private suspend fun awaitSuspend(): Any? = suspendCancellableCoroutine { cont ->
    cont.disposeOnCompletion(invokeOnCompletion {
        val state = this.state
        check(state !is Incomplete)
        if (state is CompletedExceptionally)
            cont.resumeWithException(state.exception)
        else
            cont.resume(state)
    })
}

这个方法中cont就是调用await()时传入的外部协程的对象. disposeOnCompletion()方法会调用invokeOnCompletion()方法返回的DisposableHandle对象的dispose()方法,去等待job中的内容执行完成。但如果job中的代码在invokeOnCompletion()方法返回之前就已经执行完,就会返回一个NonDisposableHandle对象表示不需要再等待了。 然后执行闭包中的代码,去根据job内的代码是否发生了异常去返回对应的结果,这个结果就是state。 最终,又由外部协程cont调用了父类的resume()方法或者resumeWithException()方法(出异常时)。

7.3.4 协程的恢复

最终,与协程的启动流程中提及的一样,Continuation中的resume()方法会调用到suspend闭包转换成的类的doResume()方法。

代码语言:javascript复制
override fun resume(value: Any?) {
    processBareContinuationResume(completion!!) {
        doResume(value, null)
    }
}

而这里的参数value,就是协程在恢复时候传入的,内部协程执行后的结果。 这时,我们再看到状态机中的label 1的代码:

代码语言:javascript复制
StringBuilder stringBuilder = (StringBuilder) this.L$1;
job = (Deferred) this.L$0;
if (th == null) {
    append = stringBuilder;
    await = obj;
    break;
}
throw th;

很清晰了,就是恢复之前挂起时保存起来的一系列变量的值,最后的if语句中的obj,就是前面子协程运行后的结果传递到resume的参数中的value

0 人点赞