【Kotlin 协程】Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )

2023-03-30 18:32:46 浏览数 (1)

文章目录

  • 一、CoroutineScope#produce 构造生产者协程
    • 1、CoroutineScope#produce 函数原型
    • 2、代码示例
  • 二、CoroutineScope#actor 构造消费者协程
    • 1、CoroutineScope#actor 函数原型
    • 2、代码示例

一、CoroutineScope#produce 构造生产者协程


通过 CoroutineScope#produce 函数 , 可以快速构造一个 生产者协程 , 其返回值是 ReceiveChannel 实例对象 , 这样就可以在消费者协程中通过该 ReceiveChannel 实例获取并消费数据 ;

1、CoroutineScope#produce 函数原型

CoroutineScope#produce 函数原型 :

代码语言:javascript复制
/**
 * 启动一个新的协程,通过将值发送到通道来生成值流
 * 并返回对协程的引用作为[receichannnel]。这个结果
 * 对象可以用于[receive][receichannchannel .]接收此协程产生的]个元素。
 *
 * 协程的作用域包含[ProducerScope]接口,该接口实现
 * [CoroutineScope]和[SendChannel],这样协程就可以调用
 * [将][SendChannel。直接发送)。通道已关闭[SendChannel.close]
 * 当协程完成时。
 * 当其接收通道为[cancelled][receivecchannel .cancel]时,正在运行的协程将被取消。
 *
 * 协程上下文继承自这个[CoroutineScope]。可以使用[context]参数指定其他上下文元素。
 * 如果上下文没有任何dispatcher或其他[ContinuationInterceptor],则[Dispatchers. dispatcher]。使用“Default]”。
 * 父作业也继承自[CoroutineScope],但它也可以被重写
 * 使用相应的[context]元素。
 *
 * 此协程中任何未捕获的异常将以此异常作为原因和关闭通道
 * 结果通道将变成_failed_,因此此后任何试图从它接收的尝试都会抛出异常。
 *
 * 生成的通道类型取决于指定的[capacity]参数。
 * 详细信息请参见[Channel]接口文档。
 *
 * 请参阅[newCoroutineContext],以获得新创建的协程可用的调试工具的描述。
 *
 * **注意:这是一个实验性的api。**在父范围内作为孩子工作的制作人的行为
 * 取消和错误处理将来可能会更改。
 *
 * @param context   附加到[CoroutineScope。coroutineContext]协程的上下文。
 * @param capacity  通道缓冲区的容量(默认情况下没有缓冲区)。
 * @param block     协程代码。
 */
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)

2、代码示例

代码示例 :

代码语言:javascript复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            runBlocking {

                val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
                    for (i in 0..3) {
                        delay(1000)
                        channel.send(i)
                        println("向通道中发送数据 $i")
                    }
                }

                // 数据消费者协程
                val consumer = GlobalScope.launch {
                    while (true) {
                        for(num in receiveChannel) {
                            println("从通道中获取数据 $num")
                        }
                    }
                }
            }
        }
    }
}

执行结果 :

代码语言:javascript复制
22:35:52.720 System.out   kim.hsl.coroutine     I  向通道中发送数据 0
22:35:52.721 System.out   kim.hsl.coroutine     I  从通道中获取数据 0
22:35:53.764 System.out   kim.hsl.coroutine     I  向通道中发送数据 1
22:35:53.765 System.out   kim.hsl.coroutine     I  从通道中获取数据 1
22:35:54.786 System.out   kim.hsl.coroutine     I  向通道中发送数据 2
22:35:54.787 System.out   kim.hsl.coroutine     I  从通道中获取数据 2
22:35:55.835 System.out   kim.hsl.coroutine     I  从通道中获取数据 3
22:35:55.839 System.out   kim.hsl.coroutine     I  向通道中发送数据 3

二、CoroutineScope#actor 构造消费者协程


通过 CoroutineScope#actor 函数 , 可以快速构造一个 消费者协程 ;

1、CoroutineScope#actor 函数原型

CoroutineScope#actor 函数原型 :

