干货 | Kotlin/Native 异步并发模型初探

2020-03-11 13:39:23 浏览数 (1)

作者简介

禹昂,携程移动端高级工程师。Kotlin 中文社区核心成员,官方文档译者。

一、前言

作为 Kotlin Multiplatform 体系重要组成部分之一的 Kotlin/Native ,目前还是一项处于 beta 阶段的技术。而 Kotlin/Native与 Kotlin/JVM 的异步并发模型也有着极大的不同,因此如果要实践 Kotlin Multiplatform,则事先对 Kotlin/Native的异步并发模型进行探究就显得很有必要。

相较于 Kotlin/Native,Kotlin/JVM 也许为更多的人所熟知。基于 JVM 的异步并发机制,Kotlin/JVM 提供了通过编译器与线程池实现的协程来完成异步并发任务。Kotlin/JVM 的协程既能完成异步请求,也能完成并行计算,并且由于协程中拥有挂起(suspend),Kotlin/JVM 就可以在协程而非线程的层面上来解决并发竞争的问题。

即当并发竞争出现的时候,这套机制只需将协程挂起而无需阻塞线程,而对于是否发生竞争的判断可以转移到原子操作上。这样的机制避免了 JVM重量级锁的出现,个人认为这确实是 Kotlin/JVM 的协程相对于传统 JDK 中异步并发 API 的一个优势(详见文末参考文档链接 1、2)。

但 Kotlin/Native 程序作为一种原生二进制程序,相当于是重新开发的一门语言,由于没有现成的类似于 JVM 提供的异步并发机制作为依赖,所以它必须实现一套自己的异步并发模型。由于 Kotlin 在编程范式上吸收了部分函数式编程的特性,因此 Kotlin/Native 的同步方案从设计思想上向函数式编程靠拢,即对象不变性,其宗旨就是如果对象本身不可变,那就不存在线程安全的问题。

Kotlin/Native 用于实现异步和并发的方案主要有三种。

1)基于宿主环境(操作系统)实现。例如与使用 POSIX C 编写原生程序一样。直接使用相关操作系统平台提供的 API 来自己开启线程,在 POSIX 标准的系统上,手动调用 pthread_create函数来创建线程。但是这样的代码实现违反了平台通用性的原则,例如,如果你要将你的程序移植到非 POSIX 标准的系统上,那异步并发方式就得全部改用相关平台的机制,可移植性太差,在编写多平台程序的时候这种方式基本上是行不通的。

2)Kotlin/Native 自身提供给了我们两套异步并发的 API,首先是协程,但 Kotlin/Native 的协程与 Kotlin/JVM的协程区别很大,Kotlin/Native 的协程是单线程的,也就是说它只能用来执行一些不占用 CPU 资源的异步并发任务,例如网络请求。但如果要利用CPU 多核的能力来进行并行计算,Native 版的协程就失去了作用,当然,官方说了要尽快解决这个问题,并且于 2019 年 12月中已经发布了 Native 多线程版协程的预览版本,这个会在后文详细讨论。

3)除了协程之外,官方在 Kotlin/Native 诞生之初就已经提供了另一套专门做并行任务的工具,即 Worker 。Worker 与 Kotlin/Native 的异步并发模型紧密相连,做到了既能利用 CPU 多核能力,又能保障线程安全(虽然做法略微粗暴)。这篇文章我们会先介绍基于 Worker 与对象子图的现有异步并发模型,最后再讨论当前预览版本的多线程协程。

注意,本文基于 Kotlin 1.3.61,Kotlin/Native 作为一个实验性项目,任何的版本变动都有可能造成 API 的破坏性变更。

二、原生并发模型:Worker 与对象子图(Subgraph)

这部分内容,官方文档较少,目前仅有一篇(见参考链接 3),而且其内容有一定滞后性,所以本文中的部分结论可能会与该文档不符,期待后续官方更新。

