这个系列我做了协程和Flow开发者的一系列文章的翻译,旨在了解当前协程、Flow、LiveData这样设计的原因,从设计者的角度,发现他们的问题,以及如何解决这些问题,pls enjoy it。
很久以前,coroutines被引入到Kotlin,它们是轻量级的。我们可以启动大量的coroutine,我们需要一种方法在这些coroutine之间进行通信,而不会遇到可怕的 "可变共享状态 "问题。
因此,Channel被添加为一个协程间的通信原语。Channel是很好的。Channel支持在不同内核之间进行一对一、一对多、多对一和多对多的通信,并且每个发送到Channel的值都会被接收一次。
你不能使用Channel来分发事件或状态更新,以允许多个订阅者独立地接收并对其作出反应。
因此,BroadcastChannel接口被引入,它的实现是带Buffer的ConflatedBroadcastChannel。它们在一段时间内为我们提供了很好的服务,但是它们被证明是一个设计的死胡同。现在,从kotlinx-coroutines 1.4版本开始,我们引入了一个更好的解决方案--shared flows。请继续阅读完整的故事。
Flows are simple
在库的早期版本中,我们只有Channel,我们试图将异步序列的各种转换实现为函数,将一个Channel作为参数,返回另一个Channel作为结果。这意味着,例如,一个过滤运算符将在它自己的coroutine中运行。
这样一个操作符的性能远远不够好,尤其是与写一个if语句相比。事后看来,这并不奇怪,因为Channel是一个同步原语。任何Channel,即使是为单个生产者和单个消费者优化的实现,都必须支持并发的通信程序,它们之间的数据传输需要同步,这在现代多核系统中是很昂贵的。当你开始在异步数据流的基础上构建你的应用架构时,自然会出现对转换的需求,而Channel成本也开始累积。
Kotlin Flow的简单设计允许有效地实现转换操作。在基本的情况下,值的发射、转换和收集都在同一个循环程序中进行,不需要任何同步。
只有当需要在不同的程序中发射和收集数值时,才会引入流的同步性。
https://elizarov.medium.com/kotlin-flows-and-coroutines-256260fb3bdb
Flows are cold
然而,流量通常是冷的--由flow { ... }构建器函数创建的Flow是一个被动的实体。考虑一下下面的代码。
代码语言:javascript复制val coldFlow = flow {
while (isActive) {
emit(nextEvent)
}
}
流程本身没有任何形式的计算作为支撑,在开始收集之前,它本身没有任何状态。每个收集器的coroutine都会执行它自己的发射代码的实例。关于 "cold flow,hot channel "的故事描述了Kotlin flow背后的原因,并展示了它们比Channel更适合的使用情况--返回按需计算的异步值流。
但你如何处理像用户行为、外部设备事件、状态更新等事情?它们的运行是独立于是否有任何代码对它们感兴趣的。它们应该支持应用程序内部的多个观察者。这些是所谓的事件的热源。
Shared flows
这就是shared flow的概念的来源。一个shared flow的存在,不管它是否被收集。shared flow的收集者被称为订阅者。一个shared flow的所有订阅者都会收到相同的数值序列。它有效地像一个 "广播频道 "一样工作,没有大部分的频道开销。它使广播频道的概念变得过时。
本质上,shared flow是一个轻量级的广播事件总线,你可以在你的应用架构中创建和使用。
代码语言:javascript复制class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
它有可调整的参数,如为新的订阅者保留和重放的旧事件的数量,以及为快速发射器和慢速订阅者提供缓冲的extraBufferCapacity。
一个shared flow的所有订阅者都在自己的上下文中异步地收集事件。发射器并不等待,直到订阅者完成对事件的处理。然而,当shared flow的缓冲区满了,发射器会暂停,直到缓冲区有空间。在缓冲区溢出时,发射器的这种暂停提供了背压,在收集器无法跟上时减缓发射。通过BufferOverlow参数支持处理缓冲区溢出的其他策略。
State flows
处理缓冲区溢出的一个流行方法是放弃最旧的事件,只保留最近的、最新的事件。特别是,它是在一个应用程序中对状态变量进行建模的一个好方法。它是如此广泛的使用情况,以至于它有自己专门的StateFlow类型,作为ConflatedBroadcastChannel的替代,后者也已经过时了。
代码语言:javascript复制class StateModel {
private val _state = MutableStateFlow(initial)
val state = _state.asStateFlow() // read-only public view
fun update(newValue: Value) {
_state.value = newValue // NOT suspending
}
}
可以把val x: StateFlow是var x的一个异步和可观察的对应物。T . 它的最近值总是可用的,事实上,最近的值是唯一重要的,所以更新它总是可以不暂停的。
有了状态流,复杂Channel和简单流之间的性能差异变得非常明显。状态流的实现具有无分配的更新,而混杂的广播Channel则不是这样的。
A use-case for channels
随着不同类型的shared flow量取代了不同类型的广播频道,流行的问题是普通的、常规的频道会发生什么?由于许多原因,它们将继续存在。其中一个原因是,Channel是用于实现许多复杂流量操作的低级基元。
但是,Channel也有其应用案例。Channel被用来处理那些必须被精确处理一次的事件*(详见下面的附注)。这种情况发生在有一种事件类型的设计中,这种事件通常有一个订阅者,但间歇性地(在启动或某种重新配置期间)根本没有订阅者,而且有一个要求,即所有发布的事件必须保留,直到有订阅者出现。
代码语言:javascript复制class SingleShotEventBus {
private val _events = Channel<Event>()
val events = _events.receiveAsFlow() // expose as flow
suspend fun postEvent(event: Event) {
_events.send(event) // suspends on buffer overflow
}
}
在第一个例子中,BroadcastEventBus是用SharedFlow编写的,而SingleShotEventBus是用Channel编写的,它们都把事件公开为Flow,但它们有一个重要的区别。
在shared flow中,事件被广播给未知数量(零或更多)的订阅者。在没有订阅者的情况下,任何发布的事件都会被立即放弃。这是一种设计模式,用于必须立即处理或根本不处理的事件。
在Channel中,每个事件被传递给一个订阅者。试图在没有订阅者的情况下发布事件,一旦Channel缓冲区变满就会暂停,等待订阅者出现。发布的事件不会被丢弃。
请注意,有Channel的SingleShotEventBus实现只在没有取消的情况下对每个发布的事件精确地处理一次。当流的订阅者被取消时,事件可能无法被传递。详情请参见Channel中未交付的元素的文档。
Bottom line
了解两者的区别,适当地使用shared flows和Channel。它们都很有用,被设计成可以很好地一起工作。然而,广播Channel是过去的过时的人工制品,它们将在未来被废弃和删除。
原文链接:https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c