《Linux高性能服务器编程》学习小结(1)

2021-02-01 11:13:12 浏览数 (2)

TCP客户端

代码语言:javascript复制
// 定义 _GNU_SOURCE 是为了获得对 EPOLLRDHUP 即 TCP链接被对方关闭或者对方关闭了写操作的 这一事件类型的支持
#define _GNU_SOURCE 1
// 下面是系统调用需要依赖的头文件
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <libgen.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <stdbool.h>

// 最大读缓冲区大小
#define BUFFER_SIZE 64

// int setnonblocking(int fd) {
//     int old_option = fcntl(fd, F_GETFL);
//     int new_option = old_option | O_NONBLOCK;
//     fcntl(fd, F_SETFL, new_option);
//     return old_option;
// }

// 给epoll注册需要关注的fd以及对应的事件
void addfd(int epollfd, int fd, bool hup) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events |= EPOLLIN;
    if (hup) {
        event.events |= EPOLLRDHUP;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    // setnonblocking(fd);
}

int main(int argc, char* argv[]) {
    if (argc <= 2) {
        printf("usage: %s ip_address port_numbern", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    // 设置服务器地址 以及将ip和端口字段的存储字节序转化成网络序
    struct sockaddr_in server_address;
    bzero(&server_address, sizeof(server_address));
    server_address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &server_address.sin_addr);
    server_address.sin_port = htons(port);

    // 创建连接socket
    int sockfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(sockfd >= 0);
    // 同步阻塞connect直到成功或者失败
    if (connect(sockfd, (struct sockaddr*) &server_address, sizeof(server_address)) < 0) {
        printf("connecttion failedn");
        close(sockfd);
        return 1;
    }

    struct epoll_event events[2];
    // 创建epoll文件描述符 参数在2.6之后的版本已经不需要了,内核会自动调整这个最大的关注文件描述符数量,目前要写大于0是为了兼容之前的内核版本
    int epollfd = epoll_create(2);
    assert(epollfd != -1);

    // 注册标准输入文件描述符
    addfd(epollfd, 0, false);
    // 注册连接socket文件描述符
    addfd(epollfd, sockfd, true);

    // 程序空间用来存储socket上从服务器返回的数据的读缓冲区
    char read_buf[BUFFER_SIZE];
    // 给splice函数使用的管道
    int pipefd[2];
    int ret = pipe(pipefd);
    assert(ret != -1);

    // 服务器关闭了连接的flag
    bool ended = false;

    while(1) {
        // 等待所关注的2个文件描述符上的事件触发 会同步阻塞在这里等待
        int number = epoll_wait(epollfd, events, 2, -1);
        // 程序收到信号后会终止如epoll_wait这样的系统调用,且不会自动重启被中断的系统调用,这时需要我们自己判断number的返回值(-1)且errno等于 EINTR,我们自己来让程序继续执行等待的逻辑,如果这个错误不是被信号打断引起的,则按出错处理   
        if (number < 0) {
            if (number < 0 && errno != EINTR) {
                printf("epoll failturen");
                break;
            }
        }

        // number指示这轮wait得到的结果中所有有响应的文件描述符
        for(int i=0;i<number;  i) {
            int active_sockfd = events[i].data.fd;
            if (active_sockfd == 0 && (events[i].events & EPOLLIN)) {
                // 标准输入有数据可以读(用户输入了数据)
                // splice函数把 文件描述符0 中的数据移动到 文件描述符 pipefd[1] 中,不在程序空间中拷贝一次
                ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
                // splice函数把 pipefd[0] 中的数据移动到 文件描述符 sockfd 中,不在程序空间中拷贝一次
                // 实现了把用户输入的内容直接发送给socket
                ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
            } else if (active_sockfd == sockfd && (events[i].events & EPOLLIN)) {
                // socket上有数据可读 (服务器发来了数据)
                memset(read_buf, '', BUFFER_SIZE);
                // 同步阻塞读取数据 由于读取完的字符最后需要以  字符结尾,导致实际可存储的字节数为 BUFFER_SIZE - 1
                ret = recv(sockfd, read_buf, BUFFER_SIZE - 1, 0);
                // 根据读取到的字节数量进行判断 ret == 0 则是判断对方关闭了连接的标准之一
                if (ret > 0) {
                    printf("%sn", read_buf);
                } else if (ret == 0) {
                    // 服务器关闭了连接
                    printf("server close the connectionn");
                    ended = true;
                    break;
                }
            }
        }

        if (ended) {
            break;
        }
    }

    // 关闭自己打开的所有文件描述符
    close(sockfd);
    close(epollfd);
    return 0;
}

TCP压测工具

代码语言:javascript复制
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdbool.h>

// 每次socket可写的时候都把这个信息发送给服务器端 当然 目标服务器必须支持http协议解析
static const char* request = "GET http://localhost/index.html HTTP/1.1rnConnection: keep-alivernrnxxxxxxxxxxxx";

// 设置文件描述符为非阻塞 保证 EPOLLET类型的读事件不会在数据读取完之后由于再次调用read而导致一直阻塞
int setnonblocking(int fd) {
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

// 注册关注的文件描述符
void addfd(int epoll_fd, int fd) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLOUT | EPOLLET | EPOLLERR;
    epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

// 把buffer中的字符一次性写入fd的发送缓冲区 等待内核完成发送
bool write_nbytes(int fd, const char* buffer, int len) {
    int bytes_write = 0;
    printf("write out %d bytes to socket %dn", len, fd);
    while(1) {
        bytes_write = send(fd, buffer, len, 0);
        if (bytes_write == -1) {
            return false;
        }
        else if (bytes_write == 0) {
            return false;
        }

        len -= bytes_write;
        buffer = buffer   bytes_write;
        // 写入完毕
        if (len <= 0) {
            return true;
        }
    }
}

// 只读取一次socket中的数据 由于只是压测 不是处理业务 只要读到了正常数据就可以了
bool read_once(int sockfd, char* buffer, int len) {
    int bytes_read = 0;
    memset(buffer, '', len);
    bytes_read = recv(sockfd, buffer, len, 0);
    if (bytes_read == -1)
    {
        return false;
    }
    else if (bytes_read == 0) {
        return false;
    }
    printf("read in %d bytes from socket %d with content:n%sn", bytes_read, sockfd, buffer);

    return true;
}


// 建立num数量的tcp连接
void start_conn(int epoll_fd, int num, const char* ip, int port) {
    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    for(int i=0;i<num;  i) {
        sleep(1);
        int sockfd = socket(PF_INET, SOCK_STREAM, 0);
        printf("create 1 sockn");
        if (sockfd < 0) {
            continue;
        }
        // 同步阻塞等待连接成功
        if (connect(sockfd, (struct sockaddr*) &address, sizeof(address)) == 0) {
            printf("build connection %dn", i);
            // 注册这个sockfd到epollfd中 由于addfd函数是注册可写事件 此时的sockfd会触发epollout可写事件 驱动程序去发送数据给服务器
            addfd(epoll_fd, sockfd);
        } else {
            printf("build connection %d failedn", i);
            close(sockfd);
        }
    }
}

// 关闭连接 移除文件描述符
void close_conn(int epoll_fd, int sockfd) {
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockfd, 0);
    close(sockfd);
}

int main(int argc, char* argv[]) {
    assert(argc == 4);
    int epoll_fd = epoll_create(100);
    // 创建指定 argv[3] 数量 argv[1] ip argv[2] 端口 的连接
    start_conn(epoll_fd, atoi(argv[3]), argv[1], atoi(argv[2]));
    struct epoll_event events[10000];
    char buffer[2048];
    while(1) {
        int fds = epoll_wait(epoll_fd, events, 10000, 2000);
        for(int i=0;i<fds;  i) {
            int sockfd = events[i].data.fd;
            if (events[i].events & EPOLLIN) {
                if (!read_once(sockfd, buffer, 2048)) {
                    close_conn(epoll_fd, sockfd);
                }
                // return 0;
                // 读完数据后设置socket的可写事件 驱动epoll_wait下一轮响应epollout事件
                struct epoll_event event;
                event.events = EPOLLOUT | EPOLLET | EPOLLERR;
                event.data.fd = sockfd;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sockfd, &event);
            }
            else if (events[i].events & EPOLLOUT) {
                if (!write_nbytes(sockfd, request, strlen(request))) {
                    close_conn(epoll_fd, sockfd);
                }
                // 读完数据后设置socket的可读事件 驱动epoll_wait下一轮响应epollin事件
                struct epoll_event event;
                event.events = EPOLLIN | EPOLLET | EPOLLERR;
                event.data.fd = sockfd;
                epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sockfd, &event);
            }
            else if (events[i].events & EPOLLERR) {
                close_conn(epoll_fd, sockfd);
            }
        }
    }

    close(epoll_fd);
    return 0;
}

