OpenSocket
OpenSocket是一个全网最容易实现跨平台的高性能网络并发库。
Linux和安卓用epoll,Win32用IOCP,iOS和Mac用kqueue,其他系统使用select。
结合OpenThread使用,可以轻轻在任意平台(包括移动平台)构建高性能并发服务器。
OpenThread可实现三大多线程设计模式。
https://github.com/OpenMiniServer
跨平台支持
Linux和安卓使用epoll,Windows使用IOCP(wepoll),iOS和Mac使用kqueue,其他系统使用select。
编译和执行
请安装cmake工具,用cmake可以构建出VS或者XCode工程,就可以在vs或者xcode上编译运行。
源代码:https://github.com/OpenMiniServer/opensocket
代码语言:txt复制#克隆项目
git clone https://github.com/OpenMiniServer/opensocket
cd ./opensocket
#创建build工程目录
mkdir build
cd build
cmake ..
#如果是win32,在该目录出现opensocket.sln,点击它就可以启动vs写代码调试
make
./helloworld
./httpclient #高并发Http客户端
./httpserver #高并发Http服务器
#浏览器访问:http://127.0.0.1:8888
./server #高并发服务器框架
全部源文件
- src/socket_os.h
- src/socket_os.c
- src/opensocket.h
- src/opensocket.cpp
- src/wepoll.h(only win32)
- src/wepoll.c(only win32)
技术特点
OpenSocket的技术特点:
- 跨平台设计,提供Linux统一的socket接口。
- Linux和安卓使用epoll,Windows使用IOCP(wepoll),iOS和Mac使用kqueue,其他系统使用select。
- 支持IPv6,小巧迷你,配合OpenThread的多线程三大设计模式,轻轻实现高性能网络。
1.HelloWorld
使用OpenThread创建3条线程:listen,accept和client。
- 线程listen专门负责监听,把监听到新客户端连接,发送给accept线程。
- 线程accept可以有多个,收到listen线程发送过来的新socket事件,打开socket连接,与客户端通信。
- 线程client表示客户端,client的socket向 listen请求连接,listen把请求连接发给其中一个accept线程,accept线程接到连接后,与客户端通信。
#include <assert.h>
#include <time.h>
#include <math.h>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;
const std::string TestServerIp_ = "0.0.0.0";
const std::string TestClientIp_ = "127.0.0.1";
const int TestServerPort_ = 8888;
//全局的OpenSocket对象。
OpenSocket openSocket_;
//线程之间交换数据结构
struct ProtoBuffer
{
bool isSocket_;
int acceptFd_;
std::string addr_;
std::shared_ptr<OpenSocketMsg> data_;
ProtoBuffer() :
isSocket_(0), acceptFd_(0) {}
};
//OpenSocket线程,不允许有业务处理,消息需要立刻派发到其他线程
static void SocketFunc(const OpenSocketMsg* msg)
{
if (!msg) return;
if (msg->uid_ < 0)
{
delete msg; return;
}
auto proto = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
proto->isSocket_ = true;
proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
//把proto发到 msg->uid_指定的线程。
bool ret = OpenThread::Send((int)msg->uid_, proto);
if (!ret)
{
printf("SocketFunc dispatch faild pid = %lldn", msg->uid_);
}
}
// 监听网络连接的线程
static int listen_fd_ = 0;
void ListenThread(OpenThreadMsg& msg)
{
//线程id
int pid = msg.pid();
//线程名称
auto& pname = msg.name();
assert(pname == "listen");
//线程启动消息
if (msg.state_ == OpenThread::START)
{
//等待accept线程启动完,才继续执行
while (OpenThread::ThreadId("accept") < 0) OpenThread::Sleep(100);
//监听IP和端口,内部会执行bind操作。
listen_fd_ = openSocket_.listen((uintptr_t)pid, TestServerIp_, TestServerPort_, 64);
if (listen_fd_ < 0)
{
printf("Listen::START faild listen_fd_ = %dn", listen_fd_);
assert(false);
}
//把listen_fd加入到poll监听事件,并绑定线程pid,该socket任何事件都会发到该线程(listen线程)。
openSocket_.start((uintptr_t)pid, listen_fd_);
}
//其他线程发过来的消息
else if (msg.state_ == OpenThread::RUN)
{
const ProtoBuffer* proto = msg.data<ProtoBuffer>();
if (!proto || !proto->isSocket_ || !proto->data_) return;
//openSocket_.start 开启poll监听socket事件,并指定线程id。故此处会收到新客户端socket连接的消息
auto& socketMsg = proto->data_;
switch (socketMsg->type_)
{
//监听客户端连接事件,把事件发给accept线程。
case OpenSocket::ESocketAccept:
{
printf("Listen::RUN [%s]ESocketAccept: new client. acceptFd:%d, client:%sn", pname.c_str(), socketMsg->ud_, socketMsg->info());
auto proto = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
proto->addr_ = socketMsg->info();
proto->isSocket_ = false;
proto->acceptFd_ = socketMsg->ud_;
//把客户端的fd和ip发给accept线程。
bool ret = OpenThread::Send("accept", proto);
assert(ret);
}
break;
case OpenSocket::ESocketClose:
printf("Listen::RUN [%s]ESocketClose:linten close, listenFd:%dn", pname.c_str(), socketMsg->fd_);
break;
case OpenSocket::ESocketError:
printf("Listen::RUN [%s]ESocketError:%sn", pname.c_str(), socketMsg->info());
break;
case OpenSocket::ESocketWarning:
printf("Listen::RUN [%s]ESocketWarning:%sn", pname.c_str(), socketMsg->info());
break;
case OpenSocket::ESocketOpen:
printf("Listen::RUN [%s]ESocketOpen:linten open, listenFd:%dn", pname.c_str(), socketMsg->fd_);
break;
//因为listen线程只有listen,故不会收到socket数据。
case OpenSocket::ESocketData:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
//线程退出消息
else if (msg.state_ == OpenThread::STOP)
{
//线程退出的时候,关闭listen监听socket
openSocket_.close(pid, listen_fd_);
}
}
// Accept线程,接收到新客户端连接,开启socket通信。
void AcceptThread(OpenThreadMsg& msg)
{
int pid = msg.pid();
auto& pname = msg.name();
assert(pname == "accept");
if (msg.state_ == OpenThread::START)
{
}
else if (msg.state_ == OpenThread::RUN)
{
const ProtoBuffer* proto = msg.data<ProtoBuffer>();
if (!proto) return;
//会收到两种消息,一种是listen线程发过来的消息,另一种是OpenSocket发过来的消息
if (!proto->isSocket_)
{
//listen线程发过来的消息
printf("Accept::RUN [%s]open accept client:%sn", pname.c_str(), proto->addr_.c_str());
//拿到客户端的fd,加入到poll监听,并绑定pid指定的线程ID(即Accept线程)。
//poll监听到任何事件都发到这个线程。
openSocket_.start(pid, proto->acceptFd_);
}
else
{
//OpenSocket发过来的消息
if (!proto->data_) return;
auto& socketMsg = proto->data_;
switch (socketMsg->type_)
{
//客户端发过来的socket数据流
case OpenSocket::ESocketData:
{
//recevie from client
{
//本次接收到socket的数据大小
auto size = socketMsg->size();
//本次接收到socket的数据
auto data = socketMsg->data();
assert(size >= 4);
int len = *(int*)data;
std::string buffer;
buffer.append(data 4, len);
assert(buffer == "Waiting for you!");
}
//接到客户端socket消息后,立刻给客户端发消息。
//response to client
{
char buffer[256] = { 0 };
std::string tmp = "Of Course,I Still Love You!";
*(int*)buffer = (int)tmp.size();
memcpy(buffer 4, tmp.data(), tmp.size());
//给客户端发消息,需要指定该客户端的fd
openSocket_.send(socketMsg->fd_, buffer, (int)(4 tmp.size()));
}
}
break;
case OpenSocket::ESocketOpen:
//拿到客户端的fd,加入到poll监听,成功就会收到该消息。
printf("Accept::RUN [%s]ESocketClose:accept client open, acceptFd:%dn", pname.c_str(), socketMsg->fd_);
break;
case OpenSocket::ESocketClose:
printf("Accept::RUN [%s]ESocketClose:accept client close, acceptFd:%dn", pname.c_str(), socketMsg->fd_);
break;
case OpenSocket::ESocketError:
printf("Accept::RUN [%s]ESocketError:accept client %sn", pname.c_str(), socketMsg->info());
break;
case OpenSocket::ESocketWarning:
printf("Accept::RUN [%s]ESocketWarning:%sn", pname.c_str(), socketMsg->info());
break;
//因为accept线程没有绑定listen的socket,故不会收到该消息。
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
}
}
//客户端线程,模拟一个客户端。可以拆开,用另一个进程实现,模拟客户端访问服务器。
static int client_fd_ = 0;
void ClientThread(OpenThreadMsg& msg)
{
//客户端线程id
int pid = msg.pid();
//客户端线程名称
auto& pname = msg.name();
assert(pname == "client");
if (msg.state_ == OpenThread::START)
{
//因为在同一个进程,所以,需要等listen等线程启动完毕,才能连接。
while (OpenThread::ThreadId("accept") < 0) OpenThread::Sleep(100);
//连接服务器ip和端口,并把fd与客户端线程绑定,fd的任何信息都会发到客户端线程。
client_fd_ = openSocket_.connect(pid, TestClientIp_, TestServerPort_);
}
else if (msg.state_ == OpenThread::RUN)
{
//作为客户端线程,只有OpenSocket消息。
const ProtoBuffer* proto = msg.data<ProtoBuffer>();
if (!proto || !proto->isSocket_ || !proto->data_) return;
auto& socketMsg = proto->data_;
switch (socketMsg->type_)
{
case OpenSocket::ESocketData:
{
//接收到服务器的socket数据
auto size = socketMsg->size();
auto data = socketMsg->data();
assert(size >= 4);
int len = *(int*)data;
std::string buffer;
buffer.append(data 4, len);
assert(buffer == "Of Course,I Still Love You!");
openSocket_.close(pid, socketMsg->fd_);
OpenThread::StopAll();
}
break;
case OpenSocket::ESocketOpen:
{
//如果与服务器连接成果,就会收到此消息
assert(client_fd_ == socketMsg->fd_);
printf("Client::RUN [%s]ESocketClose:Client client open, clientFd:%dn", pname.c_str(), socketMsg->fd_);
char buffer[256] = {0};
std::string tmp = "Waiting for you!";
*(int*)buffer = (int)tmp.size();
memcpy(buffer 4, tmp.data(), tmp.size());
//与服务器连接成功,就向服务器发消息。发消息只需要指定fd。
openSocket_.send(client_fd_, buffer, (int)(4 tmp.size()));
}
break;
case OpenSocket::ESocketClose:
printf("Client::RUN [%s]ESocketClose:Client client close, clientFd:%dn", pname.c_str(), socketMsg->fd_);
break;
case OpenSocket::ESocketError:
printf("Client::RUN [%s]ESocketError:Client client %sn", pname.c_str(), socketMsg->info());
break;
case OpenSocket::ESocketWarning:
printf("Client::RUN [%s]ESocketWarning:Client %sn", pname.c_str(), socketMsg->info());
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
}
int main()
{
// 创建三个线程。listen线程、accept线程和client线程。
OpenThread::Create("listen", ListenThread);
OpenThread::Create("accept", AcceptThread);
OpenThread::Create("client", ClientThread);
// 启动OpenSocket。一个进程只需要一个OpenSocket。
openSocket_.run(SocketFunc);
//等待全部子线程结束,否则一直阻塞。
OpenThread::ThreadJoinAll();
printf("Pausen");
return getchar();
}
2.HttpClient
为了开发方面,我们使用OpenThread作为线程库。OpenThread可以实现多线程三大设计模式,开发这个HttpClient,使用Worker模式。
设计思路如下:
- 每个HttpClient是一条线程OpenThreader。一个HttpClient对象,可以处理任意个Http请求。它由Factory类管理和创建。
- 一次Http请求就是一个task,从Factory挑选一个HttpClient对象,向该对象发送task。并进行阻塞等待结果。
- HttpClient对象收到task消息。把IP和端口做参数,调用OpenSocket的connect。 connect有两个作用,一个是执行网络连接产生一个fd,同时把这个fd加入到poll,fd与HttpClient对象的线程id进行绑定。 fd加入到poll成功以后,该socket的任何消息可以通过线程Id,发到对应的HttpClient。
- HttpClient对象接收到socket的open消息后,向服务器发送http报文。
- 因为connect把fd和线程id进行绑定。所以,HttpClient会收到服务器返回的Http报文。
HttpClient接收完Http报文,就唤醒请求线程。请求线程被唤醒,拿到Http请求数据。#include <assert.h>
#include <time.h>
#include <math.h>
#include <map>
#include <string.h>
#include "open/openthread.h"
#include "opensocket.h"
using namespace open;
////////////HttpRequest//////////////////////
//请求http对象,包含返回对象。
class HttpRequest
{
std::string url_;
public:
std::map<std::string, std::string> headers_;
int port_;
std::string host_;
std::string ip_;
std::string path_;
std::string method_;
std::string body_;
HttpRequest() :port_(80) {}
std::string& operator[](const std::string& key) { return headers_[key]; }
//指定url,并进行解析和域名解析
void setUrl(const std::string& url);
inline void operator=(const std::string& url) { setUrl(url); }
//http返回对象
struct HttpResponse
{
int code_;
int clen_;
std::string head_;
std::string body_;
//std::multimap<std::string, std::string> headers_;
std::map<std::string, std::string> headers_;
std::string& operator[](const std::string& key) { return headers_[key]; }
HttpResponse():code_(0), clen_(0) {}
//解析返回http消息头
void parseHeader();
bool pushData(const char* data, size_t size);
};
HttpResponse response_;
//阻塞当前线程,等待http消息返回,才继续执行。
OpenSync openSync_;
};
////////////Proto//////////////////////
struct SocketProto : public OpenThreadProto
{
std::shared_ptr<OpenSocketMsg> data_;
static inline int ProtoType() { return 1; }
virtual inline int protoType() const { return SocketProto::ProtoType(); }
};
struct TaskProto : public OpenThreadProto
{
int fd_;
OpenSync openSync_;
std::shared_ptr<HttpRequest> request_;
static inline int ProtoType() { return 2; }
virtual inline int protoType() const { return TaskProto::ProtoType(); }
TaskProto() :fd_(0) {}
};
////////////App//////////////////////
//应用程序单利,封装OpenSocket,一个进程只有一个对象。
class App
{
//Msg需要手动释放,放到智能指针,由智能指针释放
static void SocketFunc(const OpenSocketMsg* msg)
{
if (!msg) return;
if (msg->uid_ >= 0)
{
auto proto = std::shared_ptr<SocketProto>(new SocketProto);
proto->srcPid_ = -1;
proto->srcName_ = "OpenSocket";
proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
if (!OpenThread::Send((int)msg->uid_, proto))
printf("SocketFunc dispatch faild pid = %lldn", msg->uid_);
}
else delete msg;
}
public:
static App Instance_;
//OpenSocket对象,可以设计成单利
OpenSocket openSocket_;
//App构造的时候,启动OpenSocket。
App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;
////////////HttpClient//////////////////////
//HttpClient线程类,Factory管理一组线程。
class HttpClient : public OpenThreadWorker
{
//Factory
class Factory
{
const std::vector<HttpClient*> vectWorker_;
public:
Factory()
:vectWorker_({
new HttpClient("HttpClient1"),
new HttpClient("HttpClient2"),
new HttpClient("HttpClient3"),
new HttpClient("HttpClient4"),
}) {}
HttpClient* getWorker()
{
if (vectWorker_.empty()) return 0;
return vectWorker_[std::rand() % vectWorker_.size()];
}
};
static Factory Instance_;
// HttpClient
// name是线程名,必须制定。在Linux上,top -Hp可以看到这个线程名。
HttpClient(const std::string& name)
:OpenThreadWorker(name)
{
registers(SocketProto::ProtoType(), (OpenThreadHandle)&HttpClient::onSocketProto);
registers(TaskProto::ProtoType(), (OpenThreadHandle)&HttpClient::onTaskProto);
start();
}
~HttpClient()
{
//销毁之前,尽可能唤醒请求线程,防止请求线程阻塞为被唤醒
for (auto iter = mapFdToTask_.begin(); iter != mapFdToTask_.end(); iter )
iter->second.openSync_.wakeup();
}
private:
void onTaskProto(TaskProto& proto)
{
auto& request = proto.request_;
//连接Http服务器,并把fd与当前线程绑定。该socket的全部消息,都发到此线程
proto.fd_ = App::Instance_.openSocket_.connect(pid(), request->ip_, request->port_);
request->response_.code_ = -1;
request->response_.head_.clear();
request->response_.body_.clear();
mapFdToTask_[proto.fd_] = proto;
}
//与http服务器连接成功以后,发送http请求报文
void onSendHttp(const std::shared_ptr<OpenSocketMsg>& data)
{
//需要判断fd绑定的task是否存在,否则关闭与Http服务器的连接
auto iter = mapFdToTask_.find(data->fd_);
if (iter == mapFdToTask_.end())
{
App::Instance_.openSocket_.close(pid(), data->fd_);
return;
}
auto& task = iter->second;
auto& request = task.request_;
std::string buffer = request->method_ " " request->path_ " HTTP/1.1 rn";
auto iter1 = request->headers_.begin();
for (; iter1 != request->headers_.end(); iter1 )
{
buffer.append(iter1->first ": " iter1->second "rn");
}
if (!request->body_.empty())
{
buffer.append("Content-Length:" std::to_string(request->body_.size()) "rnrn");
buffer.append(request->body_);
buffer.append("rn");
}
else
{
buffer.append("rn");
}
//制作好Http请求报文,发送给服务器。
App::Instance_.openSocket_.send(task.fd_, buffer.data(), (int)buffer.size());
}
//处理Http服务器发送过了socket数据流,拼成完整的Http返回报文
void onReadHttp(const std::shared_ptr<OpenSocketMsg>& data)
{
//Http任务列表没有绑定fd的任务,就对该fd关闭。
auto iter = mapFdToTask_.find(data->fd_);
if (iter == mapFdToTask_.end())
{
App::Instance_.openSocket_.close(pid(), data->fd_);
return;
}
auto& task = iter->second;
auto& response = task.request_->response_;
if (response.pushData(data->data(), data->size()))
{
App::Instance_.openSocket_.close(pid(), data->fd_);
}
}
//与Http服务器关闭的消息,唤醒请求线程,并对fd绑定的任务,移出任务列表
void onCloseHttp(const std::shared_ptr<OpenSocketMsg>& data)
{
auto iter = mapFdToTask_.find(data->fd_);
if (iter != mapFdToTask_.end())
{
iter->second.openSync_.wakeup();
mapFdToTask_.erase(iter);
}
}
//接收绑定此线程的socket消息。
void onSocketProto(const SocketProto& proto)
{
const auto& msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketData:
onReadHttp(msg);
break;
case OpenSocket::ESocketClose:
onCloseHttp(msg);
break;
case OpenSocket::ESocketError:
printf("[%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
onCloseHttp(msg);
break;
case OpenSocket::ESocketWarning:
printf("[%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
onSendHttp(msg);
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
std::map<int, TaskProto> mapFdToTask_;
public:
static bool Http(std::shared_ptr<HttpRequest>& request)
{
if (request->ip_.empty())
{
assert(false);
return false;
}
request->response_.code_ = -1;
auto worker = Instance_.getWorker();
if (!worker) return false;
auto proto = std::shared_ptr<TaskProto>(new TaskProto);
proto->request_ = request;
bool ret = OpenThread::Send(worker->pid(), proto);
assert(ret);
proto->openSync_.await();
return ret;
}
};
HttpClient::Factory HttpClient::Instance_;
int main()
{
auto request = std::shared_ptr<HttpRequest>(new HttpRequest);
//请求交易所的最新龙虎数据
request->setUrl("http://reportdocs.static.szse.cn/files/text/jy/jy230308.txt");
request->method_ = "GET";
(*request)["Host"] = "reportdocs.static.szse.cn";
(*request)["Accept"] = "text/html,application/xhtml xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7";
(*request)["Accept-Encoding"] = "gzip,deflate";
(*request)["Accept-Language"] = "zh-CN,zh;q=0.9";
(*request)["Cache-Control"] = "max-age=0";
(*request)["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36";
(*request)["Upgrade-Insecure-Requests"] = "1";
HttpClient::Http(request);
auto& response = request->response_;
printf("code:%d, header:%sn", response.code_, response.head_.c_str());
return getchar();
}
void HttpRequest::setUrl(const std::string& url)
{
if (url.empty()) return;
url_ = url;
int len = (int)url.length();
char* ptr = (char*)url.c_str();
if (len >= 8)
{
if (memcmp(ptr, "http://", strlen("http://")) == 0)
ptr = strlen("http://");
else if (memcmp(ptr, "https://", strlen("https://")) == 0)
ptr = strlen("https://");
}
const char* tmp = strstr(ptr, "/");
path_.clear();
if (tmp != 0)
{
path_.append(tmp);
host_.clear();
host_.append(ptr, tmp - ptr);
}
else
{
host_ = ptr;
}
port_ = 80;
ip_.clear();
ptr = (char*)host_.c_str();
tmp = strstr(ptr, ":");
if (tmp != 0)
{
ip_.append(ptr, tmp - ptr);
tmp = 1;
port_ = atoi(tmp);
}
else
{
ip_ = ptr;
}
//域名解析,把域名转ip。可以缓存,提供效率
ip_ = OpenSocket::DomainNameToIp(ip_);
}
void HttpRequest::HttpResponse::parseHeader()
{
if (!headers_.empty() || head_.size() < 12) return;
std::string line;
const char* ptr = strstr(head_.c_str(), "rn");
if (!ptr) return;
code_ = 0;
clen_ = 0;
line.append(head_.c_str(), ptr - head_.c_str());
for (size_t i = 0; i < line.size(); i )
{
if (line[i] == ' ')
{
while (i < line.size() && line[i] == ' ') i;
code_ = std::atoi(line.data() i);
break;
}
}
if (code_ <= 0) return;
line.clear();
int k = -1;
int j = -1;
std::string key;
std::string value;
for (size_t i = ptr - head_.c_str() 2; i < head_.size() - 1; i )
{
if (head_[i] == 'r' && head_[i 1] == 'n')
{
if (j > 0)
{
k = 0;
while (k < line.size() && line[k] == ' ') k;
while (k >= 0 && line.back() == ' ') line.pop_back();
value = line.data() j 1;
while (j >= 0 && line[j] == ' ') j--;
key.clear();
key.append(line.data(), j);
for (size_t x = 0; x < key.size(); x )
key[x] = std::tolower(key[x]);
headers_[key] = value;
}
i;
j = -1;
line.clear();
continue;
}
line.push_back(head_[i]);
if (j < 0 && line.back() == ':')
{
j = (int)line.size() - 1;
}
}
clen_ = std::atoi(headers_["content-length"].c_str());
}
bool HttpRequest::HttpResponse::pushData(const char* data, size_t size)
{
if (code_ == -1)
{
head_.append(data, size);
const char* ptr = strstr(head_.data(), "rnrn");
if (!ptr) return false;
code_ = 0;
body_.append(ptr 4);
head_.resize(ptr - head_.data() 2);
parseHeader();
}
else
{
body_.append(data, size);
}
if (clen_ > 0)
{
if (clen_ >= body_.size())
{
body_.resize(clen_);
return true;
}
}
else if (body_.size() > 2)
{
if (body_[body_.size() - 2] == 'r' && body_.back() == 'n')
{
body_.pop_back();
body_.pop_back();
return true;
}
}
return false;
}
3.HttpServer
使用OpenThread的Actor模式设计高并发HttpServer。
创建5条线程,1条线程封装成监听者Listener,另外4条线程封装成接收者Accepter。
监听者Listener负责监听socket连接事件,监听到socket新连接事件后,就把fd发给其中一个接收者Accepter;
接收者Accepter接收到socket的fd后,打开该socket连接,与客户端进行网络通信。
此简单的Http接收到Http报文后,进行response一份Http报文,然后关闭socket完成Http短连接操作。
OpenSocket是对poll的封装,一个进程只需要创建一个OpenSocket对象。
代码语言:C 复制#include <assert.h>
#include <map>
#include <set>
#include <memory>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;
const std::string TestServerIp_ = "0.0.0.0";
const int TestServerPort_ = 8888;
//msgType == 1
struct SocketProto : public OpenThreadProto
{
std::shared_ptr<OpenSocketMsg> data_;
static inline int ProtoType() { return 1; }
virtual inline int protoType() const { return SocketProto::ProtoType(); }
};
//msgType == 2
struct RegisterProto : public OpenThreadProto
{
int srcPid_;
static inline int ProtoType() { return 2; }
virtual inline int protoType() const { return RegisterProto::ProtoType(); }
RegisterProto() :srcPid_(-1) {}
};
//msgType == 3
struct NewClientProto : public OpenThreadProto
{
int accept_fd_;
std::string addr_;
static inline int ProtoType() { return 3; }
virtual inline int protoType() const { return NewClientProto::ProtoType(); }
NewClientProto() : accept_fd_(-1) {}
};
////////////App//////////////////////
class App
{
static void SocketFunc(const OpenSocketMsg* msg)
{
if (!msg) return;
if (msg->uid_ >= 0)
{
auto proto = std::shared_ptr<SocketProto>(new SocketProto);
proto->srcPid_ = -1;
proto->srcName_ = "OpenSocket";
proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
if (!OpenThread::Send((int)msg->uid_, proto))
printf("SocketFunc dispatch faild pid = %lldn", msg->uid_);
}
else
{
delete msg;
}
}
public:
static App Instance_;
OpenSocket openSocket_;
App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;
////////////Listener//////////////////////
class Listener : public OpenThreadWorker
{
int listen_fd_;
unsigned int balance_;
std::set<int> setSlaveId_;
std::vector<int> vectSlaveId_;
public:
Listener(const std::string& name)
:OpenThreadWorker(name),
listen_fd_(-1)
{
balance_ = 0;
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Listener::onSocketProto);
registers(RegisterProto::ProtoType(), (OpenThreadHandle)&Listener::onRegisterProto);
}
virtual ~Listener() {}
virtual void onStart()
{
listen_fd_ = App::Instance_.openSocket_.listen((uintptr_t)pid(), TestServerIp_, TestServerPort_, 64);
if (listen_fd_ < 0)
{
printf("Listener::onStart faild listen_fd_ = %dn", listen_fd_);
assert(false);
}
App::Instance_.openSocket_.start((uintptr_t)pid(), listen_fd_);
printf("HTTP: %s:%dn", TestServerIp_.c_str(), TestServerPort_);
}
void onRegisterProto(const RegisterProto& proto)
{
if (proto.srcPid_ >= 0)
{
if (setSlaveId_.find(proto.srcPid_) == setSlaveId_.end())
{
setSlaveId_.insert(proto.srcPid_);
vectSlaveId_.push_back(proto.srcPid_);
printf("Hello OpenSocket HttpServer, srcPid = %dn", proto.srcPid_);
}
}
}
// new client socket dispatch to Accept
void notifyToSlave(int accept_fd, const std::string& addr)
{
if (!vectSlaveId_.empty())
{
auto proto = std::shared_ptr<NewClientProto>(new NewClientProto);
proto->accept_fd_ = accept_fd;
proto->addr_ = addr;
if (balance_ >= vectSlaveId_.size())
{
balance_ = 0;
}
int slaveId = vectSlaveId_[balance_ ];
if (OpenThread::Send(slaveId, proto))
{
return;
}
printf("Listener::notifyToSlave send faild pid = %dn", slaveId);
}
App::Instance_.openSocket_.close(pid_, accept_fd);
}
void onSocketProto(const SocketProto& proto)
{
const auto& msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketAccept:
// linsten new client socket
notifyToSlave(msg->ud_, msg->data());
printf("Listener::onSocket [%s]ESocketAccept:acceptFd = %dn", ThreadName((int)msg->uid_).c_str(), msg->ud_);
break;
case OpenSocket::ESocketClose:
break;
case OpenSocket::ESocketError:
printf("Listener::onSocket [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Listener::onSocket [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
break;
case OpenSocket::ESocketUdp:
case OpenSocket::ESocketData:
assert(false);
break;
default:
break;
}
}
};
////////////HttpRequest//////////////////////
struct HttpRequest
{
int fd_;
std::string addr_;
std::string method_;
std::string url_;
int code_;
int clen_;
std::string head_;
std::string body_;
std::map<std::string, std::string> headers_;
HttpRequest() :fd_(-1), code_(-1), clen_(-1) {}
//GET /xx/xx HTTP/x.x
bool parseHeader();
bool pushData(const char* data, size_t size);
};
////////////Accepter//////////////////////
class Accepter : public OpenThreadWorker
{
int listenId_;
std::map<int, HttpRequest> mapClient_;
public:
Accepter(const std::string& name)
:OpenThreadWorker(name),
listenId_(-1)
{
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Accepter::onSocketProto);
registers(NewClientProto::ProtoType(), (OpenThreadHandle)&Accepter::onNewClientProto);
}
virtual ~Accepter() {}
virtual void onStart()
{
while (listenId_ < 0)
{
listenId_ = ThreadId("listener");
OpenThread::Sleep(1000);
}
auto proto = std::shared_ptr<RegisterProto>(new RegisterProto);
proto->srcPid_ = pid();
if (OpenThread::Send(listenId_, proto))
return;
printf("Accepter::onStart send faild pid = %dn", listenId_);
}
void onNewClientProto(const NewClientProto& proto)
{
int accept_fd = proto.accept_fd_;
if (accept_fd >= 0)
{
auto iter = mapClient_.find(accept_fd);
if (iter != mapClient_.end())
{
assert(false);
mapClient_.erase(iter);
App::Instance_.openSocket_.close(pid(), accept_fd);
return;
}
auto& client = mapClient_[accept_fd];
client.fd_ = accept_fd;
client.addr_ = proto.addr_;
App::Instance_.openSocket_.start(pid_, accept_fd);
}
}
//GET /xx/xx HTTP/x.x
void onReadHttp(const std::shared_ptr<OpenSocketMsg> msg)
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
auto& request = iter->second;
if (!request.pushData(msg->data(), msg->size()))
{
//Header too large.close connet.
if (request.head_.size() > 1024)
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
printf("new client:url = %sn", request.url_.c_str());
std::string content;
content.append("<div>It's work!</div><br/>" request.addr_ "request:" request.url_);
std::string buffer = "HTTP/1.1 200 OKrncontent-length:" std::to_string(content.size()) "rnrn" content;
App::Instance_.openSocket_.send(msg->fd_, buffer.data(), (int)buffer.size());
}
virtual void onSocketProto(const SocketProto& proto)
{
const auto& msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketData:
onReadHttp(msg);
break;
case OpenSocket::ESocketClose:
mapClient_.erase(msg->fd_);
break;
case OpenSocket::ESocketError:
mapClient_.erase(msg->fd_);
printf("Accepter::onStart [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Accepter::onStart [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
}
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
};
int main()
{
printf("start server==>>n");
std::vector<OpenThreader*> vectServer = {
new Listener("listener"),
new Accepter("accepter1"),
new Accepter("accepter2"),
new Accepter("accepter3"),
new Accepter("accepter4")
};
for (size_t i = 0; i < vectServer.size(); i)
vectServer[i]->start();
printf("wait close==>>n");
OpenThread::ThreadJoinAll();
for (size_t i = 0; i < vectServer.size(); i)
delete vectServer[i];
vectServer.clear();
printf("Pausen");
return getchar();
}
bool HttpRequest::parseHeader()
{
if (!headers_.empty() || head_.size() < 12) return true;
std::string line;
const char* ptr = strstr(head_.c_str(), "rn");
if (!ptr) return false;
clen_ = -1;
line.append(head_.c_str(), ptr - head_.c_str());
int state = 0;
method_.clear();
url_.clear();
for (size_t k = 0; k < line.size(); k)
{
if (state == 0)
{
if (line[k] != ' ')
{
method_.push_back(line[k]);
continue;
}
state = 1;
while (k < line.size() && line[k] == ' ') k;
if (line[k] != ' ') --k;
}
else
{
if (line[k] != ' ')
{
url_.push_back(line[k]);
continue;
}
break;
}
}
line.clear();
int k = -1;
int j = -1;
std::string key;
std::string value;
for (size_t i = ptr - head_.c_str() 2; i < head_.size() - 1; i )
{
if (head_[i] == 'r' && head_[i 1] == 'n')
{
if (j > 0)
{
k = 0;
while (k < line.size() && line[k] == ' ') k;
while (k >= 0 && line.back() == ' ') line.pop_back();
value = line.data() j 1;
while (j >= 0 && line[j] == ' ') j--;
key.clear();
key.append(line.data(), j);
for (size_t x = 0; x < key.size(); x )
key[x] = std::tolower(key[x]);
headers_[key] = value;
}
i;
j = -1;
line.clear();
continue;
}
line.push_back(head_[i]);
if (j < 0 && line.back() == ':')
{
j = (int)line.size() - 1;
}
}
clen_ = std::atoi(headers_["content-length"].c_str());
return true;
}
bool HttpRequest::pushData(const char* data, size_t size)
{
if (code_ == -1)
{
head_.append(data, size);
const char* ptr = strstr(head_.data(), "rnrn");
if (!ptr) return false;
code_ = 0;
body_.append(ptr 4);
head_.resize(ptr - head_.data() 2);
if (!parseHeader()) return false;
}
else
{
body_.append(data, size);
}
if (clen_ >= 0)
{
if (clen_ == 0 && clen_ == body_.size())
{
return true;
}
if (clen_ >= body_.size())
{
body_.resize(clen_);
return true;
}
}
else if (body_.size() > 2)
{
if (body_[body_.size() - 2] == 'r' && body_.back() == 'n')
{
body_.pop_back();
body_.pop_back();
return true;
}
}
return false;
}
4.设计一个高并发服务器框架
Listener负责监听socket连接事件,发socket连接事件发给Accepter。
Accepter负责接收Listener发过来的socket连接事件,并与socket进行通信。
Client是客户端集群,使用它可以对服务器进行压力测试。
代码语言:C 复制#include <assert.h>
#include <map>
#include <set>
#include <memory>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;
const std::string TestServerIp_ = "0.0.0.0";
const std::string TestClientIp_ = "127.0.0.1";
const int TestServerPort_ = 8888;
//proto
struct SocketProto : public OpenThreadProto
{
std::shared_ptr<OpenSocketMsg> data_;
static inline int ProtoType() { return 1; }
virtual inline int protoType() const { return SocketProto::ProtoType(); }
};
class ProtoBuffer : public OpenThreadProto
{
std::shared_ptr<void> data_;
public:
int msgId_;
ProtoBuffer()
: OpenThreadProto()
, msgId_(0)
, data_(0) {}
virtual ~ProtoBuffer() {}
template <class T>
inline T& data()
{
T* t = 0;
if (data_)
{
t = dynamic_cast<T*>((T*)data_.get());
if (data_.get() == t) return *t;
data_.reset();
}
t = new T;
data_ = std::shared_ptr<T>(t);
return *t;
}
template <class T>
inline T& data() const
{
if (data_)
{
T* t = dynamic_cast<T*>((T*)data_.get());
if (data_.get() == t) return *t;
}
assert(false);
static T t;
return t;
}
static inline int ProtoType() { return (int)(uintptr_t) & (ProtoType); }
virtual inline int protoType() const { return ProtoBuffer::ProtoType(); }
};
////////////App//////////////////////
class App
{
static void SocketFunc(const OpenSocketMsg* msg)
{
if (!msg) return;
if (msg->uid_ >= 0)
{
auto proto = std::shared_ptr<SocketProto>(new SocketProto);
proto->srcPid_ = -1;
proto->srcName_ = "OpenSocket";
proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
if (!OpenThread::Send((int)msg->uid_, proto))
printf("SocketFunc dispatch faild pid = %lldn", msg->uid_);
return;
}
delete msg;
}
public:
static App Instance_;
OpenSocket openSocket_;
App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;
enum EMsgId
{
new_accept,
new_client,
test_client
};
////////////Listener//////////////////////
struct DataNewClient
{
int accept_fd_;
std::string addr_;
DataNewClient() :accept_fd_(-1) {}
};
class Listener : public OpenThreadWorker
{
std::set<int> setSlaveId_;
std::vector<int> vectSlaveId_;
int listen_fd_;
unsigned int balance_;
bool isOpening_;
public:
Listener(const std::string& name)
:OpenThreadWorker(name),
listen_fd_(-1)
{
isOpening_ = false;
balance_ = 0;
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Listener::onSocketProto);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Listener::onProtoBuffer);
}
virtual ~Listener() {}
virtual void onStart()
{
listen_fd_ = App::Instance_.openSocket_.listen((uintptr_t)pid(), TestServerIp_, TestServerPort_, 64);
if (listen_fd_ < 0)
{
printf("Listener::onStart faild listen_fd_ = %dn", listen_fd_);
assert(false);
}
App::Instance_.openSocket_.start((uintptr_t)pid(), listen_fd_);
}
private:
void onProtoBuffer(const ProtoBuffer& proto)
{
if (proto.msgId_ == new_accept)
{
auto msg = proto.data<std::string>();
assert(msg == "listen success!");
if (proto.srcPid() >= 0)
{
if (setSlaveId_.find(proto.srcPid()) == setSlaveId_.end())
{
setSlaveId_.insert(proto.srcPid());
vectSlaveId_.push_back(proto.srcPid());
printf("Hello OpenSocket, srcPid = %dn", proto.srcPid());
}
}
}
}
void notify(int accept_fd, const std::string& addr)
{
if (!vectSlaveId_.empty())
{
auto proto = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
proto->msgId_ = new_client;
auto& msg = proto->data<DataNewClient>();
msg.accept_fd_ = accept_fd;
msg.addr_ = addr;
if (balance_ >= vectSlaveId_.size())
{
balance_ = 0;
}
int slaveId = vectSlaveId_[balance_ ];
bool ret = send(slaveId, proto);
if (ret) return;
}
App::Instance_.openSocket_.close(pid_, accept_fd);
}
virtual void onSocketProto(const SocketProto& proto)
{
const auto msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketAccept:
notify(msg->ud_, msg->data());
printf("Listener::onStart [%s]ESocketAccept:acceptFd = %dn", ThreadName((int)msg->uid_).c_str(), msg->ud_);
break;
case OpenSocket::ESocketClose:
isOpening_ = false;
break;
case OpenSocket::ESocketError:
printf("Listener::onStart [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Listener::onStart [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
isOpening_ = true;
break;
case OpenSocket::ESocketUdp:
case OpenSocket::ESocketData:
assert(false);
break;
default:
break;
}
}
};
////////////Accepter//////////////////////
struct ServerClient
{
int fd_;
std::string addr_;
std::string buffer_;
ServerClient() :fd_(-1) {}
};
class Accepter : public OpenThreadWorker
{
int listenId_;
std::map<int, ServerClient> mapClient_;
public:
Accepter(const std::string& name)
:OpenThreadWorker(name),
listenId_(-1)
{
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Accepter::onSocketProto);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Accepter::onProtoBuffer);
}
virtual ~Accepter() {}
virtual void onStart()
{
while (listenId_ < 0)
{
listenId_ = ThreadId("listener");
OpenThread::Sleep(1000);
}
auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
root->msgId_ = new_accept;
auto& data = root->data<std::string>();
data = "listen success!";
send(listenId_, root);
}
private:
void onProtoBuffer(const ProtoBuffer& proto)
{
if (proto.msgId_ == new_client)
{
auto msg = proto.data<DataNewClient>();
int accept_fd = msg.accept_fd_;
if (accept_fd >= 0)
{
auto iter = mapClient_.find(accept_fd);
if (iter != mapClient_.end())
{
assert(false);
mapClient_.erase(iter);
App::Instance_.openSocket_.close(pid(), accept_fd);
return;
}
auto& client = mapClient_[accept_fd];
client.fd_ = accept_fd;
client.addr_ = msg.addr_;
App::Instance_.openSocket_.start(pid_, accept_fd);
}
}
}
void onRead(const std::shared_ptr<OpenSocketMsg>& msg)
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
auto& client = iter->second;
client.buffer_.append(msg->data(), msg->size());
auto& buffer = client.buffer_;
if (buffer.empty())
return;
std::string data = "[" name_ "]" client.addr_ ":" buffer;
client.buffer_.clear();
App::Instance_.openSocket_.send(client.fd_, data.data(), (int)data.size());
}
virtual void onSocketProto(const SocketProto& proto)
{
const auto msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketData:
onRead(msg);
break;
case OpenSocket::ESocketClose:
mapClient_.erase(msg->fd_);
break;
case OpenSocket::ESocketError:
mapClient_.erase(msg->fd_);
printf("Accepter::onStart [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Accepter::onStart [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
{
auto iter = mapClient_.find(msg->fd_);
if (iter == mapClient_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
}
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
};
struct TestMsg
{
int count_;
TestMsg() :count_(0) {}
};
struct User
{
int fd_;
int userId_;
std::string buffer_;
};
class Client : public OpenThreadWorker
{
std::map<int, User> mapUser_;
public:
Client(const std::string& name)
:OpenThreadWorker(name)
{
registers(SocketProto::ProtoType(), (OpenThreadHandle)&Client::onSocketProto);
registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Client::onProtoBuffer);
}
virtual ~Client() {}
virtual void onStart()
{
}
private:
void onProtoBuffer(const ProtoBuffer& proto)
{
if (proto.msgId_ == test_client)
{
auto msg = proto.data<TestMsg>();
int count = msg.count_;
int fd = 0;
for (int i = 0; i < count; i )
{
fd = App::Instance_.openSocket_.connect(pid_, TestClientIp_, TestServerPort_);
if (fd < 0)
{
printf("Client::start_test faild fd = %dn", fd);
assert(0);
}
auto& user = mapUser_[fd];
user.fd_ = fd;
user.userId_ = pid_ i * 1000;
printf("Client::start_test[%s] fd = %d n", name().c_str(), fd);
}
}
}
void onRead(const std::shared_ptr<OpenSocketMsg>& msg)
{
auto iter = mapUser_.find(msg->fd_);
if (iter == mapUser_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
auto& user = iter->second;
user.buffer_.append(msg->data(), msg->size());
printf("Client::onRead[%s:%d]:%sn", name().c_str(), user.userId_, user.buffer_.c_str());
user.buffer_.clear();
OpenSocket::Sleep(500);
std::string data = "Hello OpenSocket!";
App::Instance_.openSocket_.send(user.fd_, data.data(), (int)data.size());
}
virtual void onSocketProto(const SocketProto& proto)
{
const auto msg = proto.data_;
switch (msg->type_)
{
case OpenSocket::ESocketData:
onRead(msg);
break;
case OpenSocket::ESocketClose:
mapUser_.erase(msg->fd_);
break;
case OpenSocket::ESocketError:
mapUser_.erase(msg->fd_);
printf("Client::onStart [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketWarning:
printf("Client::onStart [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
break;
case OpenSocket::ESocketOpen:
{
auto iter = mapUser_.find(msg->fd_);
if (iter == mapUser_.end())
{
App::Instance_.openSocket_.close(pid_, msg->fd_);
return;
}
std::string buffer = "Hello OpenSocket!";
App::Instance_.openSocket_.send(msg->fd_, buffer.data(), (int)buffer.size());
}
break;
case OpenSocket::ESocketAccept:
case OpenSocket::ESocketUdp:
assert(false);
break;
default:
break;
}
}
};
int main()
{
std::vector<OpenThreadWorker*> vectWorker;
printf("start server==>>n");
//server
std::vector<OpenThreadWorker*> vectServer =
{
new Listener("listener"),
new Accepter("accepter1"),
new Accepter("accepter2"),
new Accepter("accepter3"),
new Accepter("accepter4")
};
for (size_t i = 0; i < vectServer.size(); i )
{
vectWorker.push_back(vectServer[i]);
vectServer[i]->start();
}
printf("slepp 3000 ms==>>n");
//wait server start.
OpenThread::Sleep(3000);
printf("start client==>>n");
//client
std::vector<OpenThreadWorker*> vectClient =
{
new Client("client1"),
new Client("client2"),
new Client("client3"),
new Client("client4"),
};
for (size_t i = 0; i < vectClient.size(); i )
{
vectWorker.push_back(vectClient[i]);
vectClient[i]->start();
}
auto proto = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
proto->msgId_ = test_client;
auto& data = proto->data<TestMsg>();
data.count_ = 10;
//
for (size_t i = 0; i < vectClient.size(); i )
{
vectClient[i]->send(vectClient[i]->pid(), proto);
}
printf("wait close==>>n");
//wait all worker
OpenThread::ThreadJoinAll();
for (size_t i = 0; i < vectWorker.size(); i )
{
delete vectWorker[i];
}
vectWorker.clear();
printf("Pausen");
return getchar();
}