之前实现过一个swoole生产者消费者模型,有兴趣可以参看这里,这版代码做了如下修改: 1. 生产者放到单独子进程当中,而非像之前那样在主( 父)进程中完成。 虽然功能上没有什么变化,但这样看起来结构更合理一些。 2. 主进程除了生成不同子进程外,还做了一件事:回收僵尸进程。如果程序是长期运行的,这点还是有必要的。
代码如下:
代码语言:javascript复制<?php
abstract class Schedule{
protected $_consumerList = array();
protected $_msgqkey = null;
protected $_consumerNum = 2;
protected $_finishFlag = 'ALLDONE';
public function __construct($cNum = 0){
if ($cNum){
$this->_consumerNum = $cNum;
}
}
public function setConsumerNum($num = 0){
if ($num){
$this->_consumerNum = $num;
return true;
}
return false;
}
public function setFinishFlag($flag = null){
if ($flag){
$this->_finishFlag = $flag;
return true;
}
return false;
}
public function run(){
$this->_consumerList = array();
for($i=0; $i<$this->_consumerNum; $i ){
$consumer = new swoole_process(function($worker){
$this->_consumerFunc($worker);
});
if ($this->_msgqkey){
$consumer->useQueue($this->_msgqkey);
}
else{
$consumer->useQueue();
}
$pid = $consumer->start();
$this->_consumerList[$pid] = $consumer;
}
$producer = new swoole_process(function($worker){
echo "producer begin:n";
$this->_producerFunc($worker);
});
if ($this->_msgqkey){
$producer->useQueue($this->_msgqkey);
}
else{
$producer->useQueue();
}
$pid = $producer->start();
echo sprintf("msgqkey:%sn", $producer->msgQueueKey);
//回收僵尸子进程
swoole_process::signal(SIGCHLD, function($sig){
static $num = 0;
while($ret = swoole_process::wait(false)){
echo "master clear pid:{$ret['pid']}, code:{$ret['code']}n";
$num ;
if ($num == $this->_consumerNum 1){
swoole_event_exit();
}
//echo $num,"n";
}
});
}
protected function _producerFunc($worker){
if ($this->_onlyConsume()){
return;
}
foreach ($this->doProduce($worker) as $data){
$worker->push($data);
}
//任务数据被取完
while(true){
$c = $worker->statQueue();
$n = $c['queue_num'];
if ($n === 0){
break;
}
}
//放入consumer进程结束标识
foreach($this->_consumerList as $pid => $w){
$w->push($this->_finishFlag);
}
//确认结束
while(true){
$c = $worker->statQueue();
$n = $c['queue_num'];
if ($n === 0){
break;
}
}
$worker->freeQueue();
}
protected function _consumerFunc($worker){
while(1){
$data = $worker->pop();
if ($data == $this->_finishFlag){
$pid = $worker->pid;
echo "consumer $pid exitn";
$worker->exit(0);
}
else{
$this->doConsume($data, $worker);
}
}
}
protected function _onlyConsume(){
return !! $this->_msgqkey;
}
abstract protected function doProduce($worker);
abstract protected function doConsume($data, $worker);
}