TCP聊天服务器(IO复用)

代码语言:javascript复制
#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <libgen.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <stdbool.h>

// 用户连接的最大数量
#define USER_LIMIT 5
// 存储用户发来的数据缓冲区最大长度
#define BUFFER_SIZE 64
// 根据socket值来定位用户数据client_data对象的users数组的最大长度
#define FD_LIMIT 65535

// 客户数据结构体 用来保存我们需要维持的一些客户端数据
struct client_data {
    int fds_index;
    struct sockaddr_in address;
    char* write_buf;
    char buf[BUFFER_SIZE];
};

// 每个实际的客户所分配的socket文件描述符
struct client_fd {
    int fd;
};

// 设置fd非阻塞
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// 添加fd到epollfd
void addfd(int epollfd, int fd, bool hup) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLERR;
    if (hup) {
        event.events |= EPOLLRDHUP;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printf("usage: %s  ip_address port_numbern", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    // 设置套接字选项避免地址使用错误  快速重启服务器代码的时候由于socket存在time_wait阶段导致bind函数返回失败,所以需要在开发的时候设置这个属性
    int on=1;
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    assert(ret != -1);
    
    ret = bind(listenfd, (struct sockaddr*) &address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != - 1);

    // 用户数据结构数组声明
    struct client_data* users = (struct client_data*)malloc(sizeof(struct client_data)*FD_LIMIT);

    // 连接的用户fds
    struct client_fd fds[USER_LIMIT   1];
    struct epoll_event events[USER_LIMIT   1];
    // 当前用户数量
    int user_count = 0;

    int epollfd = epoll_create(USER_LIMIT   1);
    assert(epollfd != -1);

    addfd(epollfd, listenfd, false);

    while(1) {
        int number = epoll_wait(epollfd, events, USER_LIMIT   1, -1);
        if (number < 0) {
            if (number < 0 && errno != EINTR) {
                printf("epoll failturen");
                break;
            }
        }

        for(int i=0;i<number;  i) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd) {
                // 有个新的完全连接状态的连接到了
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*) &client_address, &client_addrlength);
                if (connfd < 0) {
                    printf("errno is: %dn", errno);
                    continue;
                }
                if (user_count >= USER_LIMIT) {
                    // 客户数量溢出
                    const char* info = "too many usersn";
                    printf("%s", info);
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }
                user_count  ;
                users[connfd].address = client_address;
                users[connfd].fds_index = user_count;
                addfd(epollfd, connfd, true);
                fds[user_count].fd = connfd;
                printf("comes a new user, now have %d usersn", user_count);
            }
            else if (events[i].events & EPOLLERR) {
                printf("get an error from %dn", events[i].data.fd);
                char errors[100];
                memset(errors, '', 100);
                socklen_t length = sizeof(errors);
                // 获取并清除socket的错误状态
                if (getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) {
                    printf("get socket option failedn");
                }
                continue;
            }
            else if (events[i].events & EPOLLRDHUP) {
                // 连接关闭事件 这个与EPOLLIN会同时触发 所以要写在前面
                // 把当前离开的client的数据替换为fds中最后一个user_count所指对象 就算此时 sockfd就是最后一个也没关系 user_count-- 之后这个数据就无效了
                fds[users[sockfd].fds_index] = fds[user_count];
                users[sockfd] = users[fds[user_count].fd];
                close(sockfd);
                user_count--;
                printf("a client leftn");
            }
            else if (events[i].events & EPOLLIN) {
                // 某个用户发来了消息
                int connfd = events[i].data.fd;
                memset(users[connfd].buf, '', BUFFER_SIZE);
                // 读取用户发送来的内容
                ret = recv(connfd, users[connfd].buf, BUFFER_SIZE-1, 0);
                printf("get %d bytes of client data %s from %dn", ret, users[connfd].buf, connfd);

                if (ret < 0) {
                    if (errno != EAGAIN) {
                        close(connfd);
                        fds[users[connfd].fds_index] = fds[user_count];
                        users[connfd] = users[fds[user_count].fd];
                        user_count--;
                    }
                }
                else if (ret == 0) {
                    // 上面处理过了
                } else {
                    // 群发给所有用户 除了它自己
                    for(int j=1;j<=user_count;  j) {
                        if (fds[j].fd== connfd) {
                            continue;
                        } else {
                            // 触发其他用户socket的可写事件
                            struct epoll_event event;
                            event.data.fd = fds[j].fd;
                            event.events = EPOLLERR | EPOLLRDHUP;
                            event.events |= EPOLLOUT;
                            epoll_ctl(epollfd, EPOLL_CTL_MOD, fds[j].fd, &event);
                            // 其他用户的写缓冲区数据来源指向 当前发消息来的用户buf
                            users[fds[j].fd].write_buf = users[connfd].buf;
                        }
                    }
                }
            }
            else if (events[i].events & EPOLLOUT) {
                // 被群发的用户的可写事件触发了
                int connfd = events[i].data.fd;
                if (!users[connfd].write_buf) {
                    continue;
                }
                // 发送被转发的数据给群聊中的用户
                ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
                // 发送完后重置
                users[connfd].write_buf = NULL;
                // 重新注册可读事件 在发的过程中屏蔽当前socket上的读事件
                struct epoll_event event;
                event.data.fd = connfd;
                event.events = EPOLLERR | EPOLLRDHUP;
                event.events |= EPOLLIN;
                epoll_ctl(epollfd, EPOLL_CTL_MOD, connfd, &event);
            }
            else {
                printf("something else happend n");
            }
        }
    }

    free(users);
    close(listenfd);
    close(epollfd);
    return 0;
}

