文章目录
- 一、CoroutineScope#produce 构造生产者协程
- 1、CoroutineScope#produce 函数原型
- 2、代码示例
- 二、CoroutineScope#actor 构造消费者协程
- 1、CoroutineScope#actor 函数原型
- 2、代码示例
一、CoroutineScope#produce 构造生产者协程
通过 CoroutineScope#produce 函数 , 可以快速构造一个 生产者协程 , 其返回值是 ReceiveChannel 实例对象 , 这样就可以在消费者协程中通过该 ReceiveChannel 实例获取并消费数据 ;
1、CoroutineScope#produce 函数原型
CoroutineScope#produce 函数原型 :
代码语言:javascript复制/**
* 启动一个新的协程,通过将值发送到通道来生成值流
* 并返回对协程的引用作为[receichannnel]。这个结果
* 对象可以用于[receive][receichannchannel .]接收此协程产生的]个元素。
*
* 协程的作用域包含[ProducerScope]接口,该接口实现
* [CoroutineScope]和[SendChannel],这样协程就可以调用
* [将][SendChannel。直接发送)。通道已关闭[SendChannel.close]
* 当协程完成时。
* 当其接收通道为[cancelled][receivecchannel .cancel]时,正在运行的协程将被取消。
*
* 协程上下文继承自这个[CoroutineScope]。可以使用[context]参数指定其他上下文元素。
* 如果上下文没有任何dispatcher或其他[ContinuationInterceptor],则[Dispatchers. dispatcher]。使用“Default]”。
* 父作业也继承自[CoroutineScope],但它也可以被重写
* 使用相应的[context]元素。
*
* 此协程中任何未捕获的异常将以此异常作为原因和关闭通道
* 结果通道将变成_failed_,因此此后任何试图从它接收的尝试都会抛出异常。
*
* 生成的通道类型取决于指定的[capacity]参数。
* 详细信息请参见[Channel]接口文档。
*
* 请参阅[newCoroutineContext],以获得新创建的协程可用的调试工具的描述。
*
* **注意:这是一个实验性的api。**在父范围内作为孩子工作的制作人的行为
* 取消和错误处理将来可能会更改。
*
* @param context 附加到[CoroutineScope。coroutineContext]协程的上下文。
* @param capacity 通道缓冲区的容量(默认情况下没有缓冲区)。
* @param block 协程代码。
*/
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
2、代码示例
代码示例 :
代码语言:javascript复制package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
runBlocking {
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
for (i in 0..3) {
delay(1000)
channel.send(i)
println("向通道中发送数据 $i")
}
}
// 数据消费者协程
val consumer = GlobalScope.launch {
while (true) {
for(num in receiveChannel) {
println("从通道中获取数据 $num")
}
}
}
}
}
}
}
执行结果 :
代码语言:javascript复制22:35:52.720 System.out kim.hsl.coroutine I 向通道中发送数据 0
22:35:52.721 System.out kim.hsl.coroutine I 从通道中获取数据 0
22:35:53.764 System.out kim.hsl.coroutine I 向通道中发送数据 1
22:35:53.765 System.out kim.hsl.coroutine I 从通道中获取数据 1
22:35:54.786 System.out kim.hsl.coroutine I 向通道中发送数据 2
22:35:54.787 System.out kim.hsl.coroutine I 从通道中获取数据 2
22:35:55.835 System.out kim.hsl.coroutine I 从通道中获取数据 3
22:35:55.839 System.out kim.hsl.coroutine I 向通道中发送数据 3
二、CoroutineScope#actor 构造消费者协程
通过 CoroutineScope#actor 函数 , 可以快速构造一个 消费者协程 ;
1、CoroutineScope#actor 函数原型
CoroutineScope#actor 函数原型 :
代码语言:javascript复制/**
* 启动从其邮箱通道接收消息的新协程
* 并返回对其邮箱通道的引用作为[SendChannel]。由此产生的
* 对象可以用来[发送][SendChannel。向这个协程发送]条消息。
*
* 协程的作用域包含[ActorScope]接口,该接口实现
* [CoroutineScope]和[receichannnel],这样协程就可以调用
* [接受][ReceiveChannel。直接接收)。通道已关闭[SendChannel.close]
* 当协程完成时。
*
* 协程上下文继承自[CoroutineScope],可以使用[context]参数指定其他上下文元素。
* 如果上下文既没有任何dispatcher,也没有任何其他[ContinuationInterceptor],那么[Dispatchers. dispatcher .]使用“Default]”。
* 父作业也继承自[CoroutineScope],但它也可以被重写
* 带有相应的[context]元素。
*
* 默认情况下,协程立即被安排执行。
* 其他选项可以通过“start”参数指定。详见[coroutinstart]。
* 可选参数[start]可设置为[coroutinstart]。启动协程_lazy。在这种情况下,
* 它将在第一条消息上隐式启动
* 【发送】【SendChannel。发送到此演员的邮箱通道。
*
* 此协程中未捕获的异常将以此异常作为原因和关闭通道
* 结果通道变成_failed_,因此任何发送到该通道的尝试都会抛出异常。
*
* 生成的通道类型取决于指定的[capacity]参数。
* 详见[Channel]接口文档。
*
* 参见[newCoroutineContext] [CoroutineScope。newCoroutineContext]用于描述可用于新创建的协程的调试工具。
*
* 使用演员
*
* 角色构建器的典型用法如下:
*
* ```
* val c = actor {
* // initialize actor's state
* for (msg in channel) {
* // process message here
* }
* }
* // send messages to the actor
* c.send(...)
* ...
* // stop the actor when it is no longer needed
* c.close()
* ```
*
* ###停止并取消演员
*
* 当参与者的收件箱通道[关闭][SendChannel.]关闭]它向参与者发送一个特殊的“关闭令牌”。
* 参与者仍然处理已经发送的所有消息,然后“' for (msg in channel) '”循环终止
* 演员完成了。
*
* 如果需要在不处理已经发送给它的所有消息的情况下中止参与者,则
* 它将与父job一起创建:
*
* ```
* val job = Job()
* val c = actor(context = job) { ... }
* ...
* // abort the actor
* job.cancel()
* ```
*
* 当演员的父工作被[取消][工作。取消],那么演员的工作就取消了。这意味着
* " ' for (msg in channel) ' "和其他可取消的挂起函数抛出[CancellationException]和actor
* 在不处理剩余消息的情况下完成。
*
* **注意:此API将在未来的更新中随着复杂角色的引入而过时
* 参见[issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87)。
*
* @param context 附加到[CoroutineScope。coroutineContext]协程的上下文。
* @param capacity 通道缓冲区的容量(默认情况下没有缓冲区)。
* @param start 协程启动选项。缺省值为[coroutinstart . default]。
* @param onCompletion 参与者协程的可选完成处理程序(参见[Job.invokeOnCompletion])
* @param block 协程代码。
*/
@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.actor(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
val newContext = newCoroutineContext(context)
val channel = Channel<E>(capacity)
val coroutine = if (start.isLazy)
LazyActorCoroutine(newContext, channel, block) else
ActorCoroutine(newContext, channel, active = true)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}
2、代码示例
代码示例 :
代码语言:javascript复制package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
runBlocking {
// 数据消费者协程
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
while (true) {
val num = receive()
println("从通道中获取数据 $num")
}
}
// 数据生产者协程
val producer = GlobalScope.launch {
for (i in 0..3) {
sendChannel.send(i)
println("向通道中发送数据 $i")
}
}
}
}
}
}
执行结果 :
代码语言:javascript复制22:43:12.093 System.out kim.hsl.coroutine I 从通道中获取数据 0
22:43:12.095 System.out kim.hsl.coroutine I 向通道中发送数据 0
22:43:12.096 System.out kim.hsl.coroutine I 向通道中发送数据 1
22:43:12.097 System.out kim.hsl.coroutine I 从通道中获取数据 1
22:43:12.098 System.out kim.hsl.coroutine I 从通道中获取数据 2
22:43:12.098 System.out kim.hsl.coroutine I 向通道中发送数据 2
22:43:12.099 System.out kim.hsl.coroutine I 从通道中获取数据 3
22:43:12.102 System.out kim.hsl.coroutine I 向通道中发送数据 3