【Kotlin 协程】Flow 流组合 ( Flow#zip 组合多个流 | 新组合流的元素收集间隔与被组合流元素发射间隔的联系 )

2023-03-30 18:31:15 浏览数 (1)

文章目录

  • 一、Flow 流组合
    • 1、Flow#zip 组合多个流
    • 2、新组合流的元素收集间隔与被组合流元素发射间隔的联系

一、Flow 流组合


1、Flow#zip 组合多个流

调用 Flow#zip 函数 , 可以将两个 Flow 流合并为一个流 ;

Flow#zip 函数原型 :

代码语言:javascript复制
/**
 * 将来自当前流(' this ')的值压缩到[其他]流,使用提供的[transform]函数应用到每对值。
 * 在剩下的流上调用一个流完成和取消时,生成的流就会完成。
 *
 * 可以用下面的例子来演示:
 * ```
 * val flow = flowOf(1, 2, 3).onEach { delay(10) }
 * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
 * flow.zip(flow2) { i, s -> i.toString()   s }.collect {
 *     println(it) // Will print "1a 2b 3c"
 * }
 * ```
 *
 * ### 缓冲
 *
 * 上游流在同一协程中按顺序收集,而不进行任何缓冲
 * [other]流被并发收集,就像使用' buffer(0) '一样。参见[buffer]操作符中的文档
 * 为解释。您可以根据需要使用对[buffer]操作符的额外调用,以获得更多并发性。
 */
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)

代码示例 :

代码语言:javascript复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val numFlow = (1..3).asFlow()
            val strFlow = flowOf("One", "Two", "Three")
            // 合并两个 Flow 流
            numFlow.zip(strFlow) { num, str ->
                "num = $num, str = $str"
            }.collect {
                println(it)
            }
        }
    }
}

执行结果 :

代码语言:javascript复制
2022-12-26 16:39:29.428 30002-30002/kim.hsl.coroutine I/System.out: num = 1, str = One
2022-12-26 16:39:29.429 30002-30002/kim.hsl.coroutine I/System.out: num = 2, str = Two
2022-12-26 16:39:29.433 30002-30002/kim.hsl.coroutine I/System.out: num = 3, str = Three

2、新组合流的元素收集间隔与被组合流元素发射间隔的联系

假如两个 Flow 流的 元素发射 不同步 , 则 先发射的元素 , 需要等待对应顺序的 后发射的元素到来 ;

在下面的代码中 , numFlow 的发射元素间隔为 100ms , strFlow 发射元素间隔为 1000ms , 则 numFlow 元素收集到之后 , 需要等待 strFlow 元素收集 , 也就是 二者合并后的间隔以 慢的为准 , 合并后的流 发射间隔为 1000ms ;

代码示例 :

代码语言:javascript复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val numFlow = (1..3).asFlow().onEach { delay(100) }
            val strFlow = flowOf("One", "Two", "Three").onEach { delay(1000) }
            // 合并两个 Flow 流
            numFlow.zip(strFlow) { num, str ->
                "num = $num, str = $str"
            }.collect {
                println(it)
            }
        }
    }
}

执行结果 :

代码语言:javascript复制
2022-12-26 16:48:43.931 31838-31838/kim.hsl.coroutine I/System.out: num = 1, str = One
2022-12-26 16:48:44.949 31838-31838/kim.hsl.coroutine I/System.out: num = 2, str = Two
2022-12-26 16:48:45.956 31838-31838/kim.hsl.coroutine I/System.out: num = 3, str = Three

0 人点赞