TCP聊天服务器(多进程)

代码语言:javascript复制
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <stdbool.h>

// 最大用户数量
#define USER_LIMIT 5
// 用户发送来的数据存储的缓冲区大小
#define BUFFER_SIZE 1024
// 最大可接受的socket数量
#define FD_LIMIT 65535
// 每个进程最多可关注的事件数量
#define MAX_EVENT_NUMBER 1024
// 最大可创建的子进程数量
#define PROCESS_LIMIT 65536

// 客户端数据结构
struct client_data
{
	struct sockaddr_in address;
	int connfd;
	pid_t pid;
	// 这对管道是用来fork之后子进程保持跟父进程保持通信的渠道
	int pipefd[2];
};

// 共享内存文件的名字
static const char* shm_name = "/my_shm";
// 信号管道 用来统一进程的事件来源(socket和信息)
int sig_pipefd[2];
// 3个文件描述符
int epollfd;
int listenfd;
int shmfd;
// 共享内存的起始地址 作为进程间共享数据
char* share_men = 0;
// users数组地址
struct client_data *users = 0;
// 子进程数组地址
int* sub_process = 0;
// 用户数量
int user_count = 0;
// 停止子进程flag
bool stop_child = false;

// 以上所有数据在fork之后都会被复制到新的进程中

