协程间通信Channel及WaitGroup
在进程篇的学习中,我们花过很大的篇幅讲过进程间的通信问题。但是在协程中,这个问题其实并不是很重要,为什么呢?因为从基础的理论我们就知道,协程是基于线程的,而线程在同一个进程中是共享内存的,通信其实并不会有太大的问题。而进程因为有隔离问题的存在,所以进程之间的通信我们就讲了很多。关于协程的通信,Swoole 直接就提供了一个 Channel 功能来帮助我们实现。*标题With写错了
Channel
Channel ,其实可以理解为就是一个消息队列,只不过它是协程间的消息队列,多个协程可以通过 push 和 pop 操作来生产和消费消息。Channel 是基于协程的,所以说它是没有办法跨进程使用的,我们后面要讲的 并发调用 和 连接池 都是基于 Channel 的。
Channel 支持多生产者协程和多消费者协程,底层自动实现了协程的切换和调度。它与 Array 很类似,仅占用内存,没有额外别的资源申请,因此也就没有 IO 消耗,效率速度可想而知。它在底层使用 PHP 的引用计数实现,没有内存的拷贝问题,巨大字符和数组也不会产生额外的消耗,也是零拷贝技术的实现。
所谓零拷贝就是说,传统的 IO 标准操作会在系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输,效率提升了但传输过程中数据需要在缓冲区进行拷贝复制。而零拷贝则避免了不同存储块之间的拷贝,能够更加有效利用系统资源,极大提升性能。
最后汇总一句话,Channel 性能 diao 炸天。而且,Channel 在 Go 语言中是非常非常重要的一个能力,掌握好 Swoole 中的 Channel ,再学习 Go ,或者之前已经学习过 Go ,再来看这里的话,都会非常亲切。对于协程编程,Channel 一定要牢牢掌握。
代码语言:javascript复制SwooleCoroutinerun(function(){
$channel = new SwooleCoroutineChannel(1);
go(function() use ($channel){
for($i = 0; $i < 3; $i ) {
$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
echo "{$i}n";
}
$channel->close();
});
go(function() use($channel){
while(1){
co::sleep(1);
$data = $channel->pop();
if($channel->errCode == SWOOLE_CHANNEL_CLOSED){
break;
}
var_dump($data);
}
});
});
//0
//1
//array(2) {
// ["rand"]=>
// int(8020)
// ["index"]=>
// int(0)
//}
//2
//array(2) {
// ["rand"]=>
// int(5072)
// ["index"]=>
// int(1)
//}
//array(2) {
// ["rand"]=>
// int(5950)
// ["index"]=>
// int(2)
//}
使用也非常简单,上面的代码即使我不说你也应该能看明白。首先实例化一个 SwooleCoroutineChannel 对象,它的构造参数是设置队列的容量,比如我们设置为 1 则队列只能有一条数据。当这一条数据没有被消费的时候,后续的 push() 会挂起并等待队列有空间继续存放数据。同理,pop() 也会在队列为空的时候挂起等待,它的参数就是等待时间,超过时间了则会结束,默认情况下是 -1 ,表示会一直等待。
当我们操作完添加的协程之后,调用 close() 关闭队列,然后在消费端查看 Channel 队列是否已经关闭,如果关闭了就退出循环,最终程序执行结束。
具体的调用顺序我们从注释中来看,首先是打印的 0 和 1 ,貌似一次塞进了两条数据,但其实我们的 Channel 容量只有 1 ,只是说协程2已经消费了,但在打印出来效果上看却是 1 先输出了,这是并发执行的特点,如果同时进行,出现的顺序是不一定的。消费了一条数据之后第一个协程又打印出来了 2 ,这时队列添加操作结束,调用 close() 关闭队列。之后第二个协程会继续消费完队列。想看清楚的话,可以在第二个协程的 while 循环中加一个 co::sleep(1);
看看效果。
官网给出的例子是另一种形式,根据 pop 过期时间的。
代码语言:javascript复制use SwooleCoroutine;
use SwooleCoroutineChannel;
use function SwooleCoroutinerun;
run(function(){
$channel = new Channel(1);
Coroutine::create(function () use ($channel) {
for($i = 0; $i < 10; $i ) {
Coroutine::sleep(1.0);
$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
echo "{$i}n";
}
});
Coroutine::create(function () use ($channel) {
while(1) {
$data = $channel->pop(2.0);
if ($data) {
var_dump($data);
} else {
assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
break;
}
}
});
});
总共就这么点东西,是不是很简单(简单个毛线)。push() 和 pop() 直接在底层就为我们实现了挂起和执行的调度操作。其实大家也能猜到,内部同样是 yield() 和 resume() 的来回切换操作。更具体的应用大家可以学习一下 Go 语言中的 Channel ,前面也说过了在 Go 中主要就是依靠 Channel 来进行协程管理,非常强大。
多协程生产消费及其它方法属性
接下来我们直接在一个例子中演示多协程操作 Channel 以及 Channel 的一些其它的相关方法和属性。
代码语言:javascript复制SwooleCoroutinerun(function () {
$channel = new chan(2);
$chan2 = new chan(2); // 总控,有两个生产者,两个消费者,要知道何时关闭 $channel
go(function () use ($channel, $chan2) {
for ($i = 0; $i < 3; $i ) {
co::sleep(rand(1,2));
$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
echo "入 channel 队协程:" . co::getCid() . ",下标:{$i}n";
}
echo "入 chan2 队协程:" . co::getCid();
$chan2->push(1);
});
go(function () use ($channel, $chan2) {
for ($i = 1; $i < 4; $i ) {
co::sleep(rand(1,2));
$channel->push(['rand' => rand(1000, 9999), 'index' => $i * 10]);
echo "入 channel 队协程:" . co::getCid() . ",下标:{$i}n";
}
echo "入 chan2 队协程:" . co::getCid();
$chan2->push(1);
});
echo "================", PHP_EOL;
var_dump($channel->stats());
var_dump($channel->length());
var_dump($channel->isEmpty());
var_dump($channel->isFull());
var_dump($channel->capacity);
var_dump($channel->errCode);
echo "================", PHP_EOL;
go(function () use ($channel) {
while (1) {
co::sleep(rand(2,3));
if ($channel->errCode == SWOOLE_CHANNEL_CLOSED) {
break;
}
$data = $channel->pop();
if($data == false){
break;
}
echo "%%%%%n";
echo " cid:", co::getCid(), "消费 channel !n";
var_dump($data);
echo "%%%%%n";
}
});
go(function () use ($channel) {
while (1) {
co::sleep(rand(2,3));
if ($channel->errCode == SWOOLE_CHANNEL_CLOSED) {
break;
}
$data = $channel->pop();
if($data == false){
break;
}
echo "%%%%%n";
echo " cid:", co::getCid(), "消费 channel !n";
var_dump($data);
echo "%%%%%n";
}
});
for ($i = $chan2->capacity; $i > 0; $i--) {
$chan2->pop();
echo " 主线程消费 chan2: {$i} !n";
}
$channel->close();
});
//================
//array(3) {
// ["consumer_num"]=>
// int(0)
// ["producer_num"]=>
// int(0)
// ["queue_num"]=>
// int(0)
//}
//int(0)
//bool(true)
//bool(false)
//int(2)
//int(0)
//================
//入 channel 队协程:2,下标:0
//入 channel 队协程:3,下标:1
//入 channel 队协程:3,下标:2
//%%%%%
// cid:5消费 channel !
//array(2) {
// ["rand"]=>
// int(2792)
// ["index"]=>
// int(0)
//}
//%%%%%
//%%%%%
// cid:4消费 channel !
//array(2) {
// ["rand"]=>
// int(7298)
// ["index"]=>
// int(10)
//}
//%%%%%
//入 channel 队协程:2,下标:1
//入 channel 队协程:2,下标:2
//入 chan2 队协程:2 主线程消费 chan2: 2 !
//%%%%%
// cid:4消费 channel !
//array(2) {
// ["rand"]=>
// int(3729)
// ["index"]=>
// int(20)
//}
//%%%%%
//入 channel 队协程:3,下标:3
//入 chan2 队协程:3 主线程消费 chan2: 1 !
//%%%%%
// cid:5消费 channel !
//array(2) {
// ["rand"]=>
// int(3590)
// ["index"]=>
// int(1)
//}
//%%%%%
//%%%%%
// cid:4消费 channel !
//array(2) {
// ["rand"]=>
// int(2667)
// ["index"]=>
// int(2)
//}
//%%%%%
//%%%%%
// cid:5消费 channel !
//array(2) {
// ["rand"]=>
// int(3430)
// ["index"]=>
// int(30)
//}
//%%%%%
内容比较多,也复杂了很多,我们一块一块来看。
现在的情况是,我们有四个协程,两个生产,两个消费,那么问题来了,我们怎么知道生产者生产完了呢?也就是说,我们怎么知道 $channel
应该在什么时候 close() 呢?毕竟有两个协程同时在向同一个 Channels 中 push 数据啊。
这里也是从 Go 那边找的一个例子。我们可以再建一个 Channels ,就叫做 chan2 ,然后在主进程中循环 pop 它,并在两个生产者协程中 push 数据。当两个生产者协程向 channel 添加完成之后,外面主进程的循环 pop 才会结束,我们就关闭
在这个几个例子中,加了很多 co::sleep() ,为的是可以方便地看出协程交替执行的效果,实际工作中是不用加的,因为我们这是例子,没有什么耗时操作,执行太快了,看不出多个协程一起工作的效果。而在实际工作中,可能有各种 IO 情况导致处理时间会有不同,效果就会比较明显。
另外我们使用了 chan() 这个语法糖实例化了一个 Channel 对象,它和 SwooleCoroutineChannel 对象是一样的。同时我们还打印了一下 Channel 对象的一些相关方法属性,大家可以看一下。
stats() 返回队列信息,主要包括下面这些内容:
- consumer_num 消费者数量,表示当前通道为空,有 N 个协程正在等待其他协程调用 push 方法生产数据
- producer_num 生产者数量,表示当前通道已满,有 N 个协程正在等待其他协程调用 pop 方法消费数据
- queue_num 通道中的元素数量
很明显,我们的 producer_num 和 queue_num 都会是 2 ,目前队列是满队的状态,因为主进程中打印的,所有的协程还在休眠状态,所以看不到什么东西,大家可以尝试注释掉生产者里面的休眠代码,就可以看到主进程打印的相关信息了。
length() 表示的就是 queue_num 的信息,也就是队列的元素数量或者说是队列长度。isEmpty() 表示队列是否为空,isFull() 表示队列是否已满。capacity 属性就是我们在构造函数设置的队列长度,errCode 表示当前的错误信息,我们已经用过这个了。
好吧,我承认,我看得也晕,其实最好的方式是大家直接去学一下 Go ,然后对比着那边的 Channel 一起来看。
WaitGroup
基于 Channel 可以实现很多功能,紧接着我们就来讲一个另一个比较重要的功能,那就是协程的 WaitGroup 功能。
如果你学过 Go 语言,那么 sync.WaitGroup 应该不会陌生。如果你没学过,也不用担心。先看代码,再来解释。
代码语言:javascript复制SwooleCoroutinerun(function(){
$wg = new SwooleCoroutineWaitGroup();
$wg->add();
$wg->add();
go(function() use($wg){
echo "协程1,cid:" . Co::getCid() , " start", PHP_EOL;
sleep(1);
echo "协程1,cid:" . Co::getCid() , " end", PHP_EOL;
$wg->done();
});
go(function()use($wg){
echo "协程2,cid:" . Co::getCid() , " start", PHP_EOL;
sleep(2);
echo "协程2,cid:" . Co::getCid() , " end", PHP_EOL;
$wg->done();
});
$wg->wait(); // wait1
echo "继续执行",PHP_EOL;
$wg->add();
go(function()use($wg){
echo "协程3,cid:" . Co::getCid() , " start", PHP_EOL;
sleep(3);
echo "协程3,cid:" . Co::getCid() , " end", PHP_EOL;
$wg->done();
});
$wg->wait();
});
//协程1,cid:2 start
//协程2,cid:3 start
//协程1,cid:2 end
//协程2,cid:3 end
//继续执行
//协程3,cid:4 start
//协程3,cid:4 end
直接看代码和运行结果,你能猜到这是在干嘛吗?好吧,我其实也没学完 Go ,但是咱们 PHPer 对前端还是了解一些的吧,async await 了解过不?或者说 Promise 总听说过吧。这里其实就和 JS 的 Promise 的效果很像。
我们首先实例化一个 SwooleCoroutineWaitGroup 对象。然后通过 add() 方法添加引用计数。因为要实现两个协程,我们就需要添加两个引用计数。然后在协程内部,通过 done() 方法标明这个协程执行完了,或者说也可以认为它会将引用计数减少。
然后在外部调用 wait() 的时候,会等待引用计数归零,才会继续执行后面的代码。
再说得通俗点,如果没有 wait() 那么这两个协程执行之后,后面的代码也会紧跟着执行,马上就会输入 “继续执行” 四个字,并且后面的协程3也会开始运行。但是,有中间的那个 wait() 的话,整个协程容器就会等待前面两个协程完成执行之后,也就是 done() 完了,才会继续执行后面的代码。
你看,这是不是真的非常像 Promise 的功能。JS 中引入这个功能是为了解决什么问题呢?那就是异步执行同步返回的功能。在前端页面上,多个 Ajax 请求同时发出,返回时间是不确定的,而我们的前端业务可能是需要所有的请求都返回结果之后,才能进行后续的操作,这时候就可以用 Promise 来实现这样的功能了。WaitGroup 也是同样的概念。
上面的例子中,输出的结果很清晰的就能看出来,协程1和协程2都end之后,才打印了“继续执行”,并开始执行协程3。假如你注释掉中间的那个 wait() ,也就是注释了 wait1 的那个,那么输出的结果就会是这样的。
代码语言:javascript复制// 注释中间 wait
//协程1,cid:2 start
//协程2,cid:3 start
//继续执行
//协程3,cid:4 start
//协程1,cid:2 end
//协程2,cid:3 end
//协程3,cid:4 end
能看出来不同吧。如果 add() 和 done() 的数量不一样,都会报错,因此这两个方法是成对出现了,有一个 add() 就要有一个 done() 去清理计数。wait() 方法中其实是有一个 Channel 的 pop() 在等待阻塞,当引用计数不为 0 的时候,这个 pop() 就一直阻塞着,而当计数为 0 后,就直接返回。
为什么我知道它的实现呢?WaitGroup 组件是纯 PHP 代码实现的,也是包含在协程的 Library 工具包中的,大家可以自己去看它的源码,真的就是通过 Channel 实现的,而且非常简单好懂,Github 地址在文末。
包括 Go 语言,其实也更推荐的是通过 Channel 来进行协程管理的,因此,咱们在 Swoole 中,也尽量多使用 Channel 吧,毕竟万一将来要学 Go 呢?这不就水到渠成了嘛!
总结
今天学习的内容非常重要,它是我们后面要学习的 并发调用 和 连接池 的基础。所幸的是,这两个东西理解难度不是那么大,但确实还是有一定的难度,如果你现在正在学 Go ,建议一起看,一起学,效果更好哦。
Channel 就是一个协程间可以共享通信的队列系统,非常类似于我们进程中的队列消息通信。而 WaitGroup 则是实现了一个类似于 Promise 的功能,实现异步并发同步返回的效果,它的底层实际上还是 Channel 。
测试代码:
https://github.com/zhangyue0503/swoole/blob/main/4.Swoole协程/source/4.4协程间通信Channel及WithGroup.php
参考文档:
https://wiki.swoole.com/#/coroutine/channel
https://wiki.swoole.com/#/coroutine/wait_group
WaitGroup源码:https://github.com/swoole/library/blob/master/src/core/Coroutine/WaitGroup.php