Worker 与线程类似,通过打印线程 id 进行验证发现,一个 Worker 基本对应一个线程。在编写程序时,如果需要开启线程,就应该创建一个 Worker 。Kotlin/Native 对跨线程/Worker 访问对象拥有严格的限制,因此对象在一定维度上又分为两种状态,即 Freeze(冻结)与 Unfreeze(非冻结)。

冻结的对象是编译期即可证明为不可变的对象,或者是手动显式添加 @SharedImmutable 注解的对象,系统默认这类对象不可变,可以在任意的线程/Worker 中访问,而非冻结对象通常不可在创建它之外的线程/Worker 中访问。Kotlin/Native通过给对象生成对象子图(subgraph)的方式,然后在运行时遍历对象子图来检测是否发生了跨线程/Worker 访问。

2.1 对象冻结

首先创建一个基本的 Kotlin/Native 工程,本文基于 macOS 10.15.1。

对象冻结,即一个对象被创建之后即与当前线程/Worker 绑定,在不加特殊标记的情况下,在其他线程/Worker 访问该对象(无论是读还是写)就会抛出异常。但是存在另外一种对象,它们在编译期即可被证明是不可变的,这类对象就被称为冻结的对象。因此冻结对象可以在任意线程内访问,目前冻结对象有:

  • 枚举类型
  • 不加特殊修饰的单例对象(即使用 object 关键字声明的)
  • 所有使用 val 修饰的原生类型变量与 String(这种情况也就包含了 const 修饰的常量)

如果我们要将其他类型的全局变量/成员变量声明为冻结的,可以使用注解 @SharedImmutable,它可以让变量的多线程访问通过编译,但如果运行时发生了对该变量的修改,程序就会抛出 IncorrectDereferenceException 异常。除此之外,官方还表示之后可能会增加对象动态冻结的情况,也就是说一个对象一开始不是冻结的,但在运行时从某一刻开始,就变为一个冻结对象,但是无论如何,一个已被冻结的对象都是不能被解除冻结的。

2.2 Worker 的基本用法

下面我们来看看如何在 Kotlin/Native 中开启子线程进行异步计算。

在 Kotlin/Native 中我们使用 Worker 来做这件事,一个 Worker 即代表一个线程(类 Unix 系统),但在用法上却接近 Java的 Future/Promise 或 Kotlin 协程中的 async/await。与传统的 Java 中使用 Thread 的多线程编程方式相比,Worker对参数的传入以及对执行结果的获取更为严格,下面看一个例子:

代码语言:javascript复制
fun main() {
    val worker = Worker.start(true, "worker1")
    println("Position 1, thread id: ${pthread_self()!!.rawValue.toLong()}")
    val future = worker.execute(TransferMode.SAFE, {
        println("Position 2, thread id: ${pthread_self()!!.rawValue.toLong()}")
        1   2
    }) {
        println("Position 3, thread id: ${pthread_self()!!.rawValue.toLong()}")
        (it   100).toString()
    }
    future.consume {
        println("Position 4, thread id: :${pthread_self()!!.rawValue.toLong()}")
        println("Result: $it")
    }
}

使用 Worker.start 函数我们就可以创建一个新的 Worker,然后调用它的 execute函数就可以在别的线程执行任务了。这个函数接收三个参数,第一个是对象转移模式(后面会讨论),第二个参数将扮演一个生产者的角色(为了简便,后文我们使用源码中的命名 producer 来称呼它),它会在外面的线程执行,producer的返回值将在 execute 的第三个参数(也是个 lambda 表达式,同样,后文我们用源码中的命名 job 来称呼它)中作为参数来提供。

而 job 中的代码会在别的线程中执行。最后 execute 函数的返回结果是一个 Future<T> 类型的对象,调用它的成员函数 consume即可在外部线程获得 job 执行的结果。

为了验证代码中的几个关键位置到底是在哪个线程中执行的,我们使用 posix 标准中的 pthread_self()函数打印线程 id,这段代码执行后的输出如下:

代码语言:javascript复制
Position 1, thread id: 4524555712
Position 2, thread id: 4524555712
Position 3, thread id: 123145337905152
Position 4, thread id: 4524555712
Result: 103