// 非阻塞
int setnonblocking(int fd)
{
	int old_option = fcntl(fd, F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd, F_SETFL, new_option);
	return old_option;
}

// 注册事件
void addfd(int epollfd, int fd)
{
	struct epoll_event event;
	event.data.fd = fd;
	event.events = EPOLLIN | EPOLLET;
	epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
	setnonblocking(fd);
}

// 信号事件处理函数
void sig_handler(int sig)
{
	int save_errno = errno;
	int msg = sig;
	// 往信号管道中写入数据 触发管道另一端的可读事件 从而触发epoll_wait 统一事件源
	send(sig_pipefd[1], (char *)&msg, 1, 0);
	errno = save_errno;
}

// 添加信号处理函数
void addsig(int sig, void (*handler)(int), bool restart)
{
	struct sigaction sa;
	memset(&sa, '', sizeof(sa));
	sa.sa_handler = handler;
	// 设置系统重新调用被该信号终止的系统调用 例如epoll_wait被EINTER信号打断
	if (restart)
	{
		sa.sa_flags |= SA_RESTART;
	}
	// 在信号函数处理过程中 不处理其他任何信号包括当前正在处理的信号 避免再次调用自己
	sigfillset(&sa.sa_mask);
	assert(sigaction(sig, &sa, NULL) != -1);
}

