一个C++多线程TCP服务Demo

2023-06-12 14:48:21 浏览数 (1)

代码仓库在文末,编译通过,请放心食用!

无界AI生成

本文介绍了如何在 C++ 中为 Linux 环境实现并发 TCP/IP 服务器。 多线程在我的解决方案中提供并发性。 由于并发性,客户不必等待轮到他们,可以立即得到服务。 我创建的服务器有一个线程来处理新连接(TCPServer 类)。 接受这样的连接后,将创建一个新线程,负责与给定客户端(ConnectionHandler 类)的所有通信。 ConnectionHandler 的实现可以自由更改。 它可以允许对服务器的任何使用,例如它可以很好地用作 HTTP 服务器。

TCPServer.h

代码语言:C++
#pragma once

#include <thread>

/// TCP server that accepts connections on a dedicated server thread.
class TCPServer {
public:
    /// TCP port the server listens on.
    static const int PORT = 1234;

    /// Constructs the server and immediately starts the server thread.
    TCPServer();

    /// Requests the server to stop and releases the event file descriptor.
    ~TCPServer();

    /// Starts the server thread.
    void start();

    /// Asks the server thread to stop.
    void stop();

    /// Blocks until the server thread has ended.
    void join();

private:
    /// Event file descriptor used to tell the server thread to stop.
    int efd;

    /// The server thread itself.
    std::thread m_thread;

    /// Entry point executed on the server thread.
    void threadFunc();
};

服务器在构造 TCPServer 对象后启动。 创建一个运行 threadFunc() 方法的新线程。 TCPServer::start() 还创建了 eventfd,用于通知服务器停止处理。 threadFunc() 创建一个套接字并将其绑定到指定的端口(在本例中为 1234)。 poll() 用于监视是否有任何打开的文件描述符准备好执行 I/O。 在侦听套接字上接收到 I/O 事件后,将构建新的 ConnectionHandler 对象,该对象在单独的线程上运行。

TCPServer.cpp

代码语言:C++
#include "TCPServer.h"
#include <unistd.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <vector>
#include <unordered_map>
#include "ConnectionHandler.h"
#include <iostream>

// Marks the event descriptor as "not created yet" and starts the server
// right away, so a freshly constructed TCPServer is already running.
TCPServer::TCPServer() : efd(-1) {
    start();
}

// Stops the server, waits for the server thread to finish and then releases
// the event file descriptor.
TCPServer::~TCPServer() {
    this->stop();

    // Destroying a std::thread that is still joinable calls std::terminate(),
    // and the thread also reads efd, so it must be joined before efd is closed.
    this->join();

    if (this->efd != -1) {
        close(this->efd);
        this->efd = -1;
    }
}

// Starts the server: (re)creates the eventfd used for the stop request and
// launches the server thread running threadFunc().
// Must not be called while a previous server thread is still joinable
// (asserted). If eventfd() fails, the error is logged and no thread is started.
void TCPServer::start() {
    assert(!this->m_thread.joinable());

    // drop a descriptor left over from a previous start()
    if (this->efd != -1) {
        close(this->efd);
    }

    this->efd = eventfd(0, 0);
    if (this->efd == -1) {
        std::cout << "eventfd() => -1, errno=" << errno << std::endl;
        return;
    }

    // creates thread
    this->m_thread = std::thread([this]() { this->threadFunc(); });

    // sets name for thread
    pthread_setname_np(this->m_thread.native_handle(), "TCPServer");
}


void TCPServer::stop() {
    // writes to efd - it will be handled as stopping server thread
    uint64_t one = 1;
    auto ret = write(this->efd, &one, 8);
    if (ret == -1) {
        std::cout << "write => -1, errno=" << errno << std::endl;
    }
}

void TCPServer::join() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