我们可以看到,位置 1、2、4 三处的线程 id 打印结果相同,即 producer、以及取得计算线程执行结果的consume 函数都在外部线程执行,而位置 3 打印的线程 id 与其他三处都不同,也就是说 job 是在后台线程中执行。

以上就是 Worker 的基本用法,但这其中有几个点需要注意,job 作为一个 lambda 表达式,不能随意捕捉上下文中的变量,进入 job 的参数必须从 producer 传入(producer 的返回值即为 job 的参数)。考虑一种情况,如果我们在主线程中得到了一个结果,然后想将它传递给 Worker,很自然的我们可能会写出如下代码:

代码语言:javascript复制
fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        it
    }
    future.consume { println(it.index) }
}


data class TestData(var index: Int = 0)

但这段代码会在运行时抛出 IncorrectDereferenceException 异常,因为 testData 虽然是用 val修饰的,但它不是 String 或原生类型,因此它不是一个被冻结的对象。仔细分析一下这段代码,在主线程中 testData对象初始化之后,紧接着会执行 producer 内的代码,当 producer 执行完毕后,异步的 job内的代码就会开始执行,但是主线程依然可以引用到 testData,这时就会发生并发访问的问题。那么如何避免这个问题?修改代码:

代码语言:javascript复制
fun main() {
    val worker = Worker.start(true, "worker1")
    var testData: TestData? = TestData()
    val future = worker.execute(TransferMode.SAFE, {
        val result = testData!!
        testData = null
        result
    }) {
        it
    }
    future.consume { println(it.index) }
}


data class TestData(var index: Int = 0)

我们只需在 producer 返回前解除对需要传递的对象的引用,代码就可以正常运行,但上面这段代码只是一个为了便于理解的例子,在真正的软件开发当中,我们只需要将需要传递的值不向 producer 作用域之外暴露即可。

现在我们回过头来看看 execute 的第一个参数,它代表对象转移校验模式,是一个枚举类型,共有 SAFE 与 UNSAFE两个值可选,在上面的示例中,我们都使用的是 SAFE 模式,现在我们把它更换为 UNSAFE 模式并编写一个典型的并发写程序:

代码语言:javascript复制
fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.UNSAFE, { testData }) { data ->
        repeat(20000) { data.index   }
        data
    }
    repeat(20000) { testData.index   }
    future.consume { println(it.index) }
}


data class TestData(var index: Int = 0)

在 UNSAFE 模式下,testData 作为一个非冻结的对象也能任意传递到子线程中,如果这段代码中的线程调用是安全的,那么最终打印输出的结果应该是 40000,但很可惜,如果多次运行这段代码,每次它的打印输出结果都会不同,且小于

40000。也就是说 UNSAFE 模式下,Worker 不做任何线程安全的校验(无论是编译期还是运行时)。

这个结论与我预先猜测的不同,在源代码的注释中,对于 UNSAFE 是这样描述的:"Skip reachibility check, can lead to mysterious crashes in an application."。所以我预先猜测的是,如果没有发生事实上的多线程竞争,程序会正常运行,但是一旦发生多线程竞争,程序会抛出异常并崩溃。

但测试结果却不是这样,一旦使用 UNSAFE 模式,代码就变得和在 Java 中编写不加任何同步机制的并发访问代码一样不安全,任何的潜在风险都不会被显式的表现出来,因此 UNSAFE 模式的注释中,官方也写了下面这句话:"USE UNSAFE MODE ONLY IF ABSOLUTELY SURE WHAT YOU'RE DOING!!!"。

在这里我给出的建议是,如果能用语言机制规避的风险,就不要交给"人",因此,在 99.99% 的情况下,都应该尽量使用 SAFE模式,虽然 SAFE 模式对于对象的传递在语法上有更严格的限制,但是如果为了图方便使用 UNSAFE,在代码发生修改之后的潜在风险非常之大。

2.3 对象子图

这一小节主要讨论一个概念,即我们该怎样理解 Kotlin/Native 是如何检测一个对象是否在多个线程/Worker 中是可访问的?

