【swoole4.0】实现Actor并发模型

2019-06-14 13:07:17 浏览数 (1)

什么是Actor?

Actor概念来源于Erlang, 对于PHPer来说,可能会比较陌生,写过Java的同学会比较熟悉,Java一直都有线程的概念(虽然PHP有Pthread,但不普及),它是一种非共享内存的并发模型,每个Actor内的数据独立存在,Actor之间通过消息传递的形式进行交互调度,且Actor是一种高度抽象化的编程模型,非常适合于游戏、硬件行业。

Swoole协程与信箱

得益于Swoole4.x,我们可以基于Swoole的协程与Channel快速实现一个信箱模式调度。模拟代码如下:

代码语言:javascript复制
use SwooleCoroutineChannel;
go(function (){
    //创建十个信箱通道
    $mailBoxes = [];
    for ($i = 1;$i <= 10;$i  ){
        $mailBoxes[$i] = new Channel(16);
    }
    //模拟master 邮局调度,随机像一个信箱投递消息
    go(function ()use($mailBoxes){
        while (1){
            co::sleep(2);
            $key = rand(1,10);
            ($mailBoxes[$key])->push(time());
        }
    });
    //模拟actor 实体消费
    for ($i = 1;$i <= 10;$i  ){
        go(function ()use($mailBoxes,$i){
            while (1){
                $msg = ($mailBoxes[$i])->pop();
                echo "Actor {$i} recv msg : {$msg} n";
            }
        });
    }
});

以上代码执行输出:

代码语言:javascript复制
php test.php 
Actor 8 recv msg : 1559622691 
Actor 10 recv msg : 1559622693 
Actor 1 recv msg : 1559622695 
Actor 5 recv msg : 1559622697 

协程通道每次在POP遇到无数据的时候,都会自动让出执行权(具体可以去看Swoole协程调度)

Actor库

基于上面的原理,我们实行了一个多进程分布的协程Actor库

代码语言:javascript复制
composer require easyswoole/actor=2.x-dev

我们依赖dev库进行测试,生产可以自己依赖stable版本

进程关系

Easyswoole的Actor模型中,存在两组进程,一组是proxy进程,用来实现Actor对外服务,一组是worker进程,proxy进程与worker进程之间通过unixsock进行通讯,而Actor实例就均匀的分布worker之中。

样例代码

比如在一个聊天室中,我们可以定义一个房间模型。

代码语言:javascript复制
namespace EasySwooleActorTest;


use EasySwooleActorAbstractActor;
use EasySwooleActorActorConfig;

class RoomActor extends AbstractActor
{
    public static function configure(ActorConfig $actorConfig)
    {
        $actorConfig->setActorName('Room');
    }
    public function onStart()
    {
        //每当一个RoomActor实体被创建的时候,都会执行该回调
        var_dump('room actor '.$this->actorId().' start');
    }
    public function onMessage($msg)
    {
        //每当一个RoomActor实体收到外部消息的时候,都会执行该回调当
        var_dump('room actor '.$this->actorId().' onmessage: '.$msg);
        return 'reply at '.time();
    }
    public function onExit($arg)
    { 
        //每当一个RoomActor实体退出的时候,都会执行该回调
        var_dump('room actor '.$this->actorId().' exit at arg: '.$arg);
        return 'exit at '.time();
    }
    protected function onException(Throwable $throwable)
    {
        //每当一个RoomActor出现异常的时候,都会执行该回调
        var_dump($throwable->getMessage());
    }
}

在cli模式下创建一个Actor服务

代码语言:javascript复制
use EasySwooleActorActor;
use EasySwooleActorTestRoomActor;
use EasySwooleActorProxyProcess;

Actor::getInstance()->register(RoomActor::class);
$list = Actor::getInstance()->generateProcess();

foreach ($list['proxy'] as  $proxy){
    /** @var ProxyProcess $proxy */
    $proxy->getProcess()->start();
}
foreach ($list['worker'] as $actors){
    foreach ($actors as $actorProcess){
        /** @var ProxyProcess $actorProcess */
        $actorProcess->getProcess()->start();
    }
}
while($ret = SwooleProcess::wait()) {
    echo "PID={$ret['pid']}n";
}

创建一个cli测试脚本

代码语言:javascript复制
use EasySwooleActorActor;
use EasySwooleActorTestRoomActor;
Actor::getInstance()->register(RoomActor::class);

go(function (){
    $actorId = RoomActor::client()->create('create arg1');
    var_dump($actorId);
    co::sleep(3);
    var_dump(RoomActor::client()->send($actorId,'this is msg'));
    co::sleep(3);
    var_dump(RoomActor::client()->exit($actorId,'this is exit arg'));
    co::sleep(3);
    RoomActor::client()->create('create arg2');
    co::sleep(3);
    RoomActor::client()->create('create arg3');
    co::sleep(3);
    var_dump(RoomActor::client()->sendAll('sendAll msg'));
    co::sleep(3);
    var_dump(RoomActor::client()->status());
    co::sleep(3);
    var_dump(RoomActor::client()->exitAll('sendAll exit'));
});

以上代码执行结果如下: 服务端

代码语言:javascript复制
php test.php 
string(40) "room actor 00101000000000000000001 start"
string(57) "room actor 00101000000000000000001 onmessage: this is msg"
string(64) "room actor 00101000000000000000001 exit at arg: this is exit arg"
string(40) "room actor 00101000000000000000002 start"
string(40) "room actor 00103000000000000000001 start"
string(57) "room actor 00101000000000000000002 onmessage: sendAll msg"
string(57) "room actor 00103000000000000000001 onmessage: sendAll msg"
string(60) "room actor 00101000000000000000002 exit at arg: sendAll exit"
string(60) "room actor 00103000000000000000001 exit at arg: sendAll exit"

客户端

代码语言:javascript复制
php test2.php 
string(23) "00101000000000000000001"
string(19) "reply at 1559623925"
string(18) "exit at 1559623928"
bool(true)
array(3) {
  [1]=>
  int(1)
  [2]=>
  int(0)
  [3]=>
  int(1)
}
bool(true)

更多细节可以在EasySwoole项目官网得到文档支持 http://easyswoole.com/ 喜欢EasySwoole项目的,可以给个star https://github.com/easy-swoole

0 人点赞