// Body of the server thread.
//
// Creates the listening socket on PORT, then polls three kinds of descriptors:
//   fds[0]   - the eventfd written by stop(); any event ends the loop
//   fds[1]   - the listening socket; an event means a client is connecting
//   fds[2..] - client sockets, watched only to detect disconnects
// Every accepted client gets its own ConnectionHandler (and thus its own
// thread). On exit all handlers are terminated and joined and all sockets
// are closed.
void TCPServer::threadFunc() {
    std::cout << "Listen on: " << PORT << std::endl;

    const int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        std::cout << "socket() => -1, errno=" << errno << std::endl;
        return;
    }

    int reuseaddr = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
        std::cout << "setsockopt() => -1, errno=" << errno << std::endl;
    }

    struct sockaddr_in servaddr = {};
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);

    // binding to socket that will listen for new connections
    if (bind(sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
        std::cout << "bind() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }

    // started listening, 50 pending connections can wait in a queue
    if (listen(sockfd, 50) == -1) {
        std::cout << "listen() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }

    // monitored file descriptors - at start there is efd and just created sockfd. POLLIN means we wait for data to read
    std::vector<struct pollfd> fds{ { this->efd, POLLIN, 0 }, { sockfd, POLLIN, 0 } };

    // one handler (with its own thread) per connected client, keyed by client fd
    std::unordered_map<int, ConnectionHandler> handlers;

    while (true) {
        const int TIMEOUT = 1000;   // 1000 ms
        int n = poll(fds.data(), fds.size(), TIMEOUT);  // checking if there was any event on monitored file descriptors
        if (n == -1 && errno != ETIMEDOUT && errno != EINTR) {
            std::cout << "poll() => -1, errno=" << errno << std::endl;
            break;
        }

        // n pending events
        if (n > 0) {
            if (fds[0].revents) {   // handles server stop request (which is sent by TCPServer::stop())
                std::cout << "Received stop request" << std::endl;
                break;
            } else if (fds[1].revents) {    // new client connected
                // accepting connection
                int clientfd = accept(sockfd, nullptr, nullptr);
                std::cout << "New connection" << std::endl;
                if (clientfd != -1) {
                    // insert new pollfd to monitor
                    fds.push_back(pollfd{clientfd, POLLIN, 0});

                    // create ConnectionHandler object that will run in separate thread
                    handlers.emplace(clientfd, clientfd);
                } else {
                    std::cout << "accept => -1, errno=" << errno << std::endl;
                }

                // clearing revents
                fds[1].revents = 0;
            }

            // iterating all client pollfds (they start at index 2) to check if anyone disconnected
            for (auto it = fds.begin() + 2; it != fds.end(); ) {
                char c;
                if (it->revents
                        && recv(it->fd, &c, 1, MSG_PEEK | MSG_DONTWAIT) == 0) { // checks if disconnected or just fd readable
                    std::cout << "Client disconnected" << std::endl;
                    const int clientfd = it->fd;

                    auto handler = handlers.find(clientfd);
                    if (handler != handlers.end()) {
                        handler->second.terminate();    // ask the ConnectionHandler thread to finish
                        // Wake the handler if it is blocked in recv(). Only the
                        // read side is shut down so that a reply still in flight
                        // does not hit a write-shutdown socket.
                        shutdown(clientfd, SHUT_RD);
                        handlers.erase(handler);        // destructor joins the handler thread
                    }

                    // close only after the handler thread is gone, so it never
                    // operates on a closed (and possibly reused) descriptor
                    close(clientfd);
                    it = fds.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }

    // cleaning section after receiving stop request (or a fatal poll() error):
    // first make every handler thread leave its loop ...
    for (auto& entry : handlers) {
        entry.second.terminate();
        shutdown(entry.first, SHUT_RD);     // unblocks a pending recv() in the handler thread
    }
    // ... then join them all (ConnectionHandler destructor joins) ...
    handlers.clear();

    // ... and finally close the listening socket and all client sockets
    // (fds[0] is the eventfd, which is owned by TCPServer itself)
    for (auto it = fds.begin() + 1; it != fds.end(); ++it) {
        close(it->fd);
    }

    std::cout << "TCP server stopped" << std::endl;
}

ConnectionHandler.h

代码语言:C++
#pragma once

#include <atomic>
#include <string>
#include <thread>

// Handles all communication with a single connected client on its own thread.
// The thread is started by the constructor and joined by the destructor.
class ConnectionHandler {
private:
    // thread running threadFunc()
    std::thread m_thread;

    // client socket file descriptor
    int fd = -1;

    // Set by terminate() and read by the loop in threadFunc(). terminate() is
    // called from outside the handler thread, so the flag must be atomic to
    // avoid a data race.
    std::atomic<bool> m_terminate{false};

    // receives one message from the client; returns "" on error
    std::string readMessage();

    // sends msg to the client
    void sendMessage(const std::string& msg);

    // joins the handler thread if it is joinable
    void stop();

public:
    // starts the handler thread for the given client socket
    explicit ConnectionHandler(int fd);
    ~ConnectionHandler();

    // asks the handler thread to leave its loop
    void terminate();

    // body of the handler thread
    void threadFunc();
};

ConnectionHandler.cpp

代码语言:C++
#include "ConnectionHandler.h"
#include <cassert>
#include <sys/socket.h>
#include <errno.h>
#include <iostream>
#include <pthread.h>

// Stores the client socket descriptor and starts the thread that serves
// this client (it runs threadFunc()).
ConnectionHandler::ConnectionHandler(int fd) : fd(fd) {
    assert(!this->m_thread.joinable());

    // creating thread that handles received messages
    this->m_thread = std::thread([this]() { this->threadFunc(); });

    // Linux limits thread names to 16 bytes including the terminating NUL;
    // "ConnectionHandler" is too long and would be rejected with ERANGE.
    pthread_setname_np(this->m_thread.native_handle(), "ConnHandler");
}

// Waits for the handler thread to finish (delegates to stop()).
ConnectionHandler::~ConnectionHandler() {
    stop();
}

void ConnectionHandler::stop() {
    if (this->m_thread.joinable()) {
        this->m_thread.join();
    }
}

// Body of the handler thread: reads messages from the client and answers each
// one until terminate() is called or the connection can no longer be read.
void ConnectionHandler::threadFunc() {
    while (!this->m_terminate) {
        std::string msg = this->readMessage();
        if (msg.empty()) {
            // readMessage() returns "" only when recv() failed or the peer
            // closed the connection. Continuing would spin in a tight loop
            // and keep sending replies on a dead socket.
            break;
        }

        std::cout << "Received message: " << msg << std::endl;
        this->sendMessage("Thank you for your message " + msg);
    }
}

// Receives up to 1024 bytes from the client socket (blocking).
// Returns the received bytes, or an empty string when recv() reports an error
// or returns 0 bytes; the failure is logged.
std::string ConnectionHandler::readMessage() {
    std::string msg(1024, '\0');    // receive buffer of 1024 NUL characters

    const ssize_t readBytes = recv(this->fd, &msg[0], msg.size(), 0);
    if (readBytes < 1) {
        std::cout << "Error in readMessage, readBytes: " << readBytes << std::endl;
        return "";
    }

    // keep only what was actually received instead of the whole padded buffer
    msg.resize(static_cast<std::size_t>(readBytes));
    return msg;
}

// Sends msg to the client socket and logs when not all bytes could be sent.
void ConnectionHandler::sendMessage(const std::string& msg) {
    // MSG_NOSIGNAL: a peer that already closed the connection must not raise
    // SIGPIPE, whose default action would kill the whole server process.
    const ssize_t n = send(this->fd, msg.data(), msg.size(), MSG_NOSIGNAL);
    if (n != static_cast<ssize_t>(msg.size())) {
        std::cout << "Error while sending message, message size: " << msg.size() << " bytes sent: " << n << std::endl;
    }
}

void ConnectionHandler::terminate() {
    this->m_terminate = true;
}

main.cpp

代码语言:C++
#include "TCPServer.h"

int main()
{
    // Constructing the server is enough to start it: the constructor
    // launches the server thread.
    TCPServer server;

    // server.stop() would request a shutdown at this point.

    // Block here until the server thread has finished.
    server.join();

    return 0;
}

怎么可以做得更好?

  • 用一个线程池来处理客户端连接。 线程创建代价高昂,因此我们希望避免它
  • 用一个 SIGINT (ctrl-c) 处理程序调用 main.cpp 中的 TCPServer::stop()

附录:

1、代码: https://github.com/leoay/Notes/tree/master/一个C++多线程TCP服务Demo/code

2、原文:https://www.codeer.dev/blog/2020/07/21/cpp-multithreaded-tcp-server.html

0 人点赞