再谈协程之suspend到底挂起了啥

2021-11-12 14:49:50 浏览数 (1)

点击上方蓝字关注我,知识会给你力量

Kotlin编译器会给每一个suspend函数生成一个状态机来管理协程的执行。

Coroutines简化了Android上的异步操作。正如文档中所解释的,我们可以用它们来管理异步任务,否则可能会阻塞主线程,导致你的应用程序Crash。

Coroutines也有助于用命令式的代码取代基于回调的API。

作为例子,我们先看看这个使用回调的异步代码。

代码语言:javascript复制
// Simplified code that only considers the happy path
fun loginUser(userId: String, password: String, userResult: Callback<User>) {
  // Async callbacks
  userRemoteDataSource.logUserIn { user ->
    // Successful network request
    userLocalDataSource.logUserIn(user) { userDb ->
      // Result saved in DB
      userResult.success(userDb)
    }
  }
}

这些回调可以使用coroutines转换为顺序的函数调用。

代码语言:javascript复制
suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

在coroutines代码中,我们给函数添加了suspend修饰符。这将告诉编译器,这个函数需要在一个coroutine内执行。作为一个开发者,你可以把suspend函数看作是一个普通的函数,但它的执行可能被挂起,并在某个时候恢复。

❝简而言之,suspend就是一种编译器生成的回调。 ❞

与回调不同的是,coroutines提供了一种在线程之间切换和处理异常的简单方法。

但是,当我们把函数标记为suspend时,编译器实际上在幕后做了什么?

Suspend到底做了什么

回到loginUser的suspend函数,注意它调用的其他函数也是suspend函数。

代码语言:javascript复制
suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}
// UserRemoteDataSource.kt
suspend fun logUserIn(userId: String, password: String): User
// UserLocalDataSource.kt
suspend fun logUserIn(userId: String): UserDb

简而言之,Kotlin编译器将使用有限状态机(我们将在后面介绍)把suspend函数转换为优化版本的回调实现。你说对了,编译器会帮你写这些回调,它们的本质,依然是回调!

Continuation的真面目

suspend函数之间的通信方式是使用Continuation对象。一个Continuation只是一个带有一些额外信息的通用回调接口。正如我们稍后将看到的,它将代表一个suspend函数的生成状态机。

让我们看一下它的定义。

代码语言:javascript复制
interface Continuation<in T> {
  public val context: CoroutineContext
  public fun resumeWith(value: Result<T>)
}
  • context是在continuation中使用的CoroutineContext。
  • resumeWith用一个Result来恢复Coroutine的执行,这个Result可以包含一个导致suspend的计算结果的值或者是一个异常。

❝注意:从Kotlin 1.3开始,你还可以使用扩展函数resume(value: T)和resumeWithException(exception: Throwable),它们是resumeWith调用的特殊版本。 ❞

编译器将使用函数签名中的额外参数completion(Continuation类型)替换suspend修饰符,该参数将用于将suspend函数的结果传达给调用它的coroutine。

代码语言:javascript复制
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  completion.resume(userDb)
}

为了简单起见,我们的例子将返回Unit而不是User。User对象将在添加的Continuation参数中被 "返回"。

suspend函数的字节码实际上返回 Any? 因为它是 (T | COROUTINE_SUSPENDED)的联合类型。这允许函数在可以时同步返回。

❝注意:如果你用suspend修饰符标记一个不调用其他suspend函数的函数,编译器也会添加额外的Continuation参数,但不会对它做任何事情,函数体的字节码看起来就像一个普通函数。 ❞

你也可以在其他地方看到Continuation接口。

  • 当使用suspendCoroutine或suspendCancellableCoroutine将基于回调的API转换为coroutine时(你应该总是倾向于使用这种方法),你直接与Continuation对象交互,以恢复在运行时被suspend的作为参数传递的代码块。
  • 你可以使用suspend函数上的startCoroutine扩展函数来启动一个coroutine。它接收一个Continuation对象作为参数,当新的coroutine完成时,无论是结果还是异常,都会被调用。

切换不同的Dispatchers

你可以在不同的Dispatchers之间进行交换,在不同的线程上执行计算。那么Kotlin如何知道在哪里恢复一个暂停的计算?

