简述 I/O
I/O 是应用程序必然逃不掉的一个话题。大家在计算机基础学习中,学过计组,操作系统和计网,而想要把 I/O 研究深入肯定要将对这三个计算机基础方面有所深入。
我们谈及高性能的时候逃不掉的五个角度就是 cpu,内存,操作系统,网络带宽和应用程序。当然我们平时最大,当然几乎所有性能方面的分析都离不开这五个层面的分析,cpu 层面的 numa 架构,三级缓存,多核并发。内存层面的内存应用,垃圾回收,内存泄漏等等,当然,在大数据领域磁盘的 I/O 成为了一个很重要指标,但是本质上这不是软件设计领域问题,而是现实工程领域的问题。网络带宽较为简单,带宽大小,基本脱离软件领域设计硬件层面,而操作系统涉及 I/O 就必然离不开select,poll 和 epoll了。
从宏观角度,这三个其实都会设计阻塞,包括基于 epoll 设计的 reactor 模型也会涉及阻塞,当然 windows 有基于异步的 IOCP 模型。不过鉴于大多数应用在服务端还是基于 Linux ,所以不多讨论。
select
select 是通过将文件连接后所有的文件描述符放入一个集合中,当调用 select 函数会把所有文件描述符集合拷贝到内核,然后内核遍历整个集合,有事件发生时候,对这个 socket 进行标记,然后将集合拷贝到内存,这个文件描述符也拷贝到内存。
可以看到这个过程非常粗粝暴力,像极了我们做算法题的样子。
在开始代码之前,我觉得还是要不代码重复部分拿出来,调用系统 api 这样是有一定规范,也就是套路的,每次都写套路一部分多少会有重复。
代码语言:c复制#define bind_port 8888
#define BUFFER_LENGTH 1024
#define POLL_SIZE 1024
#define EPOLL_SIZE 1024
#define addr_ip 127.0.0.1
int create_sockfd(int port, const char* ip )
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0){
perror("socketn");
return -1;
}
struct addr_in addr;
memset(&addr, 0 ,sizeof(struct addr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = ip;
if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0){
perror("bind");
return 2;
}
if(listen(sockfd, 5) < 0){
perror("listen");
return 3;
}
return sockfd;
}
上述是一个函数常规操作,socket 生成文件描述符,绑定 ip,端口,监听。
select 函数实现如下:
代码语言:c复制void handle_next(int sockfd){
fd_set rfds, rset; //声明文件描述符集合
FD_ZERO(&rfds); //初始化为 0
FD_SET(sockfd, &rfds); //将sockfd 添加进去
int max_fd = sockfd; //设置最大 sockfd
int i = 0;
while(1){
rset = rfds;
int nready = select(max_fd 1, &rset, NULL, NULL, NULL); //将文件集合拷贝到内核,然后内核返回发生事件文件描述符数量
if(nready < 0){ //没有事件发生
printf("select error: %dn", errno);
continue;
}
if(FD_ISSET(sockfd, &rset)){ //sockfd 在设置的文件描述符集合中
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); //接受数据
if(clientfd < 0) continue;//没接收到
char str[INET_ADDRSTRLEN] = {0};
printf("recvied from %s at port, sockfd: %d, clientfd:%d", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)), ntohs(client_addr.sin_port), sockfd, clientfd);
if(max_fd == FD_SETSIZE){ //文件集合数超出最大,select 有限制最大 1024 个,因为使用bitmap 进行标记
printf("clientfd --> out rangen");
break;
}
FD_SET(clientfd, & rfds); //将接收数据返回的fd加入文件描述符集合
if(clientfd > max_fd) max_fd = clientfd;
printf(" sockfd: %d, max_fd:%d, clientfd:%d", sockfd, max_fd clientfd);
if(--nready == 0) continue; //先接收数据所以先减后比较
}
for(i = sockfd 1; i <= max_fd; i ){ //没有在 rset 中说明fd 已经进行了 accept
if(FD_SET(i, &rset)){
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); //接受数据
if(ret < 0){
if(errno == EAGAIN || errno == EWOULDBLOCK){
printf("read all data");
}
FD_CLR(i, &rfds);
close(i);
}else if(ret == 0){
printf("disconnect %dn", i);
FD_CLR(i, &rfds);
close(i);
break;
}else{
printf("recv:%s, %d bytesn", buffer, ret);
}
if(--nready == 0) break;
}
}
}
}
poll
poll 跟 select 从设计层面基本没有区别,唯一的区别就是 select 使用 bitmap 来存储 fd 受限于设计字节数文件描述符集合最多有 1024 个,而 poll 使用动态数组存储事件状态和 fd ,所以在 fd 数量方面不受限制。
代码语言:c复制void handle_next(int sockfd)
{
//poll 使用
struct pollfd fds[POLL_SIZE] = {0}; //文件描述符使用动态数组
fds[0].fd = sockfd; //初始化fd
fds[0].events = POLLIN; //fd状态
int max_fd = 0, i = 0;
for (i = 1;i < POLL_SIZE;i ) { //初始化fd
fds[i].fd = -1;
}
while (1) {
int nready = poll(fds, max_fd 1, 5); //类似select ,这里设计了阻塞 I/O 和非阻塞 I/O ,最后一个参数表示等待事件的时间 -1 表示一直阻塞等到有 I/O 产生,而 0 表示每次询问都回答,而 5 表示每 5 ms 进行返回。
if (nready <= 0) continue;
if ((fds[0].revents & POLLIN) == POLLIN) { //实际发生的事情是 POLLIN 可读事件
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); //从操作系统读取数据
if (clientfd <= 0) continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
ntohs(client_addr.sin_port), sockfd, clientfd);
fds[clientfd].fd = clientfd;
fds[clientfd].events = POLLIN;
if (clientfd > max_fd) max_fd = clientfd;
if (--nready == 0) continue;
}
for (i = sockfd 1;i <= max_fd;i ) {
if (fds[i].revents & (POLLIN|POLLERR)) { //可读或错误事件
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(i, buffer, BUFFER_LENGTH, 0); //接受数据
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("read all data");
}
//close(i);
fds[i].fd = -1;
} else if (ret == 0) {
printf(" disconnect %dn", i);
close(i);
fds[i].fd = -1;
break;
} else {
printf("Recv: %s, %d Bytesn", buffer, ret);
}
if (--nready == 0) break;
}
}
}
}
epoll
最后到了大名鼎鼎的 epoll ,epoll 其实很简单,就是在操作系统内核中将事件对象维护在一个红黑树中,通过函数表现就是 epoll_ctl(),当有文件描述符发生事件,会从红黑树形成一个就绪队列,然后用户不用来回拷贝红黑树,就绪队列形成链表,返回用户,通过函数表现就是 epoll_wait()。
上代码:
代码语言:c复制void handle_next(int sockfd)
{
int epoll_fd = epoll_create(EPOLL_SIZE); //用户端创建这样大小的epoll 红黑树
struct epoll_event ev, events[EPOLL_SIZE] = {0}; //epoll 事件
ev.events = EPOLLIN; //可读事件
ev.data.fd = sockfd; //epoll
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev); //将 sockfd 添加到 epoll_fd 这个红黑树中
while (1) {
int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1); //查看就绪队列,返回就绪事件内容
if (nready == -1) {
printf("epoll_waitn");
break;
}
int i = 0;
for (i = 0;i < nready;i ) {
if (events[i].data.fd == sockfd) {
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
ntohs(client_addr.sin_port), sockfd, clientfd);
ev.events = EPOLLIN | EPOLLET; //可读且是边缘触发 边缘触发就是有可读事件应用只会苏醒一次
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev); //添加 clientfd
} else {
int clientfd = events[i].data.fd;
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);//接受数据
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("read all data");
}
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev); //接收后删除fd
} else if (ret == 0) {
printf(" disconnect %dn", clientfd);
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
break;
} else {
printf("Recv: %s, %d Bytesn", buffer, ret);
}
}
}
}
}
至此,I/O 多路复用就结束了,所谓多路复用就是将所有文件描述维护起来,当然这个过程主要是内核层面完成,用户层面是不是也可以进行维护呢?这就涉及大名鼎鼎的 reactor 反应堆模型了,fd 这么乱,我要管管了,这是程序员版的我来了,我看见,我征服!