Kotlin 协程 通道 Channel 介绍

2023-07-13 16:07:36 浏览数 (3)

前言

在学习了Flow流之后,本篇继续学习通道相关的知识。

在协程中,通道是指提供了一种在Flow中传输值的方法。

提供了一种便捷的方法使得单个值可以在多个协程之间进行相互传输。

其实就类似我们学I/O流的时候,讲解的通道是一样的意思。

1. Channel 通道介绍

一个Channel是一个和BlockingQueue 非常类似的概念。区别在于它代替了阻塞的put操作并提供了挂起的send,还代替了阻塞的take操作,并提供了挂起的receive操作

简而言之:putsendtakereceive

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>() //定义了一个通道,用来接收Int数据
    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 3 次整数的平方并发送
        for (x in 1..3) channel.send(x * x)
    }
    // 这里我们打印了 5 次被接收的整数:
    repeat(3) { println(channel.receive()) }
    println("结束!")
}
//输出
1
4
9
结束!

在上面的示例中,我们使用了sendreceive函数。

2. 关闭通道-close

和消息队列不同,一个Channel可以通过被关闭来表明没有更多的元素将会进入通道。

然后接收者可以定期的使用for循环来从Channel中接收元素。

一个close()操作,就是向Channel发送了一个特殊的关闭指令。这个当这个关闭操作被

Channel收到的时候,通道就进入了迭代停止状态。也就是说之后通道将不会有数据更新了。它能够保证所有先前发送出去的元素都在Channel收到close消息前被接收到。

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // 我们结束发送
    }
// 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
    for (y in channel) println(y)
    println("结束!")
}
//输出
1
4
9
16
25
结束!

3. 构建通道生产者

Channel是按照 生产者-消费者模式进行构建的。

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..3) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("结束!")
}
//输出
1
4
9
结束!

produce 是一个便捷的协程构造器,可以很容易地在生产者端正确工作。

我们使用了consumeEach在消费者端替代了for循环。可以达到上面的一样的效果。

4. 管道的概念

管道是一种一个协程在Flow中开始生产可能无穷多个元素的模式

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x  ) // 在流中开始从 1 生产无穷多个整数
}

fun main() = runBlocking {
    val squares = produceNumbers()
    squares.consumeEach { println(it) }
    println("结束!")
}
//输出
080
632081
632082
632083
...

将会无限输出下去直到Int存储不够为止,因为上面的示例中while是一个死循环。

我们如果配合上取消等操作一起。示例如下:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x  ) // 在流中开始从 1 生产无穷多个整数
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

fun main() = runBlocking {
    val numbers = produceNumbers() // 从 1 开始生成整数
    val squares = square(numbers) // 整数求平方
    repeat(5) {
        println(squares.receive()) // 输出前五个
    }
    println("结束!") // 至此已完成
    coroutineContext.cancelChildren() // 取消子协程
}
//输出
1
4
9
16
25
结束!

所有创建了协程的函数被定义在了CoroutineScope的扩展上。所以我们可以依靠结构化并发来确保没有常驻在我们的应用程序中的全局协程。

如果到这里还是比较迷茫的话,很正常。我们继续不断地使用才能明白。

示例:在协程中使用一个管道来生产素数

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x  ) // 开启了一个无限的整数流
}


fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x) //过滤了来源于流中的数字,删除了所有可以被给定素数整除的数字。
}

fun main() = runBlocking {
    var cur = numbersFrom(2) //让我们从2 开始不断循环的生产整数
    repeat(10) {  //我们打印前10秒的数据结果
        val prime = cur.receive() //将管道数据挂起,
        println(prime)  //输出管道数据
        cur = filter(cur, prime)  //过滤管道中的可以被定素数整除的数字。
    }
    coroutineContext.cancelChildren()
}
//输出
2
3
5
7
11
13
17
19
23
29

我们可以使用iterator协程构建器来构建一个相似的管道。

使用 :

  • iterator 替换 produce
  • yield 替换 send
  • next 替换 receive
  • Iterator 替换 ReceiveChannel 来摆脱协程作用域,你将不再需要 runBlocking

(管道的概念还是没有理解透彻。以后理解透彻后再单独出一篇吧。)

5. 扇出

多个协程也许会接收相同的通道,在它们之间进行分布式工作。数据的发出叫做扇出

示例:启动一个定期产生整数的协程对象(每秒10个数值),再启动五个处理器协程接收信息。并工作一秒 。示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // 取消协程生产者并将它们全部杀死。
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // 从1开始
    while (true) {
        send(x  ) // 生产下一个数值
        delay(100) // 阻塞100毫秒
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("生产者 #$id 接收者 $msg")
    }
}
//输出
生产者 #0 接收者 1
生产者 #0 接收者 2
生产者 #1 接收者 3
生产者 #2 接收者 4
生产者 #3 接收者 5
生产者 #4 接收者 6
生产者 #0 接收者 7
生产者 #1 接收者 8
生产者 #2 接收者 9

