I/O 多路复用, select, poll, epoll

2024-05-16 23:08:01 浏览数 (5)

简述 I/O

I/O 是应用程序必然逃不掉的一个话题。大家在计算机基础学习中,学过计组,操作系统和计网,而想要把 I/O 研究深入肯定要将对这三个计算机基础方面有所深入。

我们谈及高性能的时候逃不掉的五个角度就是 cpu,内存,操作系统,网络带宽和应用程序。当然我们平时最大,当然几乎所有性能方面的分析都离不开这五个层面的分析,cpu 层面的 numa 架构,三级缓存,多核并发。内存层面的内存应用,垃圾回收,内存泄漏等等,当然,在大数据领域磁盘的 I/O 成为了一个很重要指标,但是本质上这不是软件设计领域问题,而是现实工程领域的问题。网络带宽较为简单,带宽大小,基本脱离软件领域设计硬件层面,而操作系统涉及 I/O 就必然离不开select,poll 和 epoll了。

从宏观角度,这三个其实都会设计阻塞,包括基于 epoll 设计的 reactor 模型也会涉及阻塞,当然 windows 有基于异步的 IOCP 模型。不过鉴于大多数应用在服务端还是基于 Linux ,所以不多讨论。

select

select 是通过将文件连接后所有的文件描述符放入一个集合中,当调用 select 函数会把所有文件描述符集合拷贝到内核,然后内核遍历整个集合,有事件发生时候,对这个 socket 进行标记,然后将集合拷贝到内存,这个文件描述符也拷贝到内存。

可以看到这个过程非常粗粝暴力,像极了我们做算法题的样子。

在开始代码之前,我觉得还是要不代码重复部分拿出来,调用系统 api 这样是有一定规范,也就是套路的,每次都写套路一部分多少会有重复。

代码语言:c复制
#define bind_port  8888
#define BUFFER_LENGTH 1024
#define POLL_SIZE     1024
#define EPOLL_SIZE    1024
#define addr_ip 127.0.0.1
int create_sockfd(int port, const char* ip )
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
		perror("socketn");
		return -1;
	}
    struct addr_in addr;
    memset(&addr, 0 ,sizeof(struct addr_in));

    addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = ip;

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0){
		 perror("bind");
		 return 2;
	}

	if(listen(sockfd, 5) < 0){
		perror("listen");
		return 3;
	}
    return sockfd;

}

上述是一个函数常规操作,socket 生成文件描述符,绑定 ip,端口,监听。

select 函数实现如下:

代码语言:c复制
void handle_next(int sockfd){
    fd_set rfds, rset;             //声明文件描述符集合
    FD_ZERO(&rfds);         //初始化为 0
	FD_SET(sockfd, &rfds); //将sockfd 添加进去

	int max_fd = sockfd;    //设置最大 sockfd
	int i = 0;
	while(1){
		rset = rfds;

	    int nready = select(max_fd   1, &rset, NULL, NULL, NULL);  //将文件集合拷贝到内核,然后内核返回发生事件文件描述符数量
	    if(nready < 0){ //没有事件发生
			printf("select error: %dn", errno);
			continue;
	    }

		if(FD_ISSET(sockfd, &rset)){  //sockfd 在设置的文件描述符集合中
			struct sockaddr_in client_addr;
		    memset(&client_addr, 0, sizeof(struct sockaddr_in));
		    socklen_t client_len = sizeof(client_addr);

			int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);  //接受数据
		    if(clientfd < 0) continue;//没接收到

			char str[INET_ADDRSTRLEN] = {0};
			printf("recvied from %s at port, sockfd: %d, clientfd:%d", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port), sockfd, clientfd);

			if(max_fd == FD_SETSIZE){  //文件集合数超出最大,select 有限制最大 1024 个,因为使用bitmap 进行标记
				printf("clientfd --> out rangen");
				break;
			}
			FD_SET(clientfd, & rfds);  //将接收数据返回的fd加入文件描述符集合
			if(clientfd > max_fd) max_fd = clientfd;

			printf(" sockfd: %d, max_fd:%d, clientfd:%d",  sockfd, max_fd clientfd);

			if(--nready == 0) continue;  //先接收数据所以先减后比较
	
		}
			
		for(i = sockfd   1; i <= max_fd; i  ){ //没有在 rset 中说明fd 已经进行了 accept
			if(FD_SET(i, &rset)){

			char buffer[BUFFER_LENGTH] = {0};
		    int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); //接受数据
			if(ret < 0){
				if(errno == EAGAIN || errno == EWOULDBLOCK){
					printf("read all data");
				}
				FD_CLR(i, &rfds);
				close(i);
			}else if(ret == 0){
				printf("disconnect %dn", i);
				FD_CLR(i, &rfds);
				close(i);
				break;
			}else{
				printf("recv:%s, %d bytesn", buffer, ret);
			}

			if(--nready == 0) break;
		   }
	   }
	}
}

poll

poll 跟 select 从设计层面基本没有区别,唯一的区别就是 select 使用 bitmap 来存储 fd 受限于设计字节数文件描述符集合最多有 1024 个,而 poll 使用动态数组存储事件状态和 fd ,所以在 fd 数量方面不受限制。