Continuation有一个子类型,叫做DispatchedContinuation,它的resume函数可以对CoroutineContext中可用的Dispatcher进行调度调用。除了Dispatchers.Unconfined的isDispatchNeeded函数覆盖(在dispatch之前调用)总是返回false,所有Dispatcher都会调用dispatch。

在协程中,有个不成文的约定,那就是,suspend函数默认是不阻塞线程的,也就是说,suspend函数的调用者,不用为suspend函数运行在哪个线程而担心,suspend函数会自己处理它工作的线程,不大部分时候,都是通过withContext来进行切换的。

生成状态机

❝免责声明:文章其余部分所展示的代码将不完全符合编译器所生成的字节码。它将是足够准确的Kotlin代码,使你能够理解内部真正发生的事情。这种表示法是由Coroutines 1.3.3版本生成的,在该库的未来版本中可能会发生变化。 ❞

Kotlin编译器将识别函数何时可以在内部suspend。每个suspend point都将被表示为有限状态机中的一个状态。这些状态由编译器用标签表示,前面示例中的suspend函数在编译后,会产生类似下面的伪代码。

代码语言:javascript复制
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  // Label 0 -> first execution
  val user = userRemoteDataSource.logUserIn(userId, password)
  // Label 1 -> resumes from userRemoteDataSource
  val userDb = userLocalDataSource.logUserIn(user)
  // Label 2 -> resumes from userLocalDataSource
  completion.resume(userDb)
}

为了更好地表示状态机,编译器将使用一个when语句来实现不同的状态。

代码语言:javascript复制
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  when(label) {
    0 -> { // Label 0 -> first execution
        userRemoteDataSource.logUserIn(userId, password)
    }
    1 -> { // Label 1 -> resumes from userRemoteDataSource
        userLocalDataSource.logUserIn(user)
    }
    2 -> { // Label 2 -> resumes from userLocalDataSource
        completion.resume(userDb)
    }
    else -> throw IllegalStateException(...)
  }
}

❝编译器将suspend函数编译成带有Continuation参数的方法叫做CPS(Continuation-Passing-Style)变换。 ❞

这段代码是不完整的,因为不同的状态没有办法分享信息。编译器会在函数中使用相同的Continuation对象来做这件事。这就是为什么Continuation的泛型是Any? 而不是原始函数的返回类型(即User)。

此外,编译器将创建一个私有类,1)持有所需的数据,2)递归地调用loginUser函数以恢复执行。你可以看看下面这个生成的类的近似值。

❝免责声明:注释不是由编译器生成的。我添加它们是为了解释它们的作用,并使跟随代码更容易理解。 ❞

代码语言:javascript复制
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
  class LoginUserStateMachine(
    // completion parameter is the callback to the function 
    // that called loginUser
    completion: Continuation<Any?>
  ): CoroutineImpl(completion) {
    // Local variables of the suspend function
    var user: User? = null
    var userDb: UserDb? = null
    // Common objects for all CoroutineImpls
    var result: Any? = null
    var label: Int = 0
    // this function calls the loginUser again to trigger the
    // state machine (label will be already in the next state) and
    // result will be the result of the previous state's computation
    override fun invokeSuspend(result: Any?) {
      this.result = result
      loginUser(null, null, this)
    }
  }
  ...
}

由于invokeSuspend将仅用Continuation对象的信息来再次调用loginUser,loginUser函数签名中的其余参数都变成了空值。在这一点上,编译器只需要添加如何在状态之间转移的信息。

它需要做的第一件事是知道1)这是函数第一次被调用,或者2)函数已经从之前的状态恢复。它通过检查传入的continuation是否是LoginUserStateMachine类型来实现。

代码语言:javascript复制
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
  ...
  val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
  ...
}

如果是第一次,它将创建一个新的LoginUserStateMachine实例,并将收到的完成实例作为一个参数存储起来,这样它就能记住如何恢复调用这个实例的函数。如果不是这样,它将只是继续执行状态机(suspend函数)。

现在,让我们看看编译器为在状态间移动和在状态间共享信息而生成的代码。

