用OpenSocket开发一个简单的高性能高并发HttpServer

2023-07-20 17:37:51 浏览数 (2)

OpenSocket是一个跨全平台的高性能网络并发库。

它使用了高性能IO,Linux和安卓用epoll,Win32用IOCP,iOS和Mac用kqueue,其他系统使用select。

本文用这种高性能socket库,设计开发一个简单的HttpServer。

为了开发方便,我们使用OpenThread作为线程库。使用OpenThread的Actor模式设计高并发HttpServer。

设计思路如下:

创建5条线程,1条线程封装成监听者Listener,另外4条线程封装成接收者Accepter。

监听者Listener负责监听socket连接事件,监听到socket新连接事件后,就把fd发给其中一个接收者Accepter;

接收者Accepter接收到socket的fd后,打开该socket连接,与客户端进行网络通信。

此简单的HttpServer接收到Http报文后,进行response一份Http报文,然后关闭socket完成Http短连接操作。

具体源码如下:

代码语言:C 复制
#include <assert.h>
#include <map>
#include <set>
#include <memory>
#include <string.h>
#include "opensocket.h"
#include "open/openthread.h"
using namespace open;

const std::string TestServerIp_ = "0.0.0.0";
const int TestServerPort_ = 8888;

//msgType == 1
struct SocketProto : public OpenThreadProto
{
    std::shared_ptr<OpenSocketMsg> data_;
    static inline int ProtoType() { return 1; }
    virtual inline int protoType() const { return SocketProto::ProtoType(); }
};

//msgType == 2
struct RegisterProto : public OpenThreadProto
{
    int srcPid_;
    static inline int ProtoType() { return 2; }
    virtual inline int protoType() const { return RegisterProto::ProtoType(); }
    RegisterProto() :srcPid_(-1) {}
};

//msgType == 3
struct NewClientProto : public OpenThreadProto
{
    int accept_fd_;
    std::string addr_;
    static inline int ProtoType() { return 3; }
    virtual inline int protoType() const { return NewClientProto::ProtoType(); }
    NewClientProto() : accept_fd_(-1) {}
};

////////////App//////////////////////
class App
{
    static void SocketFunc(const OpenSocketMsg* msg)
    {
        if (!msg) return;
        if (msg->uid_ >= 0)
        {
            auto proto = std::shared_ptr<SocketProto>(new SocketProto);
            proto->srcPid_ = -1;
            proto->srcName_ = "OpenSocket";
            proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
            if (!OpenThread::Send((int)msg->uid_, proto))
                printf("SocketFunc dispatch faild pid = %lldn", msg->uid_);
        }
        else
        {
            delete msg;
        }
    }
public:
    static App Instance_;
    OpenSocket openSocket_;
    App() { openSocket_.run(App::SocketFunc); }
};
App App::Instance_;

