CAF:c++ actor framework

2023-03-10 13:23:38 浏览数 (1)

C actor framework简单使用

  • 简介
  • 下载/编译/安装
    • 下载
    • 编译/安装
  • 简单使用
    • 单次调用,没有使用线程池
    • 复杂使用
      • 思路
        • main.cpp 内容如下
        • msgHandle.h
        • msgHandle.cpp
      • blocking_actor 类型说明
        • 释义
        • 函数说明
      • 其他loop 类型
  • 其他说明

简介

CAF 是 C Actor 模型框架,借鉴了 erlang 和 akka 的 actor 思想。有强 C 11 特性。 特点是:轻量级,分布式,简单,可适应以及无锁。 官方文档:https://actor-framework.readthedocs.io/en/latest/index.html Github地址: https://github.com/actor-framework/actor-framework wike地址: https://github.com/actor-framework/actor-framework/wiki

下载/编译/安装

下载

Linux – Git 下载方式 CAF GitHub 地址 : https://github.com/actor-framework/actor-framework Git 下载 ,下载源码 master : 备注:还需要安装 boost

代码语言:javascript复制
git clone https://github.com/actor-framework/actor-framework.git

下载所有的 模块和 库。

代码语言:javascript复制
git clone --recursive https://github.com/actor-framework/actor-framework.git

编译/安装

在linux 系统下载好后。在当前目录下,能看到 actor-framework 文件夹。 执行如下命令:

代码语言:javascript复制
cd actor-framework #进入caf 文件夹
./configure #执行configure 脚本,配置对应属性,按照默认属性配置
make 	#编译
make install #安装命令
make uninstall #卸载命令,如需卸载,执行此命令就可以。

简单使用

单次调用,没有使用线程池

代码语言:javascript复制
#include <iostream>
#include <string>
#include <caf/all.hpp>
#include <caf/io/all.hpp>

caf::behavior msgHandle(caf::event_based_actor *self)
{
    return{
        [=](std::string &what)
        {
            std::cout<<"string type msg is :"<<what<<std::endl;
        },
        [=](int &value)
        {
            std::cout<<"int type msg is:"<<value<<std::endl;
        }
    };   
}

int main()
{
    std::cout<<"main.cpp"<<std::endl;
    caf::actor_system_config cfg;
    caf::actor_system system(cfg);
    auto a1 = system.spawn(msgHandle);
    caf::anon_send(a1,"String 类型消息");
    caf::anon_send(a1,123456789);

    return 0;
}

编译:

代码语言:javascript复制
g   -g main.cpp -lcaf_core -o main

输出如下:

代码语言:javascript复制
main.cpp
string type msg is :String 类型消息
int type msg is:123456789

复杂使用

主要针对 : event_based_actor 类型说明

思路

启用线程操作,根据项目的实际需求使用,定义不同的消息注释,用来接收处理。 增加了如下文件:msgHandle.cpp msgHandle.h

main.cpp 内容如下
代码语言:javascript复制
#include <iostream>
#include <string>
#include <caf/all.hpp>
#include <caf/io/all.hpp>
#include <boost/shared_ptr.hpp>

#include "msgHandle.h"

int main()
{
    std::cout<<"main.cpp"<<std::endl;
    //spawn some actors
    caf::actor_system_config cfg;
    caf::actor_system system(cfg);
    //an actor executed in the current thread
    caf::scoped_actor self(system);
    
    msgHandelActor = boost::shared_ptr<caf::actor>(new caf::actor(self->spawn<caf::detached>(msgHandleFuntion)));

    ActorRun::instance()->startActorOne("output:startOne");
    ActorRun::instance()->startActorTwo("output:startTwo ",123);
    ActorRun::instance()->startActorDelay(false,5,"output: ",0);

    self->await_all_other_actors_done();
    system.await_all_actors_done();
    return 0;
}
msgHandle.h

· Atoms / 消息体智能指针 /behavior

代码语言:javascript复制
//定义actor 智能指针 msgHandleActor
extern boost::shared_ptr<caf::actor> msgHandelActor;
//定义原子-消息注释 actorOne,actorTwo,actorDelay 
// 注意消息内容长度小于11
using actorOne = caf::atom_constant<caf::atom("actorOne")>;
using actorTwo = caf::atom_constant<caf::atom("actorTwo")>;
using actorDelay = caf::atom_constant<caf::atom("catorDelay")>;
//函数声明 msgHandleFuntion 
caf::behavior msgHandleFuntion(caf::event_based_actor *self);
  • 执行类 - 详细注释不做说明。
代码语言:javascript复制
class ActorRun{
    public:
        ActorRun();
        ~ActorRun();
        static ActorRun* instance();
        void startActorOne(std::string what);
        void startActorTwo(std::string what,int value);
         /**
         * @brief startActorDelay  --延时发送处理函数
         * @param type      --延时类型 0:毫秒 1:秒 
         * @param step      --延时步长
         * @param what      --消息内容 -- string类型值
         * @param value     --消息内容 -- double类型值
         */
        void startActorDelay(bool type,int step,std::string what,double value);
    private:
    // 内部计数值。循环次数
    int _count;
};
msgHandle.cpp
  • 消息处理函数
