Swoole基于WaitGroup协程控制

2023-08-17 22:00:19 浏览数 (2)

需求

目前需要有一个批量的并发需求,例如是需要并发群发1000条信息出去。如果传统串行方式执行,需要等待时间会比较长。如果直接使用并发操作,可能一下子执行1000并发可能会导致服务器资源突然飙升,影响正常业务的处理。所以就需要一个能够并发处理,又能合理控制并发数量的执行。

步骤

  1. 先定义一个模拟发送消息的方法
代码语言:javascript复制
    public function testSendMsg($i): bool
    {
        //模拟发送消息操作
        sleep(rand(0, 3));
        var_dump("send {$i} success");

        return true;
    }
  1. 通过协程进行并发操作

这样就不需要等所有消息发送完成就可以直接打印 “do success”。但是会有个问题,要知道最终完成结果需要写入日志,或者查询更新后的状态才知道。

代码语言:javascript复制
    public function index()
    {
        for ($i = 0; $i < 1000; $i  ) {
            //并发1000操作
            go(function () use ($i) {
                $this->testSendMsg($i);
            });
        }
        //无需等待执行完成,就可以返回操作成功
        var_dump('do success');
    }

使用WaitGroup并发协程控制

从并发上看,最大发送时长是3秒钟,其实可以等所有操作都执行完成之后,再打印“do success”的。

代码语言:javascript复制
    public function index()
    {
        $wg = new WaitGroup();
        for ($i = 0; $i < 1000; $i  ) {
            //并发1000操作
            $wg->add();//增加一个等待任务执行数量 默认是1
            go(function () use ($i, $wg) {
                $this->testSendMsg($i);
                $wg->done(); //记录任务执行已经完成
            });
        }
        //声明在这里等待所有 add 进去的任务完成
        $wg->wait();
      
        var_dump('do success');
    }

控制并发数量

上面并发虽然提高了效率,但是如果testSendMsg是一个比较耗时、或者是消耗性能的操作,这时候一下子执行1000个,可能服务器会受不了,导致正常业务也出问题。这时候我们就需要对每次执行并发的数量进行控制。

代码语言:javascript复制
    public function index()
    {
        $wg = new WaitGroup();
        for ($i = 0; $i < 1000; $i  ) {
            //并发1000操作
            $wg->add();//增加一个等待任务执行数量 默认是1
            go(function () use ($i, $wg) {
                $this->testSendMsg($i);
                $wg->done(); //记录任务执行已经完成
            });
            if (($i % 100) == 0) {
                //控制并发,每100个一组进行并发执行
                $wg->wait();
            }
        }
        $wg->wait();

      
        var_dump('do success');
    }

0 人点赞