代码仓库在文末,编译通过,请放心食用!
无界AI生成
本文介绍了如何在 C++ 中为 Linux 环境实现并发 TCP/IP 服务器。 多线程在我的解决方案中提供并发性。 由于并发性,客户端不必排队等待,可以立即得到服务。 我创建的服务器有一个线程来处理新连接(TCPServer 类)。 接受这样的连接后,将创建一个新线程,负责与给定客户端(ConnectionHandler 类)的所有通信。 ConnectionHandler 的实现可以自由更改。 这样服务器就可以用于任何用途,例如可以很好地用作 HTTP 服务器。
TCPServer.h
#pragma once
#include <thread>
/// Concurrent TCP server: a dedicated server thread listens on PORT and each
/// accepted client is handed to its own ConnectionHandler.
class TCPServer {
public:
    /// TCP port the listening socket is bound to.
    static const int PORT = 1234;

    /// Constructing the object immediately starts the server thread.
    TCPServer();
    ~TCPServer();

    /// Starts the server.
    void start();
    /// Stops the server.
    void stop();
    /// Joins the server thread, i.e. waits for the server to end.
    void join();

private:
    /// Runs inside the server thread.
    void threadFunc();

    /// Event file descriptor used to tell the server to stop.
    int efd;
    /// The server thread.
    std::thread m_thread;
};
服务器在构造 TCPServer 对象后启动。 创建一个运行 threadFunc() 方法的新线程。 TCPServer::start() 还创建了 eventfd,用于通知服务器停止处理。 threadFunc() 创建一个套接字并将其绑定到指定的端口(在本例中为 1234)。 poll() 用于监视是否有任何打开的文件描述符准备好执行 I/O。 在侦听套接字上接收到 I/O 事件后,将构建新的 ConnectionHandler 对象,该对象在单独的线程上运行。
TCPServer.cpp
#include "TCPServer.h"
#include <unistd.h>
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cassert>
#include <vector>
#include <unordered_map>
#include "ConnectionHandler.h"
#include <iostream>
// Begins with no event descriptor (-1) and starts the server right away.
TCPServer::TCPServer() : efd(-1) {
    start();
}
// Requests a stop, waits for the server thread to finish and then releases
// the event descriptor.
TCPServer::~TCPServer() {
    this->stop();
    // The thread has to be joined before m_thread is destroyed: destroying a
    // joinable std::thread calls std::terminate(). join() is a no-op when the
    // caller already joined or the thread was never started.
    this->join();
    if (this->efd != -1) {
        close(this->efd);
        this->efd = -1;
    }
}
// Creates the eventfd used for stop notification and launches the server
// thread running threadFunc(). Asserts that no previous server thread is
// still joinable. If eventfd() fails the error is logged and no thread is
// started.
void TCPServer::start() {
    assert(!this->m_thread.joinable());
    // Close a descriptor left over from an earlier start() before replacing it.
    if (this->efd != -1) {
        close(this->efd);
    }
    this->efd = eventfd(0, 0);
    if (this->efd == -1) {
        std::cout << "eventfd() => -1, errno=" << errno << std::endl;
        return;
    }
    // creates thread
    this->m_thread = std::thread([this]() { this->threadFunc(); });
    // sets name for thread
    pthread_setname_np(this->m_thread.native_handle(), "TCPServer");
}
// Signals the server thread to stop by writing to efd; the server thread
// treats any event on efd as a stop request.
void TCPServer::stop() {
    const uint64_t increment = 1;
    if (write(this->efd, &increment, sizeof(increment)) == -1) {
        std::cout << "write => -1, errno=" << errno << std::endl;
    }
}
void TCPServer::join() {
if (this->m_thread.joinable()) {
this->m_thread.join();
}
}
// Server thread body. Opens a listening socket on PORT, then polls the stop
// eventfd, the listening socket and every accepted client socket until a
// stop request arrives or poll() fails. Each accepted client gets its own
// ConnectionHandler; a client whose socket reports end-of-stream is closed
// and its handler terminated and removed.
void TCPServer::threadFunc() {
    int sockfd;
    std::cout << "Listen on: " << PORT << std::endl;
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        std::cout << "socket() => -1, errno=" << errno << std::endl;
        return;
    }
    int reuseaddr = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
        // Not fatal: the server can still run without SO_REUSEADDR.
        std::cout << "setsockopt() => -1, errno=" << errno << std::endl;
    }
    struct sockaddr_in servaddr = {};  // zero-initialise every field
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = INADDR_ANY;
    servaddr.sin_port = htons(PORT);
    // binding to socket that will listen for new connections
    if (bind(sockfd, reinterpret_cast<const struct sockaddr*>(&servaddr), sizeof(servaddr)) == -1) {
        std::cout << "bind() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }
    // started listening, 50 pending connections can wait in a queue
    if (listen(sockfd, 50) == -1) {
        std::cout << "listen() => -1, errno=" << errno << std::endl;
        close(sockfd);
        return;
    }
    // monitored file descriptors - at start there is efd and just created sockfd. POLLIN means we wait for data to read
    // Layout: fds[0] = stop eventfd, fds[1] = listening socket, fds[2..] = client sockets.
    std::vector<struct pollfd> fds{ { this->efd, POLLIN, 0 }, { sockfd, POLLIN, 0 } };
    std::unordered_map<int, ConnectionHandler> handlers;
    const int TIMEOUT = 1000; // 1000 ms
    while (true) {
        int n = poll(fds.data(), fds.size(), TIMEOUT); // checking if there was any event on monitored file descriptors
        if (n == -1 && errno != ETIMEDOUT && errno != EINTR) {
            std::cout << "poll() => -1, errno=" << errno << std::endl;
            break;
        }
        // n pending events
        if (n > 0) {
            if (fds[0].revents) { // handles server stop request (which is sent by TCPServer::stop())
                std::cout << "Received stop request" << std::endl;
                break;
            } else if (fds[1].revents) { // new client connected
                // accepting connection
                int clientfd = accept(sockfd, NULL, NULL);
                std::cout << "New connection" << std::endl;
                if (clientfd != -1) {
                    // insert new pollfd to monitor
                    fds.push_back(pollfd{clientfd, POLLIN, 0});
                    // create ConnectionHandler object that will run in separate thread
                    handlers.emplace(clientfd, clientfd);
                } else {
                    std::cout << "accept => -1, errno=" << errno << std::endl;
                }
                // clearing revents
                fds[1].revents = 0;
            }
            // iterating all pollfds to check if anyone disconnected
            // (starts at index 2 to skip the eventfd and the listening socket)
            for (auto it = fds.begin() + 2; it != fds.end(); ) {
                char c;
                if (it->revents
                    && recv(it->fd, &c, 1, MSG_PEEK | MSG_DONTWAIT) == 0) { // checks if disconnected or just fd readable
                    std::cout << "Client disconnected" << std::endl;
                    close(it->fd); // closing socket
                    handlers.at(it->fd).terminate(); // terminating ConnectionHandler thread
                    handlers.erase(it->fd);
                    it = fds.erase(it); // erase() already yields the next element
                } else {
                    ++it;
                }
            }
        }
    }
    // cleaning section after receiving stop request
    // (starts at index 1: the eventfd at index 0 is not closed here)
    for (auto it = fds.begin() + 1; it != fds.end(); ++it) {
        close(it->fd);
        auto handler = handlers.find(it->fd);
        if (handler != handlers.end()) {
            handler->second.terminate();
        }
    }
    std::cout << "TCP server stopped" << std::endl;
}
ConnectionHandler.h
#pragma once
#include <atomic>
#include <string>
#include <thread>
/// Handles all communication with one connected client on its own thread.
class ConnectionHandler {
private:
    /// Thread that runs threadFunc().
    std::thread m_thread;
    /// File descriptor handed to the constructor.
    int fd = -1;
    /// Loop-exit flag checked by threadFunc() on every iteration. Atomic
    /// because threadFunc() reads it on m_thread while terminate() is called
    /// on this object from the server thread.
    /// NOTE(review): terminate() is defined outside this view and is
    /// presumed to set this flag - confirm.
    std::atomic<bool> m_terminate{false};