// 关闭文件描述符 释放申请的资源
void del_resource()
{
	close(sig_pipefd[0]);
	close(sig_pipefd[1]);
	close(listenfd);
	close(epollfd);
	shm_unlink(shm_name);
	free(users);
	free(sub_process);
}

// 子进程的信号处理函数
void child_term_handler(int sig)
{
	stop_child = true;
}

// 子进程的主体运行逻辑
int run_child(int idx, struct client_data *users, char *share_men)
{
	// 子进程拥有自己的epoll文件描述符 用来监听连接socket以及自己的通信管道
	struct epoll_event events[MAX_EVENT_NUMBER];
	int child_epollfd = epoll_create(5);
	assert(child_epollfd != -1);
	int connfd = users[idx].connfd;
	// 监听socket
	addfd(child_epollfd, connfd);
	int pipefd = users[idx].pipefd[1];
	// 监听通信管道
	addfd(child_epollfd, pipefd);
	int ret;
	// 添加终止信号处理函数
	addsig(SIGTERM, child_term_handler, false);

	while(!stop_child) {
		int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
		if (number < 0 && errno != EINTR) {
			printf("epoll failturen");
			break;
		}

		for(int i=0;i<number;  i) {
			int sockfd = events[i].data.fd;
			if (sockfd == connfd && events[i].events & EPOLLIN) {
				// 客户通过socket发来了数据 准确写入共享内存中属于idx的那一段区域
				memset(share_men   idx*BUFFER_SIZE, '', BUFFER_SIZE);
				ret = recv(connfd, share_men   idx*BUFFER_SIZE, BUFFER_SIZE - 1, 0);
				if (ret < 0) {
					if (errno != EAGAIN) {
						stop_child = true;
					}
				}
				else if (ret == 0) {
					// 对方关闭了socket那我们就终止这个子进程 由父进程回收就好了
					stop_child = true;
				}
				else {
					// 接收到了数据 通知父进程 转发数据给其他user
					send(pipefd, (char*) &idx, sizeof(idx), 0);
				}
			}
			else if (sockfd == pipefd && events[i].events & EPOLLIN) {
				// 父进程通过通信管道通知当前子进程可以转发消息了 client就是发来的那个需要转发数据来源的user的idx
				int client = 0;
				ret = recv(sockfd, (char*) &client, sizeof(client), 0);
				if (ret < 0) {
					if (errno != EAGAIN) {
						stop_child = true;
					}
				}
				else if (ret == 0) {
					stop_child = true;
				}
				else {
					// 每个进程去读client的内存区域然后直接发送给socket实现消息转发 无信息在用户空间中拷贝
					send(connfd, share_men   client*BUFFER_SIZE, BUFFER_SIZE, 0);
				}
			} else {
				continue;
			}
		}
	}

	// 关闭资源
	close(connfd);
	close(pipefd);
	close(child_epollfd);
	return 0;
}


