C actor framework简单使用
- 简介
- 下载/编译/安装
- 下载
- 编译/安装
- 简单使用
- 单次调用,没有使用线程池
- 复杂使用
- 思路
- main.cpp 内容如下
- msgHandle.h
- msgHandle.cpp
- blocking_actor 类型说明
- 释义
- 函数说明
- 其他loop 类型
- 其他说明
简介
CAF 是 C Actor 模型框架,借鉴了 erlang 和 akka 的 actor 思想。有强 C 11 特性。 特点是:轻量级,分布式,简单,可适应以及无锁。 官方文档:https://actor-framework.readthedocs.io/en/latest/index.html Github地址: https://github.com/actor-framework/actor-framework wike地址: https://github.com/actor-framework/actor-framework/wiki
下载/编译/安装
下载
Linux – Git 下载方式
CAF GitHub 地址 : https://github.com/actor-framework/actor-framework
Git 下载 ,下载源码 master :
备注:还需要安装 boost 库
git clone https://github.com/actor-framework/actor-framework.git
下载所有的 模块和 库。
代码语言:javascript复制git clone --recursive https://github.com/actor-framework/actor-framework.git
编译/安装
在linux 系统下载好后。在当前目录下,能看到 actor-framework 文件夹。 执行如下命令:
代码语言:javascript复制cd actor-framework #进入caf 文件夹
./configure #执行configure 脚本,配置对应属性,按照默认属性配置
make #编译
make install #安装命令
make uninstall #卸载命令,如需卸载,执行此命令就可以。
简单使用
单次调用,没有使用线程池
代码语言:javascript复制#include <iostream>
#include <string>
#include <caf/all.hpp>
#include <caf/io/all.hpp>
caf::behavior msgHandle(caf::event_based_actor *self)
{
return{
[=](std::string &what)
{
std::cout<<"string type msg is :"<<what<<std::endl;
},
[=](int &value)
{
std::cout<<"int type msg is:"<<value<<std::endl;
}
};
}
int main()
{
std::cout<<"main.cpp"<<std::endl;
caf::actor_system_config cfg;
caf::actor_system system(cfg);
auto a1 = system.spawn(msgHandle);
caf::anon_send(a1,"String 类型消息");
caf::anon_send(a1,123456789);
return 0;
}
编译:
代码语言:javascript复制g -g main.cpp -lcaf_core -o main
输出如下:
代码语言:javascript复制main.cpp
string type msg is :String 类型消息
int type msg is:123456789
复杂使用
主要针对 : event_based_actor 类型说明
思路
启用线程操作,根据项目的实际需求使用,定义不同的消息注释,用来接收处理。 增加了如下文件:msgHandle.cpp msgHandle.h
main.cpp 内容如下
代码语言:javascript复制#include <iostream>
#include <string>
#include <caf/all.hpp>
#include <caf/io/all.hpp>
#include <boost/shared_ptr.hpp>
#include "msgHandle.h"
int main()
{
std::cout<<"main.cpp"<<std::endl;
//spawn some actors
caf::actor_system_config cfg;
caf::actor_system system(cfg);
//an actor executed in the current thread
caf::scoped_actor self(system);
msgHandelActor = boost::shared_ptr<caf::actor>(new caf::actor(self->spawn<caf::detached>(msgHandleFuntion)));
ActorRun::instance()->startActorOne("output:startOne");
ActorRun::instance()->startActorTwo("output:startTwo ",123);
ActorRun::instance()->startActorDelay(false,5,"output: ",0);
self->await_all_other_actors_done();
system.await_all_actors_done();
return 0;
}
msgHandle.h
· Atoms / 消息体智能指针 /behavior
代码语言:javascript复制//定义actor 智能指针 msgHandleActor
extern boost::shared_ptr<caf::actor> msgHandelActor;
//定义原子-消息注释 actorOne,actorTwo,actorDelay
// 注意消息内容长度小于11
using actorOne = caf::atom_constant<caf::atom("actorOne")>;
using actorTwo = caf::atom_constant<caf::atom("actorTwo")>;
using actorDelay = caf::atom_constant<caf::atom("catorDelay")>;
//函数声明 msgHandleFuntion
caf::behavior msgHandleFuntion(caf::event_based_actor *self);
- 执行类 - 详细注释不做说明。
class ActorRun{
public:
ActorRun();
~ActorRun();
static ActorRun* instance();
void startActorOne(std::string what);
void startActorTwo(std::string what,int value);
/**
* @brief startActorDelay --延时发送处理函数
* @param type --延时类型 0:毫秒 1:秒
* @param step --延时步长
* @param what --消息内容 -- string类型值
* @param value --消息内容 -- double类型值
*/
void startActorDelay(bool type,int step,std::string what,double value);
private:
// 内部计数值。循环次数
int _count;
};
msgHandle.cpp
- 消息处理函数
caf::behavior msgHandleFuntion(caf::event_based_actor *self)
{
return{
[=](actorOne it,std::string &what)
{
std::cout<<"actorOne: "<<what<<std::endl;
},
[=](actorTwo it,std::string &what,int &value)
{
std::cout<<"actorTwo: "<<what<<"value:"<<value<<std::endl;
},
[=](actorDelay it,std::string &what,int &count,double &value)
{
std::cout<<"actorDelay: "<<"count:"<<count<<" text:"<< what<<" value:"<<value<<std::endl;
ActorRun::instance()->startActorDelay(true,5,"output: ",1.11*(count 1));
}
};
}
atom 对应消息发送函数
代码语言:javascript复制void ActorRun::startActorOne(std::string what)
{
caf::anon_send(*msgHandelActor,actorOne::value,what);
}
void ActorRun::startActorTwo(std::string what,int value)
{
caf::anon_send(*msgHandelActor,actorTwo::value,what,value);
}
void ActorRun::startActorDelay(bool type,int step,std::string what,double value)
{
if(_count == 10) return;
if(!type)
caf::delayed_anon_send(*msgHandelActor,std::chrono::milliseconds(step),actorDelay::value,what,_count,value);
else
caf::delayed_anon_send(*msgHandelActor,std::chrono::seconds(step),actorDelay::value,what,_count,value);
_count ;
}
- 编译命令
g -g main.cpp msgHandle.cpp -lcaf_core -o main
- 运行结果
main.cpp
actorOne: output:startOne
actorTwo: output:startTwo value:123
actorDelay: count:0 text:output: value:0
actorDelay: count:1 text:output: value:1.11
actorDelay: count:2 text:output: value:2.22
actorDelay: count:3 text:output: value:3.33
actorDelay: count:4 text:output: value:4.44
actorDelay: count:5 text:output: value:5.55
actorDelay: count:6 text:output: value:6.66
actorDelay: count:7 text:output: value:7.77
actorDelay: count:8 text:output: value:8.88
actorDelay: count:9 text:output: value:9.99
blocking_actor 类型说明
释义
Blocking actors always run in a separate thread and are not scheduled by CAF. Unlike event-based actors, blocking actors have explicit, blocking receive functions. Further, blocking actors do not handle system messages automatically via special-purpose callbacks (see Default and System Message Handlers). This gives users full control over the behavior of blocking actors. However, blocking actors still should follow conventions of the actor system. For example, actors should unconditionally terminate after receiving an exit_msg with reason exit_reason::kill
。
具体翻译不做说明,大概功能,类似与线程中,阻塞锁,在生命周期内,一直处于活动状态,直到满足特定条件,退出。
函数说明
- 接收函数
void blockingCalculatorFuntion(caf::blocking_actor *self)
{
bool running = true;
self->receive_while(running)
(
[](actorOne it,std::string &what)
{
std::cout<<"Block: -- "<<"actorOne: "<<what<<std::endl;
},
[](actorTwo it,std::string &what,int &value)
{
std::cout<<"Block: -- "<<"actorTwo: "<<what<<"value:"<<value<<std::endl;
},
//退出条件,直到接收到 actorLoop 原子消息,退出
[&](actorLoop it,std::string &what)
{
std::cout<<"Block: -- "<<"Exit!"<<std::endl;
running =false;
}
);
}
- 消息指针定义,原子消息注释定义
extern boost::shared_ptr<caf::actor> msgHandelLoopActor;
using actorLoop = caf::atom_constant<caf::atom("actorLoop")>;
消息发送函数
代码语言:javascript复制void ActorRun::startActorLoop_One()
{
caf::anon_send(*msgHandelLoopActor,actorOne::value,"Running");
}
//退出函数
void ActorRun::startActorLoop()
{
caf::anon_send(*msgHandelLoopActor,actorLoop::value,"Exit");
}
- 消息指针 spwan
msgHandelLoopActor = boost::shared_ptr<caf::actor>(new caf::actor(self->spawn<caf::detached>(blockingCalculatorFuntion)));
- 消息发送-- main函数内部
ActorRun::instance()->startActorLoop_One();
ActorRun::instance()->startActorLoop();
ActorRun::instance()->startActorLoop_One();
- 执行结果如下:
main.cpp
Block: -- actorOne: Running
Block: -- Exit!
blocking_actor ,在退出之后,结束生命周期,处于disable状态,再次发送消息,不会响应。在生产环境中使用,看个人理解了。
其他loop 类型
主要有三种循环接收,receive_while,
receive_for
and do_receive
。很直观,while,for,do-while
官方样例如下:
while
size_t received = 0;
receive_while([&] { return received < 10; }) (
[&](int) {
received;
}
);
for
代码语言:javascript复制 std::vector<int> results;
size_t i = 0;
receive_for(i, 10) (
[&](int value) {
results.push_back(value);
}
);
do-while
代码语言:javascript复制 size_t received = 0;
do_receive (
[&](int) {
received;
}
).until([&] { return received >= 10; });
使用哪种循环方式,根据个人需求来使用了。
其他说明
一些特性说明: https://blog.csdn.net/xzwdev/article/details/41700001 还有一些其他功能.日志输出,I/O功能。 一些相关特性:同步发送,异步发送,消息跳过没有做相关说明,可以参考官方文档。