需求
目前需要有一个批量的并发需求,例如是需要并发群发1000条信息出去。如果传统串行方式执行,需要等待时间会比较长。如果直接使用并发操作,可能一下子执行1000并发可能会导致服务器资源突然飙升,影响正常业务的处理。所以就需要一个能够并发处理,又能合理控制并发数量的执行。
步骤
- 先定义一个模拟发送消息的方法
public function testSendMsg($i): bool
{
//模拟发送消息操作
sleep(rand(0, 3));
var_dump("send {$i} success");
return true;
}
- 通过协程进行并发操作
这样就不需要等所有消息发送完成就可以直接打印 “do success”。但是会有个问题,要知道最终完成结果需要写入日志,或者查询更新后的状态才知道。
代码语言:javascript复制 public function index()
{
for ($i = 0; $i < 1000; $i ) {
//并发1000操作
go(function () use ($i) {
$this->testSendMsg($i);
});
}
//无需等待执行完成,就可以返回操作成功
var_dump('do success');
}
使用WaitGroup并发协程控制
从并发上看,最大发送时长是3秒钟,其实可以等所有操作都执行完成之后,再打印“do success”的。
代码语言:javascript复制 public function index()
{
$wg = new WaitGroup();
for ($i = 0; $i < 1000; $i ) {
//并发1000操作
$wg->add();//增加一个等待任务执行数量 默认是1
go(function () use ($i, $wg) {
$this->testSendMsg($i);
$wg->done(); //记录任务执行已经完成
});
}
//声明在这里等待所有 add 进去的任务完成
$wg->wait();
var_dump('do success');
}
控制并发数量
上面并发虽然提高了效率,但是如果testSendMsg是一个比较耗时、或者是消耗性能的操作,这时候一下子执行1000个,可能服务器会受不了,导致正常业务也出问题。这时候我们就需要对每次执行并发的数量进行控制。
代码语言:javascript复制 public function index()
{
$wg = new WaitGroup();
for ($i = 0; $i < 1000; $i ) {
//并发1000操作
$wg->add();//增加一个等待任务执行数量 默认是1
go(function () use ($i, $wg) {
$this->testSendMsg($i);
$wg->done(); //记录任务执行已经完成
});
if (($i % 100) == 0) {
//控制并发,每100个一组进行并发执行
$wg->wait();
}
}
$wg->wait();
var_dump('do success');
}