在官方文档中提到了对象子图(subgraph)的概念,详见参考链接 3。但是由于其资料较少,以下是我的个人理解:

"在我们使用 Worker 的时候, Worker 会将 producer 返回的对象进行包装,生成一个对象子图(subgraph),我们可以将对象子图理解为一个对象,或是将它理解为一个对象头(因为这看起来有点类似在 TCP/IP报文头上添加 HTTP 报文头的感觉),它与原对象相互引用。每次在线程中访问对象的时候,都会通过 O(N) 复杂度的算法(官方未说明具体算法)来检测该对象是否在多个线程内可见。上面讨论的对象冻结,也是通过对象子图来实现的。"

对象子图在某些特殊的情况下可以与对象分离,从而让我们可以自由的让对象在多个线程间访问,这虽然不安全,但也是如果我们要使用其它同步机制(例如一些平台相关的同步机制或协程的 Mutex)必须要进行的步骤,有关对象子图分离的内容将在3.3 小节与协程的 Mutex 一起详细介绍。

2.4 单例与全局变量

对于单例与全局变量来说(成员变量也类似),在 Worker 中对其进行直接的访问是无法避免的,我们不能每次都通过 producer将单例或全局变量传递给 Worker 之后就将其置空,因此在 Kotlin/Native 中,单例与全局变量有着特别的规则。

先来介绍一下 @ThreadLocal 注解,编写一个示例:

代码语言:javascript复制
@ThreadLocal
val testData = TestData()


fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(  testData.index)
    }
    future.consume { println(testData.index) }
}


data class TestData(var index: Int = 0)

运行这段代码的输出如下:

代码语言:javascript复制
1
0

被添加了 @ThreadLocal 注解的全局变量会在每个线程中维护一个单独的副本,即在线程中对其进行修改对于其他线程是不可见的。在上面这个例子中,我们在 Worker 内对 testData.index 进行了自增操作,然而在主线程中则感知不到它的变化。

我们在讨论对象冻结的时候提到过 @SharedImmutable 注解,现在我们使用 @SharedImmutable 替换 @ThreadLocal然后运行程序,程序崩溃并抛出 InvalidMutabilityException 异常,如果我们再将 testData.index这一行中的 去掉,程序正常运行,这说明,对于开发者"手动"冻结的对象,并发的读取不会有问题,但是一旦其中一个线程/Worker要对变量进行修改,就会抛出 InvalidMutabilityException 异常。

对于单例(使用 object 关键字声明的),在不加任何特别注解的情况下,它都是冻结的,你可以认为它是一个默认添加了 @SharedImmutable注解的全局变量,但如果有特别的需要,也可以给单例添加 @ThreadLocal 注解,让它变成一个线程局部的可变变量,关于单例的代码示例不再给出。

三、预览版的多线程协程

在上面的章节中,我们介绍的 Worker 与对象子图是在 Kotlin/Native 在诞生之初就已经定型的异步并发模型,而 Kotlin/Native上的协程长久以来都只支持单线程,这就使得 Native 版的协程相对于 JVM 版功能大打折扣,但好消息是,近期在协程的官方 Github仓库(kotlinx.coroutines)的 issue#462(参考链接 5)中,Kotlin 官方团队的 Roman Elizarov 提到了已经发布了第一个多线程协程的预览版本,这也让 Kotlin/Native的开发者们看到了官方支持多线程协程的决心。

但需要说明的是,当前多线程版本的协程仅仅是一个早期预览版,从目前的体验情况来看,后续的改动一定会不小,因此本文仅仅是做一个尝试,Native 上的多线程协程的最终形态还要等正式版推出之后才能确定。

若要导入当前主分支版本的协程,可以添加如下依赖:

代码语言:javascript复制
dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-native:1.3.3"
}

如果您想尝鲜预览版的多线程协程,则可以添加如下依赖:

代码语言:javascript复制
dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-native:1.3.3-native-mt"
}

3.1 Default 与 Main 调度器的指向发生破坏性变更