代码语言:javascript复制
caf::behavior msgHandleFuntion(caf::event_based_actor *self)
{
    return{
        [=](actorOne it,std::string &what)
        {
            std::cout<<"actorOne: "<<what<<std::endl;
        },
        [=](actorTwo it,std::string &what,int &value)
        {
            std::cout<<"actorTwo: "<<what<<"value:"<<value<<std::endl;
        },
        [=](actorDelay it,std::string &what,int &count,double &value)
        {
            std::cout<<"actorDelay: "<<"count:"<<count<<" text:"<< what<<" value:"<<value<<std::endl;
            ActorRun::instance()->startActorDelay(true,5,"output: ",1.11*(count 1));
        }
    };
}

atom 对应消息发送函数

代码语言:javascript复制
void ActorRun::startActorOne(std::string what)
{
    caf::anon_send(*msgHandelActor,actorOne::value,what);
}

void ActorRun::startActorTwo(std::string what,int value)
{
    caf::anon_send(*msgHandelActor,actorTwo::value,what,value);
}

void ActorRun::startActorDelay(bool type,int step,std::string what,double value)
{
    if(_count == 10) return;
    if(!type) 
        caf::delayed_anon_send(*msgHandelActor,std::chrono::milliseconds(step),actorDelay::value,what,_count,value);
    else
        caf::delayed_anon_send(*msgHandelActor,std::chrono::seconds(step),actorDelay::value,what,_count,value);
    _count   ;
}
  • 编译命令
代码语言:javascript复制
g   -g main.cpp msgHandle.cpp -lcaf_core -o main
  • 运行结果
代码语言:javascript复制
main.cpp
actorOne: output:startOne
actorTwo: output:startTwo value:123
actorDelay: count:0 text:output:  value:0
actorDelay: count:1 text:output:  value:1.11
actorDelay: count:2 text:output:  value:2.22
actorDelay: count:3 text:output:  value:3.33
actorDelay: count:4 text:output:  value:4.44
actorDelay: count:5 text:output:  value:5.55
actorDelay: count:6 text:output:  value:6.66
actorDelay: count:7 text:output:  value:7.77
actorDelay: count:8 text:output:  value:8.88
actorDelay: count:9 text:output:  value:9.99

blocking_actor 类型说明

释义

Blocking actors always run in a separate thread and are not scheduled by CAF. Unlike event-based actors, blocking actors have explicit, blocking receive functions. Further, blocking actors do not handle system messages automatically via special-purpose callbacks (see Default and System Message Handlers). This gives users full control over the behavior of blocking actors. However, blocking actors still should follow conventions of the actor system. For example, actors should unconditionally terminate after receiving an exit_msg with reason exit_reason::kill。 具体翻译不做说明,大概功能,类似与线程中,阻塞锁,在生命周期内,一直处于活动状态,直到满足特定条件,退出。

函数说明
  • 接收函数
代码语言:javascript复制
void blockingCalculatorFuntion(caf::blocking_actor *self)
{
    bool running = true;
    self->receive_while(running)
    (
        [](actorOne it,std::string &what)
        {
            std::cout<<"Block: -- "<<"actorOne: "<<what<<std::endl;
        },
        [](actorTwo it,std::string &what,int &value)
        {
            std::cout<<"Block: -- "<<"actorTwo: "<<what<<"value:"<<value<<std::endl;
        },
        //退出条件,直到接收到 actorLoop 原子消息,退出
        [&](actorLoop it,std::string &what)
        {
            std::cout<<"Block: -- "<<"Exit!"<<std::endl;
            running =false;
        }
    );
}
  • 消息指针定义,原子消息注释定义
代码语言:javascript复制
extern boost::shared_ptr<caf::actor> msgHandelLoopActor;
using actorLoop = caf::atom_constant<caf::atom("actorLoop")>;

消息发送函数

代码语言:javascript复制
void ActorRun::startActorLoop_One()
{
    caf::anon_send(*msgHandelLoopActor,actorOne::value,"Running");
}
//退出函数
void ActorRun::startActorLoop()
{
    caf::anon_send(*msgHandelLoopActor,actorLoop::value,"Exit");
}
  • 消息指针 spwan
代码语言:javascript复制
msgHandelLoopActor = boost::shared_ptr<caf::actor>(new caf::actor(self->spawn<caf::detached>(blockingCalculatorFuntion)));
  • 消息发送-- main函数内部
代码语言:javascript复制
    ActorRun::instance()->startActorLoop_One();
    ActorRun::instance()->startActorLoop();
    ActorRun::instance()->startActorLoop_One();
  • 执行结果如下:
代码语言:javascript复制
main.cpp
Block: -- actorOne: Running
Block: -- Exit!

blocking_actor ,在退出之后,结束生命周期,处于disable状态,再次发送消息,不会响应。在生产环境中使用,看个人理解了。

其他loop 类型

主要有三种循环接收,receive_while, receive_for and do_receive。很直观,while,for,do-while 官方样例如下: while

代码语言:javascript复制
 size_t received = 0;
 receive_while([&] { return received < 10; }) (
   [&](int) {
       received;
   }
 );

for

代码语言:javascript复制
 std::vector<int> results;
 size_t i = 0;
 receive_for(i, 10) (
   [&](int value) {
     results.push_back(value);
   }
 );

do-while

代码语言:javascript复制
 size_t received = 0;
 do_receive (
   [&](int) {
       received;
   }
 ).until([&] { return received >= 10; });

使用哪种循环方式,根据个人需求来使用了。

其他说明

一些特性说明: https://blog.csdn.net/xzwdev/article/details/41700001 还有一些其他功能.日志输出I/O功能。 一些相关特性:同步发送异步发送消息跳过没有做相关说明,可以参考官方文档。

0 人点赞