// 父进程的主体运行逻辑
int main(int argc, char *argv[])
{
	if (argc < 2) {
        printf("usage: %s  ip_address port_numbern", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

	int ret = 0;
	struct sockaddr_in address;
	bzero(&address, sizeof(address));
	address.sin_family = AF_INET;
	inet_pton(AF_INET, ip, &address.sin_addr);
	address.sin_port = htons(port);

	listenfd = socket(PF_INET, SOCK_STREAM, 0);
	assert(listenfd >= 0);

	int reuse = 1;
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR,(const void *)&reuse , sizeof(int));
	assert(ret != -1);

	ret = bind(listenfd, (struct sockaddr*) &address, sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd, 5);
	assert(ret != -1);

	user_count = 0;
	users = (struct client_data*)malloc(sizeof(struct client_data)*(USER_LIMIT   1));
	// 子进程数组 每个项是一个文件描述符的值
	sub_process = (int *)malloc(sizeof(int)*PROCESS_LIMIT);
	for(int i=0;i<PROCESS_LIMIT;  i) {
		sub_process[i] = -1;
	}

	struct epoll_event events[MAX_EVENT_NUMBER];
	epollfd = epoll_create(5);
	assert(epollfd != -1);
	// 监听listen socket
	addfd(epollfd, listenfd);

	ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
	assert(ret != -1);
	setnonblocking(sig_pipefd[1]);
	// 监听信号管道的可读一端
	addfd(epollfd, sig_pipefd[0]);

	// 子进程退出或者暂停时触发
	addsig(SIGCHLD, sig_handler, true); 
	// 进程终止信号 kill命令发送的就是这个信号
	addsig(SIGTERM, sig_handler, true); 
	// 键盘输入ctrl C中断进程触发
	addsig(SIGINT, sig_handler, true); 
	// 往读端被关闭的管道或者socket中写数据时触发
	addsig(SIGPIPE, sig_handler, true); 

	bool stop_server = false;
	bool terminate = false;

	// 创建共享内存文件
	shmfd = shm_open(shm_name, 	O_CREAT | O_RDWR, 0666);
	assert(shmfd != -1);
	// 设置文件大小
	ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);
	assert(ret != -1);

	// 申请一块具有读写权限的共享内存 它的内容就是shmfd文件描述符指向的内容的映射 MAP_SHARED标识这是进程间共享的
	share_men = (char*)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
	assert(share_men != MAP_FAILED);
	// 得到内容后就可以关闭这个文件描述符了
	close(shmfd);

	while(!stop_server) {
		// 主进程等到 listen socket有新的连接到来 以及 信号管道有信号到来 以及 某个子进程通过管道写入了一个信号数据表示需要主进程通知其他子进程转发消息了
		int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
		if ((number < 0) && (errno != EINTR)) {
			printf("epoll failturen");
			break;
		}

		for(int i=0;i<number;  i) {
			int sockfd = events[i].data.fd;
			if (sockfd == listenfd) {
				struct sockaddr_in client_address;
				socklen_t client_addrlength = sizeof(client_address);
				int connfd = accept(listenfd, (struct sockaddr*) &client_address, &client_addrlength);
				if (connfd < 0) {
					printf("errno is: %dn", errno);
					continue;
				}
				if (user_count >= USER_LIMIT) {
					const char* info = "too many usersn";
					printf("%s", info);
					send(connfd, info, strlen(info), 0);
					close(connfd);
					continue;
				}
				users[user_count].address = client_address;
				users[user_count].connfd = connfd;
				// 创建一对双向管道 这是父子进程用来保持通信的渠道
				ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
				assert(ret != -1);
				// 在这里父子进程各自执行自己的逻辑
				pid_t pid = fork();
				if (pid < 0) {
					close(connfd);
					continue;
				}
				else if (pid == 0) {
					// 子进程逻辑
					// 关闭继承自父进程的epoll文件描述符
					close(epollfd);
					// 关闭继承自父进程的listen socket文件描述符
					close(listenfd);
					// 关闭继承自父进程的双向管道中的一端
					close(users[user_count].pipefd[0]);
					// 关闭继承而来的信号管道
					close(sig_pipefd[0]);
					close(sig_pipefd[1]);
					// 执行子进程的主逻辑 同步阻塞到run_child的等待事件中 返回的时候意味着这个子进程已经终止了 可以进行资源回收了
					run_child(user_count, users, share_men);
					// 由于继承了父进程中打开的这个共享内存 需要在子进程中关闭子进程对它的引用
					munmap((void*)share_men, USER_LIMIT*BUFFER_SIZE);
					// 子进程退出
					exit(0);
				} else {
					// 父进程逻辑
					// 关闭连接socket
					close(connfd);
					// 关闭通信管道中的另一端
					close(users[user_count].pipefd[1]);
					// 注册监听剩下的管道一端的可读事件
					addfd(epollfd, users[user_count].pipefd[0]);
					// 父进程维持users数组 在新连接到来以及有连接退出的时候 动态修改users数组
					users[user_count].pid = pid;
					// 父进程维持子进程数组 在有进程退出的时候动态修改它
					sub_process[pid] = user_count;
					user_count  ;
				}
			}
			else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
				// 信号管道中有信号到达了
				int sig;
				// 每个信号值都是一个字节大小 用char类型刚好
				char signals[1024];
				ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
				if (ret == -1) {
					continue;
				}
				else if (ret == 0) {
					continue;
				}
				else {
					for(int i=0;i<ret;  i) {
						switch(signals[i]) {
							case SIGCHLD:
							{
								// 处理某个子进程退出了的情况
								pid_t pid;
								int stat;
								while((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
									// 根据进程id找到用户数据在users数组中的索引
									int del_user = sub_process[pid];
									sub_process[pid] = -1;
									if (del_user < 0 || del_user > user_count) {
										continue;
									}
									// 移除epoll对这个子进程的通信管道的监听事件
									epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
									// 关闭它
									close(users[del_user].pipefd[0]);
									// user_count比实际最后个user要大一 所以是--user_count
									users[del_user] = users[--user_count];
									// 更新进程数组中被删除位置所放入新进程对应的user索引
									sub_process[users[del_user].pid] = del_user;
								}
								// 如果收到了终止信号且用户数为0 就终止主进程了
								if (terminate && user_count == 0) {
									stop_server = true;
								}
								break;
							}
							case SIGTERM:
							case SIGINT:
							{
								// 父进程收到终止信号 开始杀死所有子进程
								printf("kill all the child nown");
								if (user_count == 0) {
									stop_server = true;
									break;
								}
								for(int i=0;i<user_count;  i) {
									int pid = users[i].pid;
									// 发送终止信号给各个子进程
									kill(pid, SIGTERM);
								}
								terminate = true;
								break;
							}
							default:
							{
								break;
							}
						}
					}
				}
			}
			else if (events[i].events & EPOLLIN) {
				// 子进程通过通信管道写入了数据 表示需要主进程通知其他子进程转发消息了
				int child = 0;
				ret = recv(sockfd, (char*) &child, sizeof(child), 0);
				printf("read data from child accross pipen");
				if (ret == -1) {
					continue;
				}
				else if (ret == 0) {
					continue;
				}
				else {
					for(int j=0;j<user_count;  j) {
						if (users[j].pipefd[0] != sockfd) {
							printf("send data to child accross pipen");
							// 主进程通过向通信管道写入数据来通知子进程可以转发了
							send(users[j].pipefd[0], (char*) &child, sizeof(child), 0);
						}
					}
				}
			}
		}
	}

	// 父进程清理资源
	del_resource();
	return 0;
}

对了,由于使用了共享内存, 编译的时候记得在最后加上

代码语言:javascript复制
gcc -g multiple_process_server.c -lrt

最后,再次感谢 游双 大佬的《Linux高性能服务器编程》。

0 人点赞