在主分支的协程中,Dispatchers 下的两个调度器 Dispatchers.Main 与 Dispatchers.Default指向同一个线程,即主线程(程序最初初始化的线程)。而在多线程版的协程中 Dispatchers.Default变更为指向一个后台单线程,我们通过如下代码示例即可验证:

代码语言:javascript复制
fun main() {
    println("Position 1, thread id: ${pthread_self()!!.rawValue.toLong()}")
    GlobalScope.launch(Dispatchers.Default) {
        println("Position 2, thread id: ${pthread_self()!!.rawValue.toLong()}")
    }
    GlobalScope.launch(Dispatchers.Main) {
        println("Position 3, thread id: ${pthread_self()!!.rawValue.toLong()}")
    }
    CFRunLoopRun() // Create Darwin main thread loop
}

注意,Dispatchers.Default 是单线程而不是多线程组成的线程池的说法详见参考链接 4,可自行验证。

输出打印如下:

代码语言:javascript复制
Position 1, thread id: 4664880576
Position 2, thread id: 123145451188224
Position 3, thread id: 4664880576

如打印结果所示,位置 1 与 3 的线程 id 相同,而位置 2 则与前面两者不同,这说明了经 Dispatchers.Default调度的协程运行在一个后台线程中。在这里 main 函数体与经 Dispatchers.Main调度后的协程都运行在主线程内。不过这里有一点需要注意 Dispatchers.Main调度器在所有 Darwin(即全部 Apple 平台:iOS、macOS、watchOS、tvOS 等等)上调度方式改用了平台相关的RunLoop,在上面的示例中,我们使用 CFRunLoopRun 函数开启了主线程循环,所以 Dispatchers.Main调度器才会有效,如果我们使用协程的 runBlocking 函数开启主线程循环,则 Dispatchers.Main调度器在 Darwin 平台上将失效。考虑以下代码示例:

代码语言:javascript复制
fun main() = runBlocking {
    launch(Dispatchers.Main) { 
        println("Run on the main thread")
    }
    Unit
}

上面这段代码在主分支的协程中所有的 Native 平台上都可以正常打印,但在多线程版协程中,如果目标平台为Darwin,则协程内部的打印输出将永远不会生效,但在 Linux、Windows 等平台上仍可以正常打印。这实际上是一个进步,如果我们要编写移动端的多平台程序,我们会更希望 Dispatchers.Main 在 iOS 上切换到 UI 主线程。

3.2 利用 CPU 多核能力的主要方式:newSingleThreadContext() 函数

Dispatchers.Default 调度器虽然可以将您当前在协程中执行的异步代码切换到后台线程,但它与 Kotlin/JVM上的 Dispatchers.Default 线程池实现相比,仍然力有不足。如果您想充分利用 CPU 的多核性能,Native 的 Dispatchers.Default

仍然不能满足您的需求。但是当前预览版本的多线程协程中仍然没有线程池的实现,因此我们必须手动创建其他的多线程上下文。

在主分支版本的协程上,程序无法引用到 newSingleThreadContext() 函数,它曾经是 Kotlin/JVM独有的,但当前 Kotlin/Native 的预览版的多线程协程中,newSingleThreadContext() 是我们使用 CPU 多核能力的主力调度器,见如下代码示例:

代码语言:javascript复制
@UseExperimental(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    println(pthread_self()!!.rawValue.toLong())
    launch(newSingleThreadContext("1")) {
        println(pthread_self()!!.rawValue.toLong())
    }
    launch(newSingleThreadContext("2")) {
        println(pthread_self()!!.rawValue.toLong())
    }
    Unit
}

输出打印如下:

代码语言:javascript复制
4703317440
123145445687296
123145446223872

每一个 newSingleThreadContext() 都会创建一个新的线程,所以真正正确的用法是我们每次都应该把 newSingleThreadContext()创建的 CoroutineContext 保存起来然后重复使用,当我们不再需要一个由 newSingleThreadContext()产生的 CoroutineContext 时,我们应该手动将其回收以释放资源,如下所示:

代码语言:javascript复制
@UseExperimental(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    println(pthread_self()!!.rawValue.toLong())
    val coroutineContext = newSingleThreadContext("1")
    val job = launch(coroutineContext) {
        println(pthread_self()!!.rawValue.toLong())
    }
    job.join()
    coroutineContext.close()
}

此外,由于 Kotlin/Native 中积极推行 Worker 取代线程的概念,因此通过 newSingleThreadContext()产生的 CoroutineContext 可以直接通过成员属性 worker 引用到该线程对应的 Worker,如下所示:

代码语言:javascript复制
@UseExperimental(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    println(pthread_self()!!.rawValue.toLong())
    val coroutineContext = newSingleThreadContext("1")
    val job = launch(coroutineContext) {
        println(pthread_self()!!.rawValue.toLong())
    }
    job.join()
    coroutineContext.worker.execute(TransferMode.SAFE, {}) {
        "Hello Multi-thread"
    }.consume { 
        println(it)
    }
    coroutineContext.close()
}

3.3 对象子图分离与失效的 Mutex

协程构建器(例如 launch、async 等)的参数 lambda 表达式可以任意捕捉上下文变量,它将默认捕捉的变量都是冻结的(这里指的是局部变量),即,如果协程所运行的线程与外部线程不同,且如果发生修改这些捕捉过来的变量时,则程序都会抛出 InvalidMutabilityException 异常。

但是在协程中,我们有协程自己的基于挂起实现的锁 Mutex,因此如果要使用 Mutex 来保证并发安全,第一步要做的就是让变量的更改摆脱Worker-对象子图机制,完全将并发风险暴露出来,然后才能通过将有风险的代码包裹在 Mutex 锁的作用域内来充分利用 Mutex。

然而,在协程构建器与 Worker 的 execute 函数不同,不能将协程本身设置为 UNSAFE 模式,因此这里需要将对象子图暂时分离,然后在协程构建器内再将其重新绑定。用法如下面的代码示例所示:

代码语言:javascript复制
fun main() = runBlocking {
    val testData = TestData()
    val bareTestData = DetachedObjectGraph(TransferMode.UNSAFE) { testData }
    val job = launch(Dispatchers.Default) {
        val outTestData = bareTestData.attach()
        repeat(20000) { outTestData.index   }
    }
    repeat(20000) { testData.index   }
    job.join()
    println(testData.index)
}


data class TestData(var index: Int = 0)

为了便于理解代码,我们可以用下图更直观的解释对象子图,以及对象子图分离的过程:

虽说叫做对象子图分离,但是在用法上却更类似于包装,我们使用 DetachedObjectGraph<T>类来包装一个对象,即可实现对象子图分离。DetachedObjectGraph<T> 的构造函数接收两个参数,第一个是对象转移校验模式TransferMode,可以看到,如果要达成我们的目的,这里必须使用 UNSAFE 模式,第二个参数则类似于 execute函数的 producer。然后我们在需要使用它的协程中再调用 DetachedObjectGraph<T> 类的扩展函数attach,即可以拿到原对象。DetachedObjectGraph<T> 类的另一个构造函数重载接收一个 COpaquePointer?类型的参数(代表一个指针),感兴趣的读者可以自行尝试。

这段代码的运行后的打印输出结果与上文展示的 execute 函数的 UNSAFE 模式如出一辙,最终输出的值一定小于 40000(如果并发安全的话会输出 40000 整)。

然后,我们将上面的代码添加到协程的并发安全机制 Mutex 中来,示例代码如下所示:

代码语言:javascript复制
fun main() = runBlocking {
    val testData = TestData()
    val bareTestData = DetachedObjectGraph(TransferMode.UNSAFE) { testData }
    val mutex = Mutex()
    val job = launch(Dispatchers.Default) {
        val outTestData = bareTestData.attach()
        repeat(20000) {
            mutex.withLock { outTestData.index   }
        }
    }
    repeat(20000) {
        mutex.withLock { testData.index   }
    }
    job.join()
    println(testData.index)
}