注意,取消生产者协程将关闭它的通道,从而最终终止处理器协程正在执行的此通道上的迭代。

上面示例中的这个 for 循环是安全完美地使用多个协程的。如果其中一个处理器协程执行失败,其它的处理器协程仍然会继续处理通道,而通过 consumeEach 编写的处理器始终在正常或非正常完成时消耗(取消)底层通道。

6. 扇入

多个协程可以发送到同一个通道,叫做扇入。

示例:让我们创建一个字符串的通道,和一个在这个通道中以指定的延迟反复发送一个指定字符串的挂起函数。

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch { sendString(channel, "zin", 200L) }
    launch { sendString(channel, "YAN!", 500L) }
    repeat(6) { // 接收前六个
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
//输出
zin
zin
YAN!
zin
zin
YAN!

简而言之:一对多输出 扇出。多对一输入,扇入。

你将数据之间用线段链接起来,就是比较形象的扇子了。

7. 通道缓冲

在上面的示例中,所有的通道都是没有缓冲区的。而无缓冲的Channel在发送者和接收者相遇时传输元素(简称:对接)。如果发送先被调用,那么通道会挂起等待通道中的消息被接收。如果先调用接收,那它将被挂起直到通道中出现消息发送。

Channel工厂函数与produce构建器通过一个可选参数capacity来指定缓冲区大小。

缓冲允许发送者在被挂起前发送多个元素。只有当缓冲区被填满时通道才会被挂起阻塞等待被接收。

示例:创建一个带缓冲的通道

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // 启动带缓冲的通道
    val sender = launch { // 启动发送者协程
        repeat(10) {
            println("发送消息 $it") // 在每一个元素发送前打印它们
            channel.send(it) // 将在缓冲区被占满时挂起
        }
    }
// 没有接收到东西……只是等待……
    delay(1000)
    sender.cancel() // 取消发送者协程
}
//输出
发送消息 0
发送消息 1
发送消息 2
发送消息 3
发送消息 4

在上面的示例中,前四个元素被加入到缓冲区。当发送者想发射第五个元素的时候,将会被挂起。直到被接收。

8. 通道公平性

Channel之中,发送和接收操作是公平的。并且尊重调用它们的多个协程。

遵守先进先出原则

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // 一个共享的 通道(桌子)
    launch { player("zin", table) }
    launch { player("yan", table) }
    table.send(Ball(0)) // zinyan
    delay(1000) // 延迟 1 秒钟
    coroutineContext.cancelChildren() // 结束,取消它们
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // 在循环中接收zin 和yan
        ball.hits  
        println("$name $ball")
        delay(300) // 等待一段时间
        table.send(ball) // 将结果发送出去
    }
}
//输出
zin Ball(hits=1)
yan Ball(hits=2)
zin Ball(hits=3)
yan Ball(hits=4)

9. 计时器通道 ticker

带计时器的通道是一种特殊的会合通道。每次经过特定的延迟都会从该通道进行消费并产生Unit

它被用来构建分段来创建复杂的基于时间的Produce管道和进行窗口化操作以及其他时间相关的处理。

通过ticker来构建这种通道。

示例:

代码语言:javascript复制
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    //创建计时器通道
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("初始化元素,并开始使用: $nextElement") // 没有初始延迟
// 所有的后续元素都进行100毫秒延迟  delayMillis=100 定义的
    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("下一个元素在50毫秒内准备就绪: $nextElement")
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("下一个元素在100毫秒内准备就绪: $nextElement")
    // 模拟大量消费延迟
    println("消费者暂停150毫秒")
    delay(150)
    // 下一个元素立即可用
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("下一个元素在消费者暂停延迟后立即使用: $nextElement")
    // 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("消耗元素在停止150毫秒后,下一个元素在50毫秒内准备就绪: $nextElement")
    tickerChannel.cancel() // 表明不再需要更多的元素
}
//输出
初始化元素,并开始使用: kotlin.Unit
下一个元素在50毫秒内准备就绪: null
下一个元素在100毫秒内准备就绪: kotlin.Unit
消费者暂停150毫秒
下一个元素在消费者暂停延迟后立即使用: kotlin.Unit
消耗元素在停止150毫秒后,下一个元素在50毫秒内准备就绪: kotlin.Unit

ticker 知道可能的消费者暂停,并且默认情况下会调整下一个生成的元素如果发生暂停则延迟,试图保持固定的生成元素率。

给可选的 mode 参数传入TickerMode.FIXED_DELAY 可以保持固定元素之间的延迟。

大概的学习就到这里了。

参考链接:通道 - Kotlin 语言中文站 (kotlincn.net)

0 人点赞