////////////Listener//////////////////////
class Listener : public OpenThreadWorker
{
    int listen_fd_;
    unsigned int balance_;
    std::set<int> setSlaveId_;
    std::vector<int> vectSlaveId_;
public:
    Listener(const std::string& name)
        :OpenThreadWorker(name),
        listen_fd_(-1)
    {
        balance_ = 0;
        registers(SocketProto::ProtoType(), (OpenThreadHandle)&Listener::onSocketProto);
        registers(RegisterProto::ProtoType(), (OpenThreadHandle)&Listener::onRegisterProto);
    }
    virtual ~Listener() {}
    virtual void onStart()
    {
        listen_fd_ = App::Instance_.openSocket_.listen((uintptr_t)pid(), TestServerIp_, TestServerPort_, 64);
        if (listen_fd_ < 0)
        {
            printf("Listener::onStart faild listen_fd_ = %dn", listen_fd_);
            assert(false);
        }
        App::Instance_.openSocket_.start((uintptr_t)pid(), listen_fd_);
        printf("HTTP: %s:%dn", TestServerIp_.c_str(), TestServerPort_);
    }
    void onRegisterProto(const RegisterProto& proto)
    {
        if (proto.srcPid_ >= 0)
        {
            if (setSlaveId_.find(proto.srcPid_) == setSlaveId_.end())
            {
                setSlaveId_.insert(proto.srcPid_);
                vectSlaveId_.push_back(proto.srcPid_);
                printf("Hello OpenSocket HttpServer, srcPid = %dn", proto.srcPid_);
            }
        }
    }
    // new client socket dispatch to Accept
    void notifyToSlave(int accept_fd, const std::string& addr)
    {
        if (!vectSlaveId_.empty())
        {
            auto proto = std::shared_ptr<NewClientProto>(new NewClientProto);
            proto->accept_fd_ = accept_fd;
            proto->addr_ = addr;
            if (balance_ >= vectSlaveId_.size())
            {
                balance_ = 0;
            }
            int slaveId = vectSlaveId_[balance_  ];
            if (OpenThread::Send(slaveId, proto))
            {
                return;
            }
            printf("Listener::notifyToSlave send faild pid = %dn", slaveId);
        }
        App::Instance_.openSocket_.close(pid_, accept_fd);
    }
    void onSocketProto(const SocketProto& proto)
    {
        const auto& msg = proto.data_;
        switch (msg->type_)
        {
        case OpenSocket::ESocketAccept:
            // linsten new client socket
            notifyToSlave(msg->ud_, msg->data());
            printf("Listener::onSocket [%s]ESocketAccept:acceptFd = %dn", ThreadName((int)msg->uid_).c_str(), msg->ud_);
            break;
        case OpenSocket::ESocketClose:
            break;
        case OpenSocket::ESocketError:
            printf("Listener::onSocket [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketWarning:
            printf("Listener::onSocket [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketOpen:
            break;
        case OpenSocket::ESocketUdp:
        case OpenSocket::ESocketData:
            assert(false);
            break;
        default:
            break;
        }
    }
};

////////////HttpRequest//////////////////////
struct HttpRequest
{
    int fd_;
    std::string addr_;

    std::string method_;
    std::string url_;

    int code_;
    int clen_;
    std::string head_;
    std::string body_;
    std::map<std::string, std::string> headers_;
    HttpRequest() :fd_(-1), code_(-1), clen_(-1) {}

    //GET /xx/xx HTTP/x.x
    bool parseHeader();
    bool pushData(const char* data, size_t size);
};

////////////Accepter//////////////////////
class Accepter : public OpenThreadWorker
{
    int listenId_;
    std::map<int, HttpRequest> mapClient_;
public:
    Accepter(const std::string& name)
        :OpenThreadWorker(name),
        listenId_(-1)
    {
        registers(SocketProto::ProtoType(), (OpenThreadHandle)&Accepter::onSocketProto);
        registers(NewClientProto::ProtoType(), (OpenThreadHandle)&Accepter::onNewClientProto);
    }
    virtual ~Accepter() {}
    virtual void onStart() 
    { 
        while (listenId_ < 0)
        {
            listenId_ = ThreadId("listener");
            OpenThread::Sleep(1000);
        }
        auto proto = std::shared_ptr<RegisterProto>(new RegisterProto);
        proto->srcPid_ = pid();
        if (OpenThread::Send(listenId_, proto))
            return;
        printf("Accepter::onStart send faild pid = %dn", listenId_);
    }
    void onNewClientProto(const NewClientProto& proto)
    {
        int accept_fd = proto.accept_fd_;
        if (accept_fd >= 0)
        {
            auto iter = mapClient_.find(accept_fd);
            if (iter != mapClient_.end())
            {
                assert(false);
                mapClient_.erase(iter);
                App::Instance_.openSocket_.close(pid(), accept_fd);
                return;
            }
            auto& client = mapClient_[accept_fd];
            client.fd_ = accept_fd;
            client.addr_ = proto.addr_;
            App::Instance_.openSocket_.start(pid_, accept_fd);
        }
    }
    //GET /xx/xx HTTP/x.x
    void onReadHttp(const std::shared_ptr<OpenSocketMsg> msg)
    {
        auto iter = mapClient_.find(msg->fd_);
        if (iter == mapClient_.end())
        {
            App::Instance_.openSocket_.close(pid_, msg->fd_);
            return;
        }
        auto& request = iter->second;
        if (!request.pushData(msg->data(), msg->size()))
        {
            //Header too large.close connet.
            if (request.head_.size() > 1024)
                App::Instance_.openSocket_.close(pid_, msg->fd_);
            return;
        }
        printf("new client:url = %sn", request.url_.c_str());
        std::string content;
        content.append("<div>It's work!</div><br/>"   request.addr_   "request:"   request.url_);
        std::string buffer = "HTTP/1.1 200 OKrncontent-length:"   std::to_string(content.size())   "rnrn"   content;
        App::Instance_.openSocket_.send(msg->fd_, buffer.data(), (int)buffer.size());
    }
    virtual void onSocketProto(const SocketProto& proto)
    {
        const auto& msg = proto.data_;
        switch (msg->type_)
        {
        case OpenSocket::ESocketData:
            onReadHttp(msg);
            break;
        case OpenSocket::ESocketClose:
            mapClient_.erase(msg->fd_);
            break;
        case OpenSocket::ESocketError:
            mapClient_.erase(msg->fd_);
            printf("Accepter::onStart [%s]ESocketError:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketWarning:
            printf("Accepter::onStart [%s]ESocketWarning:%sn", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketOpen:
        {
            auto iter = mapClient_.find(msg->fd_);
            if (iter == mapClient_.end())
            {
                App::Instance_.openSocket_.close(pid_, msg->fd_);
                return;
            }
        }
            break;
        case OpenSocket::ESocketAccept:
        case OpenSocket::ESocketUdp:
            assert(false);
            break;
        default:
            break;
        }
    }
};
int main()
{
    printf("start server==>>n");
    std::vector<OpenThreader*> vectServer = {
        new Listener("listener"),
        new Accepter("accepter1"),
        new Accepter("accepter2"),
        new Accepter("accepter3"),
        new Accepter("accepter4")
    };
    for (size_t i = 0; i < vectServer.size();   i)
        vectServer[i]->start();

    printf("wait close==>>n");
    OpenThread::ThreadJoinAll();
    for (size_t i = 0; i < vectServer.size();   i)
        delete vectServer[i];
    vectServer.clear();
    
    printf("Pausen");
    return getchar();
}


bool HttpRequest::parseHeader()
{
    if (!headers_.empty() || head_.size() < 12) return true;
    std::string line;
    const char* ptr = strstr(head_.c_str(), "rn");
    if (!ptr) return false;
    clen_ = -1;
    line.append(head_.c_str(), ptr - head_.c_str());

    int state = 0;
    method_.clear();
    url_.clear();
    for (size_t k = 0; k < line.size();   k)
    {
        if (state == 0)
        {
            if (line[k] != ' ')
            {
                method_.push_back(line[k]);
                continue;
            }
            state = 1;
            while (k < line.size() && line[k] == ' ')   k;
            if (line[k] != ' ') --k;
        }
        else
        {
            if (line[k] != ' ')
            {
                url_.push_back(line[k]);
                continue;
            }
            break;
        }
    }

    line.clear();
    int k = -1;
    int j = -1;
    std::string key;
    std::string value;
    for (size_t i = ptr - head_.c_str()   2; i < head_.size() - 1; i  )
    {
        if (head_[i] == 'r' && head_[i   1] == 'n')
        {
            if (j > 0)
            {
                k = 0;
                while (k < line.size() && line[k] == ' ')   k;
                while (k >= 0 && line.back() == ' ') line.pop_back();
                value = line.data()   j   1;
                while (j >= 0 && line[j] == ' ') j--;
                key.clear();
                key.append(line.data(), j);
                for (size_t x = 0; x < key.size(); x  )
                    key[x] = std::tolower(key[x]);
                headers_[key] = value;
            }
              i;
            j = -1;
            line.clear();
            continue;
        }
        line.push_back(head_[i]);
        if (j < 0 && line.back() == ':')
        {
            j = (int)line.size() - 1;
        }
    }
    clen_ = std::atoi(headers_["content-length"].c_str());
    return true;
}

bool HttpRequest::pushData(const char* data, size_t size)
{
    if (code_ == -1)
    {
        head_.append(data, size);
        const char* ptr = strstr(head_.data(), "rnrn");
        if (!ptr) return false;
        code_ = 0;
        body_.append(ptr   4);
        head_.resize(ptr - head_.data()   2);
        if (!parseHeader()) return false;
    }
    else
    {
        body_.append(data, size);
    }
    if (clen_ >= 0)
    {
        if (clen_ == 0 && clen_ == body_.size())
        {
            return true;
        }
        if (clen_ >= body_.size())
        {
            body_.resize(clen_);
            return true;
        }
    }
    else if (body_.size() > 2)
    {
        if (body_[body_.size() - 2] == 'r' && body_.back() == 'n')
        {
            body_.pop_back();
            body_.pop_back();
            return true;
        }
    }
    return false;
}

编译和执行

请安装cmake工具,用cmake可以构建出VS或者XCode工程,就可以在vs或者xcode上编译运行。

源代码:https://github.com/OpenMiniServer/opensockethttps://github.com/OpenMiniServer/opensocket)

代码语言:txt复制
#克隆项目
git clone https://github.com/OpenMiniServer/opensocket
cd ./opensocket
#创建build工程目录
mkdir build
cd build
cmake ..
#如果是win32,在该目录出现opensocket.sln,点击它就可以启动vs写代码调试
make
./httpserver

0 人点赞