    std::string readMessage();
    void sendMessage(const std::string& msg);
    /// Joins m_thread if it is joinable.
    void stop();

public:
    /// Starts the handler thread for the given file descriptor.
    explicit ConnectionHandler(int fd);
    /// Calls stop(), i.e. waits for the handler thread to finish.
    ~ConnectionHandler();
    void terminate();
    /// Body of the handler thread; loops until m_terminate becomes true.
    void threadFunc();
};
ConnectionHandler.cpp
#include "ConnectionHandler.h"
#include <cassert>
#include <sys/socket.h>
#include <errno.h>
#include <iostream>
#include <pthread.h>
// Stores the file descriptor and launches the thread that runs threadFunc().
ConnectionHandler::ConnectionHandler(int fd) : fd(fd) {
    assert(!this->m_thread.joinable());
    // creating thread that handles received messages
    this->m_thread = std::thread([this]() { this->threadFunc(); });
    // Linux limits thread names to 16 bytes including the terminating NUL;
    // a longer name makes pthread_setname_np() fail with ERANGE, so a short
    // name is used here.
    pthread_setname_np(this->m_thread.native_handle(), "ConnHandler");
}
// Waits (via stop()) for the handler thread before the object is destroyed.
ConnectionHandler::~ConnectionHandler() {
    stop();
}
void ConnectionHandler::stop() {
if (this->m_thread.joinable()) {
this->m_thread.join();
}
}
// Handler thread body: until m_terminate is set, obtains a message from
// readMessage(), logs it and passes a reply containing it to sendMessage().
void ConnectionHandler::threadFunc() {
    while (!this->m_terminate) {
        std::string msg = this->readMessage();
        std::cout << "Received message: " << msg << std::endl;
        this->sendMessage("Thank you for your message " + msg);
    }
}
std::string ConnectionHandler::readMessage() {
std::string msg(1024, '