代码语言:javascript复制
/* Copyright 2019 Google LLC. 
   SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    ...

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Next time this continuation is called, it should go to state 1
            continuation.label = 1
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.user = continuation.result as User
            // Next time this continuation is called, it should go to state 2
            continuation.label = 2
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
        ... // leaving out the last state on purpose
    }
}

花点时间浏览一下上面的代码,看看你是否能发现与前面的代码片断的不同之处。让我们看看编译器生成了什么。

  • when语句的参数是LoginUserStateMachine实例中的Label。
  • 每次处理一个新的状态时,都会有一个检查,以防这个函数suspend时发生异常。
  • 在调用下一个suspend函数(即logUserIn)之前,LoginUserStateMachine实例的Label将被更新为下一个状态。
  • 当在这个状态机内部有一个对另一个suspend函数的调用时,continuation的实例(LoginUserStateMachine类型)被作为一个参数传递。要调用的suspend函数也已经被编译器转化了,它是另一个像这样的状态机,它把一个continuation对象也作为参数!当那个suspend函数的状态机完成后,它将恢复这个状态机的执行。

最后一个状态是不同的,因为它必须恢复调用这个函数的执行,正如你在代码中看到的,它对存储在LoginUserStateMachine中的cont变量(在构造时)调用resume。

代码语言:javascript复制
/* Copyright 2019 Google LLC. 
   SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
    ...

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        ...
        2 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.userDb = continuation.result as UserDb
            // Resumes the execution of the function that called this one
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(...)
    }
}

正如你所看到的,Kotlin编译器为我们做了很多事情!从这个suspend函数功能来举例。

代码语言:javascript复制
suspend fun loginUser(userId: String, password: String): User {
  val user = userRemoteDataSource.logUserIn(userId, password)
  val userDb = userLocalDataSource.logUserIn(user)
  return userDb
}

编译器为我们生成了下面这一切。

代码语言:javascript复制
/* Copyright 2019 Google LLC. 
   SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {

    class LoginUserStateMachine(
        // completion parameter is the callback to the function that called loginUser
        completion: Continuation<Any?>
    ): CoroutineImpl(completion) {
        // objects to store across the suspend function
        var user: User? = null
        var userDb: UserDb? = null

        // Common objects for all CoroutineImpl
        var result: Any? = null
        var label: Int = 0

        // this function calls the loginUser again to trigger the 
        // state machine (label will be already in the next state) and 
        // result will be the result of the previous state's computation
        override fun invokeSuspend(result: Any?) {
            this.result = result
            loginUser(null, null, this)
        }
    }

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Next time this continuation is called, it should go to state 1
            continuation.label = 1
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.user = continuation.result as User
            // Next time this continuation is called, it should go to state 2
            continuation.label = 2
            // The continuation object is passed to logUserIn to resume 
            // this state machine's execution when it finishes
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
        2 -> {
            // Checks for failures
            throwOnFailure(continuation.result)
            // Gets the result of the previous state
            continuation.userDb = continuation.result as UserDb
            // Resumes the execution of the function that called this one
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(...)
    }
}

Kotlin编译器将每个suspend函数转化为一个状态机,在每次函数需要suspend时使用回调进行优化。

现在你知道了编译器在编译时到底做了什么,你就可以更好地理解为什么一个suspend函数在它执行完所有工作之前不会返回。另外,你也会知道,代码是如何在不阻塞线程的情况下进行suspend的——这是因为,当函数恢复时需要执行的信息被存储在Continuation对象中!

参考资料:https://medium.com/androiddevelopers/the-suspend-modifier-under-the-hood-b7ce46af624f

向大家推荐下我的网站 https://xuyisheng.top/ 点击原文一键直达

专注 Android-Kotlin-Flutter 欢迎大家访问

往期推荐

  • ‍ 再谈协程之CoroutineContext我能玩一年
  • 再谈协程之Callback写出协程范儿
  • 闲言碎语——第四期
  • 再谈协程之Lifecycle潜行者

本文原创公众号:群英传,授权转载请联系微信(Tomcat_xu),授权后,请在原创发表24小时后转载。

< END >

作者:徐宜生

0 人点赞