Rouse
读完需要
10
分钟
速读仅需 4 分钟
在Android应用开发中,数据流是一个至关重要的概念。而在Jetpack库中,SharedFlow
和 StateFlow
是两个处理数据流的利器,它们基于协程,提供了一种响应式的编程方式。本文将深入探讨这两个类的原理,以及在实际开发中的使用技巧。
原理分析
SharedFlow
和 StateFlow
基于协程构建,它们利用协程的轻量级特性,在异步操作中更加高效。
SharedFlow
使用了一种基于事件溯源的机制,当有新的事件产生时,将事件添加到共享的事件序列中,然后通知所有订阅者。而 StateFlow
则维护了一个可变的状态,并在状态发生变化时通知所有观察者。
热流与冷流
热流和冷流是关于数据流的两个基本概念,它们描述了数据流何时开始以及如何传递事件的方式。
- 热流是一种主动的数据流。它在创建时就开始发射事件,无论是否有观察者订阅。即使没有观察者,热流也会持续产生事件。当观察者订阅时,它只是加入了已经运行的数据流,开始接收当前已经产生的事件。
- 冷流是一种被动的数据流。它在有观察者订阅时才开始发射事件。每个观察者都会获得相同的事件序列,而不会受到其他观察者的影响。
SharedFlow
和 StateFlow
都是热流。即没有观察者,数据会持续更新,与LiveData
类似。其中MutableSharedFlow
与MutableStateFlow
是它们的可变类型。
热流的示例
代码语言:javascript复制import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val hotFlow = MutableSharedFlow<Int>()
launch {
repeat(5) {
delay(1000)
hotFlow.emit(it)
}
}
// 观察者1
hotFlow.collect {
println("Observer 1: $it")
}
// 观察者2
delay(3000) // 观察者2延迟3秒后订阅
hotFlow.collect {
println("Observer 2: $it")
}
delay(5000) // 为了保持主线程运行
}
在这个例子中,hotFlow
是一个热流,它在启动后每隔一秒产生一个事件。观察者1从一开始就订阅,而观察者2在3秒后订阅,观察者2不会接收到观察者1在订阅之前已经接收的事件。
冷流的示例
代码语言:javascript复制import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val coldFlow = flow {
emit("Line 1")
delay(1000)
emit("Line 2")
delay(1000)
emit("Line 3")
}
// 观察者1
coldFlow.collect {
println("Observer 1: $it")
}
// 观察者2
delay(2000) // 观察者2延迟2秒后订阅
coldFlow.collect {
println("Observer 2: $it")
}
delay(5000) // 为了保持主线程运行
}
在这个例子中,coldFlow
是一个冷流,它在有观察者订阅时才开始发射事件。观察者1从一开始就订阅,而观察者2在2秒后订阅,但它能够接收到从开始运行的事件序列。
MutableSharedFlow
MutableSharedFlow
是一种可变的、用于创建共享流的类。下面是MutableSharedFlow
的一些主要构造函数参数及其默认值:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : MutableSharedFlow<T> { /*...*/ }
replay
: 表示在订阅时从流中回放的元素数量。默认值为0
,表示不回放任何元素。如果设置为正整数n
,则在订阅时将向新订阅者回放最近的n
个元素。extraBufferCapacity
: 表示额外的缓冲容量,用于存储订阅者尚未消耗的元素。默认值为0
,表示不使用额外的缓冲容量。设置为正整数m
时,会在内部使用一个带有额外m
容量的缓冲区。onBufferOverflow
: 表示在缓冲区溢出时的处理策略。默认值为BufferOverflow.SUSPEND
,表示当缓冲区溢出时暂停发射,等待订阅者消费。其他选项还包括BufferOverflow.DROP_OLDEST
和BufferOverflow.DROP_LATEST
,它们分别表示在缓冲区溢出时丢弃最老的元素或最新的元素。
使用示例:
代码语言:javascript复制val sharedFlow = MutableSharedFlow<Int>(replay = 10, extraBufferCapacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST)
在这个示例中,创建了一个带有回放数量为10、额外缓冲容量为5、缓冲溢出处理策略为丢弃最老元素的MutableSharedFlow
。这里的参数值是可根据具体需求进行调整的。
MutableStateFlow
MutableStateFlow
的构造函数有一个默认参数,即初始状态值。以下是 MutableStateFlow
构造函数:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
构造函数中的 value
参数表示 MutableStateFlow
的初始状态值。在创建 MutableStateFlow
时,需要提供这个初始状态值。
以下是一个示例,演示如何创建一个带有初始状态值的 MutableStateFlow
:
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val initialState = "Initial State"
val stateFlow = MutableStateFlow(initialState)
// 观察者
val job = launch {
stateFlow.collect { value ->
println("Received: $value")
}
}
// 修改状态
stateFlow.value = "New State"
// 等待观察者执行
job.join()
}
在这个例子中,initialState
是 MutableStateFlow
的初始状态值,通过构造函数传递给 MutableStateFlow
。然后,通过修改 stateFlow.value
,可以更新 MutableStateFlow
的状态值。
SharedFlow、StateFlow与LiveData的区别
StateFlow
就是SharedFlow
的一种特殊类型,特点有三:
- 它的replay容量为 1;即可缓存最近的一次粘性事件,如果想避免粘性事件问题,使用
SharedFlow
,replay默认值0。 - 初始化时必须给它设置一个初始值
- 每次发送数据都会与上次缓存的数据作比较,只有不一样才会发送。它还可直接访问它自己的value参数获取当前结果值,在使用上与
LiveData
相似。
与LiveData
的不同点
StateFlow
必须在构建的时候传入初始值,LiveData
不需要;StateFlow
默认是防抖的,即相同值不更新,LiveData
默认不防抖;StateFlow
默认没有和生命周期绑定
简单示例
为了帮助大家更好地理解,以下是使用 SharedFlow
和 StateFlow
的简单示例:
// SharedFlow 示例
val sharedFlow = MutableSharedFlow<String>()
// 订阅
sharedFlow.collect { value ->
println("Received: $value")
}
// 发送数据
sharedFlow.emit("Hello, SharedFlow!")
// StateFlow 示例
val stateFlow = MutableStateFlow("Initial State")
// 订阅
stateFlow.collect { value ->
println("Current State: $value")
}
// 更新状态
stateFlow.value = "New State"
高级使用技巧
- 错误处理
在订阅流时,考虑添加错误处理机制,以确保在流中出现错误时能够得到适当的处理,防止错误传播导致应用崩溃。
代码语言:javascript复制sharedFlow.catch { exception ->
// 处理错误
}.collect { value ->
// 处理正常数据
}
- 流的完成出来
使用onCompletion
来处理流的完成事件,可以在流完成时执行一些清理工作。
sharedFlow.onCompletion { cause ->
if (cause != null) {
// 处理流异常完成的情况
} else {
// 处理正常完成的情况
}
}.collect { value ->
// 处理正常数据
}
- 共享的冷流
使用SharingStarted.WhileSubscribed
来创建共享的冷流,确保只有至少一个订阅者时,共享流才会激活。这在事件通知的场景中非常有用。
val sharedFlow = flowOf(1, 2, 3).shareIn(viewModelScope, SharingStarted.WhileSubscribed())
- 背压策略
在使用buffer
或conflate
等背压策略时,注意根据实际场景选择合适的策略,以平衡性能和内存的消耗。
sharedFlow
.buffer(Channel.CONFLATED) // 或者 buffer(size = n)
.collect { value ->
// 处理数据
}
- 过滤重复的状态
使用distinctUntilChanged
来过滤掉重复的状态,确保只在状态发生变化时通知订阅者。
stateFlow
.distinctUntilChanged()
.collect { state ->
// 处理状态变化
}
实践运用
全局主题模式管理
假设我们需要在应用中管理全局的主题模式,我们可以使用 StateFlow。
代码语言:javascript复制object ThemeManager {
private val _themeStateFlow = MutableStateFlow(Theme.Light)
val themeStateFlow: StateFlow<Theme> get() = _themeStateFlow
fun setTheme(theme: Theme) {
viewModelScope.launch {
_themeStateFlow.value = theme
}
}
}
在上述示例中,ThemeManager
使用 MutableStateFlow
来创建一个管理主题模式的 StateFlow。当主题模式发生变化时,通过 setTheme
方法来更新 StateFlow,所有订阅者都会收到最新的主题模式。
在需要订阅主题模式的地方,可以这样使用:
代码语言:javascript复制class ThemedFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewModelScope.launch {
ThemeManager.themeStateFlow.collect { theme ->
// 根据主题模式更新 UI
}
}
}
}
即时聊天应用
当涉及到共享数据状态的场景时,SharedFlow
通常是一个合理的选择。假设我们要实现一个即时聊天应用,多个页面或组件需要获取最近的聊天消息。
object ChatManager {
private val _chatMessagesFlow = MutableSharedFlow<ChatMessage>(replay = 5, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
val chatMessagesFlow: SharedFlow<ChatMessage> get() = _chatMessagesFlow
fun sendChatMessage(message: String, sender: String) {
val chatMessage = ChatMessage(message, sender, System.currentTimeMillis())
viewModelScope.launch {
_chatMessagesFlow.emit(chatMessage)
}
}
}
在这个示例中,ChatManager
使用 MutableSharedFlow
来创建一个实时通知聊天消息变化的 SharedFlow
。当有新的聊天消息时,通过 sendChatMessage
方法更新 SharedFlow
,所有订阅者都能获取到最近的数据序列。
在需要订阅聊天消息的地方,可以这样使用:
代码语言:javascript复制class ChatFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewModelScope.launch {
ChatManager.chatMessagesFlow.collect { chatMessage ->
// 处理收到的聊天消息,更新 UI
}
}
}
}
结语
通过本文的介绍,相信读者已经对SharedFlow
和StateFlow
有了更深入的了解。在实际应用中,提高Android应用的开发效率。
点个在看你最好看