很可惜,当前预览版的多线程协程的 Mutex 存在 bug,一旦两个协程发生事实上的 Mutex 锁竞争,Mutex 就会将协程一直挂起而不恢复,这会导致我们永远看不到输出结果,如果将上面的代码剔除掉与 Native 有关的部分(例如对象子图分离),然后拿到 Kotlin/JVM上运行,可以正常得到输出:"40000",剔除与 Native 相关部分的代码如下所示:

代码语言:javascript复制
fun main() = runBlocking {
    val testData = TestData()
    val mutex = Mutex()
    val job = launch(Dispatchers.Default) {
        repeat(20000) {
            mutex.withLock { testData.index   }
        }
    }
    repeat(20000) {
        mutex.withLock { testData.index   }
    }
    job.join()
    println(testData.index)
}

这说明 Mutex 的功能在后续有待修复。

除了 Mutex 外,官方还有另一种建议使用的实现并发安全的机制——基于 actor 协程构建器与 Channel的消息机制。但该机制由于目前 actor 协程构建器在 Kotlin/Native 上不可用也暂时无济于事。

四、总结

在本文中我们一共体验了两套 Kotlin/Native 中实现异步与并发的方式,Worker-对象子图模式虽然可以确保并发安全,但是其做法较为粗暴,但目前来说 Worker-对象子图模型仍然是较为成熟的一套实现异步与并发的机制。

多线程版的协程由于处在预览版,因此问题也非常的多,目前已知的问题包括:

1)Dispatchers.Default 调度器功能有限,与 Kotlin/JVM 版的差距太大,但官方资料(参考链接 4)提到后续 Dispatchers.Default 有可能会变更为多线程版本。

2)基于协程挂起实现的锁 Mutex 存在 Bug,当前会造成协程的长时间挂起且不恢复。

3)官方资料(参考链接 4)中提到,当前预览版的多线程协程存在内存泄漏。

4)由于 Dispatchers.Default 与 Dispatchers.Main 调度器指向的线程发生了破坏性变更,如果您之前已经在工程中使用了主分支的单线程版线程,可能会面临代码迁移的问题。

当然,协程与已存在的 Worker-对象子图模型之间也并不协调,就如同上面的示例,如果要使用协程的并发安全机制保证并发安全,就必须进行对象子图分离。然而对象子图的概念在 Kotlin/JVM 上并不存在,这会导致使用协程编写的代码不能做到真正的平台无关。

从长远来看,协程-挂起机制是 Kotlin 的核心,如果后续 kotlinx.io库完整实现了基于 suspend 的 I/O,那么协程就可以一统Kotlin 上的所有异步并发场景,因此,Worker-对象子图模型与多线程的协程之间会如何调和的更优雅,还有待官方后续的完善。

当前,Kotlin/Native 已经经过了接近三年左右的实验性阶段,进入了一个"相对稳定"的状态,据说 2020 年发布的 Kotlin 1.4会让 Kotlin/Native 进入正式版,如果想要试验 Kotlin/Native在线上产品中是否可行,个人认为,只要经过大量且完备的测试(虽然做起来并不容易),以目前状况来看,是值得一试的,但预览版的多线程协程则不同,它处在一个非常非常早期的预览阶段,想要在线上产品中使用,还要等待官方后续推出更加稳定的版本。

参考文档

参考链接 1:Kotlin 编译器实现协程的主要工作是 CPS 变换与状态机,官方 KEEP:

https://github.com/Kotlin/KEEP/blob/master/proposals/coroutines.md

参考链接 2:Java 计划在 JDK 15 中添加类似协程的异步并发工具,即 Project Loom:https://wiki.openjdk.java.net/display/loom/Main#Main-Design

参考链接 3:Kotlin/Native 关于异步并发模型的官方文档:https://kotlinlang.org/docs/reference/native/concurrency.html

参考链接 4:Roman Elizarov 编写的关于多线程版 Native 协程的官方资料:https://github.com/Kotlin/kotlinx.coroutines/blob/native-mt/kotlin-native-sharing.md

参考链接 5:关于 Native 多线程协程的 issue:issue#462 https://github.com/Kotlin/kotlinx.coroutines/issues/462

0 人点赞