代码语言:javascript复制
/**
 * 启动从其邮箱通道接收消息的新协程
 * 并返回对其邮箱通道的引用作为[SendChannel]。由此产生的
 * 对象可以用来[发送][SendChannel。向这个协程发送]条消息。
 *
 * 协程的作用域包含[ActorScope]接口,该接口实现
 * [CoroutineScope]和[receichannnel],这样协程就可以调用
 * [接受][ReceiveChannel。直接接收)。通道已关闭[SendChannel.close]
 * 当协程完成时。
 *
 * 协程上下文继承自[CoroutineScope],可以使用[context]参数指定其他上下文元素。
 * 如果上下文既没有任何dispatcher,也没有任何其他[ContinuationInterceptor],那么[Dispatchers. dispatcher .]使用“Default]”。
 * 父作业也继承自[CoroutineScope],但它也可以被重写
 * 带有相应的[context]元素。
 *
 * 默认情况下,协程立即被安排执行。
 * 其他选项可以通过“start”参数指定。详见[coroutinstart]。
 * 可选参数[start]可设置为[coroutinstart]。启动协程_lazy。在这种情况下,
 * 它将在第一条消息上隐式启动
 * 【发送】【SendChannel。发送到此演员的邮箱通道。
 *
 * 此协程中未捕获的异常将以此异常作为原因和关闭通道
 * 结果通道变成_failed_,因此任何发送到该通道的尝试都会抛出异常。
 *
 * 生成的通道类型取决于指定的[capacity]参数。
 * 详见[Channel]接口文档。
 *
 * 参见[newCoroutineContext] [CoroutineScope。newCoroutineContext]用于描述可用于新创建的协程的调试工具。
 *
 * 使用演员
 *
 * 角色构建器的典型用法如下:
 *
 * ```
 * val c = actor {
 *     // initialize actor's state
 *     for (msg in channel) {
 *         // process message here
 *     }
 * }
 * // send messages to the actor
 * c.send(...)
 * ...
 * // stop the actor when it is no longer needed
 * c.close()
 * ```
 *
 * ###停止并取消演员
 *
 * 当参与者的收件箱通道[关闭][SendChannel.]关闭]它向参与者发送一个特殊的“关闭令牌”。
 * 参与者仍然处理已经发送的所有消息,然后“' for (msg in channel) '”循环终止
 * 演员完成了。
 *
 * 如果需要在不处理已经发送给它的所有消息的情况下中止参与者,则
 * 它将与父job一起创建:
 *
 * ```
 * val job = Job()
 * val c = actor(context = job) {  ... }
 * ...
 * // abort the actor
 * job.cancel()
 * ```
 *
 * 当演员的父工作被[取消][工作。取消],那么演员的工作就取消了。这意味着
 * " ' for (msg in channel) ' "和其他可取消的挂起函数抛出[CancellationException]和actor
 * 在不处理剩余消息的情况下完成。
 *
 * **注意:此API将在未来的更新中随着复杂角色的引入而过时
 * 参见[issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87)。
 *
 * @param context       附加到[CoroutineScope。coroutineContext]协程的上下文。
 * @param capacity      通道缓冲区的容量(默认情况下没有缓冲区)。
 * @param start         协程启动选项。缺省值为[coroutinstart . default]。
 * @param onCompletion  参与者协程的可选完成处理程序(参见[Job.invokeOnCompletion])
 * @param block         协程代码。
 */
@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
    val newContext = newCoroutineContext(context)
    val channel = Channel<E>(capacity)
    val coroutine = if (start.isLazy)
        LazyActorCoroutine(newContext, channel, block) else
        ActorCoroutine(newContext, channel, active = true)
    if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
    coroutine.start(start, coroutine, block)
    return coroutine
}

2、代码示例

代码示例 :

代码语言:javascript复制
package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            runBlocking {
                // 数据消费者协程
                val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
                    while (true) {
                        val num = receive()
                        println("从通道中获取数据 $num")
                    }
                }

                // 数据生产者协程
                val producer = GlobalScope.launch {
                    for (i in 0..3) {
                        sendChannel.send(i)
                        println("向通道中发送数据 $i")
                    }
                }
            }
        }
    }
}

执行结果 :

代码语言:javascript复制
22:43:12.093 System.out   kim.hsl.coroutine     I  从通道中获取数据 0
22:43:12.095 System.out   kim.hsl.coroutine     I  向通道中发送数据 0
22:43:12.096 System.out   kim.hsl.coroutine     I  向通道中发送数据 1
22:43:12.097 System.out   kim.hsl.coroutine     I  从通道中获取数据 1
22:43:12.098 System.out   kim.hsl.coroutine     I  从通道中获取数据 2
22:43:12.098 System.out   kim.hsl.coroutine     I  向通道中发送数据 2
22:43:12.099 System.out   kim.hsl.coroutine     I  从通道中获取数据 3
22:43:12.102 System.out   kim.hsl.coroutine     I  向通道中发送数据 3

0 人点赞