代码语言:c复制
void handle_next(int sockfd)
{
    //poll 使用
    struct pollfd fds[POLL_SIZE] = {0};  //文件描述符使用动态数组
	fds[0].fd = sockfd; //初始化fd
	fds[0].events = POLLIN; //fd状态

	int max_fd = 0, i = 0;
	for (i = 1;i < POLL_SIZE;i   ) {  //初始化fd
		fds[i].fd = -1;
	}

	while (1) {
		int nready = poll(fds, max_fd 1, 5); //类似select ,这里设计了阻塞 I/O 和非阻塞 I/O ,最后一个参数表示等待事件的时间 -1 表示一直阻塞等到有 I/O 产生,而 0 表示每次询问都回答,而 5 表示每 5 ms 进行返回。
		if (nready <= 0) continue;

		if ((fds[0].revents & POLLIN) == POLLIN) { //实际发生的事情是 POLLIN 可读事件
			
			struct sockaddr_in client_addr;
			memset(&client_addr, 0, sizeof(struct sockaddr_in));
			socklen_t client_len = sizeof(client_addr);
		
			int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);  //从操作系统读取数据
			if (clientfd <= 0) continue;

			char str[INET_ADDRSTRLEN] = {0};
			printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
				ntohs(client_addr.sin_port), sockfd, clientfd);

			fds[clientfd].fd = clientfd;
			fds[clientfd].events = POLLIN;

			if (clientfd > max_fd) max_fd = clientfd;

			if (--nready == 0) continue;
		}

		for (i = sockfd   1;i <= max_fd;i   ) {
			if (fds[i].revents & (POLLIN|POLLERR)) {  //可读或错误事件
				char buffer[BUFFER_LENGTH] = {0};
				int ret = recv(i, buffer, BUFFER_LENGTH, 0); //接受数据
				if (ret < 0) {
					if (errno == EAGAIN || errno == EWOULDBLOCK) {
						printf("read all data");
					}
					
					//close(i);
					fds[i].fd = -1;
				} else if (ret == 0) {
					printf(" disconnect %dn", i);
					
					close(i);
					fds[i].fd = -1;
					break;
				} else {
					printf("Recv: %s, %d Bytesn", buffer, ret);
				}
				if (--nready == 0) break;
			}
		}
	}
}

epoll

最后到了大名鼎鼎的 epoll ,epoll 其实很简单,就是在操作系统内核中将事件对象维护在一个红黑树中,通过函数表现就是 epoll_ctl(),当有文件描述符发生事件,会从红黑树形成一个就绪队列,然后用户不用来回拷贝红黑树,就绪队列形成链表,返回用户,通过函数表现就是 epoll_wait()。

上代码:

代码语言:c复制
void handle_next(int sockfd)
{
    int epoll_fd = epoll_create(EPOLL_SIZE);          //用户端创建这样大小的epoll 红黑树
	struct epoll_event ev, events[EPOLL_SIZE] = {0}; //epoll 事件

	ev.events = EPOLLIN;  //可读事件
	ev.data.fd = sockfd;     //epoll 
	epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);  //将 sockfd 添加到 epoll_fd 这个红黑树中

	while (1) {

		int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1);  //查看就绪队列,返回就绪事件内容
		if (nready == -1) {
			printf("epoll_waitn");
			break;
		}

		int i = 0;
		for (i = 0;i < nready;i   ) {
			if (events[i].data.fd == sockfd) {
				
				struct sockaddr_in client_addr;
				memset(&client_addr, 0, sizeof(struct sockaddr_in));
				socklen_t client_len = sizeof(client_addr);
			
				int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
				if (clientfd <= 0) continue;
	
				char str[INET_ADDRSTRLEN] = {0};
				printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
					ntohs(client_addr.sin_port), sockfd, clientfd);

				ev.events = EPOLLIN | EPOLLET; //可读且是边缘触发  边缘触发就是有可读事件应用只会苏醒一次
				ev.data.fd = clientfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);  //添加 clientfd
			} else {
				int clientfd = events[i].data.fd;

				char buffer[BUFFER_LENGTH] = {0};
				int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);//接受数据
				if (ret < 0) {
					if (errno == EAGAIN || errno == EWOULDBLOCK) {
						printf("read all data");
					}
					
					close(clientfd);
					
					ev.events = EPOLLIN | EPOLLET;
					ev.data.fd = clientfd;
					epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);  //接收后删除fd
				} else if (ret == 0) {
					printf(" disconnect %dn", clientfd);
					
					close(clientfd);

					ev.events = EPOLLIN | EPOLLET;
					ev.data.fd = clientfd;
					epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
					
					break;
				} else {
					printf("Recv: %s, %d Bytesn", buffer, ret);
				}
				
			}
		}

	}

}

至此,I/O 多路复用就结束了,所谓多路复用就是将所有文件描述维护起来,当然这个过程主要是内核层面完成,用户层面是不是也可以进行维护呢?这就涉及大名鼎鼎的 reactor 反应堆模型了,fd 这么乱,我要管管了,这是程序员版的我来了,我看见,我征服!

1 人点赞