linux网络编程系列(十)--epoll的基本使用

2021-04-16 14:55:07 浏览数 (1)

1. 网络编程中的四种IO模型

  • 阻塞IO模型,默认socket都是阻塞的,就是IO操作都要等待操作完成以后才能返回;
  • 非阻塞IO模型,就是IO操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用fcntl设置socket为非阻塞;
  • 多路复用模型,就是事件驱动IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是select和epoll;
  • 异步IO模型,就是发起IO操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知IO操作已完成;

2. epoll函数

2.1 epoll的两种工作模式

2.1.1 LT模式(又叫水平模式,类似于select/poll):

完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。

2.1.2 ET模式(又叫边缘触发模式,必须将socket设置为非阻塞):

此种情况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会造成丢包的。这时应用程序需要自己维护一张fds的表格,把从epoll_wait得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的fds进行操作。

2.1.3 水平模式和边沿模式的选择

ET比LT对应用程序的要求更多,需要程序员设计的部分也更多,看上去LT好像要简单很多,但是当我们要求对fd有超时控制时,LT也同样需要对fds进行遍历,此时不如使用本来就要遍历的ET。而且由于epollwait每次返回的fds数量是有限的,在大并发的模式下,LT将非常的繁忙,所有的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。而ET只要epollwait返回一次fds之后,这些fds就会从队列中删除,只有当fd重新变为空闲状态时才重新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减少的,这样在大并发的情况下,ET模式将会更加具有优势。

2.2 epoll函数原型

2.2.1 epoll_create
代码语言:javascript复制
  1. int epoll_create(int size); //创建一个epoll的句柄,size用来告诉内核要监听的数目

返回值:>0 返回创建成功的epoll句柄 -1 失败

2.2.2 epoll_ctl
代码语言:javascript复制
  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数, 注册要监听的事件类型:参数说明:

  • epfd epoll_create返回的句柄
  • op 表示动作,用3个宏表示:EPOLLCTLADD 注册新的fd到epfd中,EPOLLCTLMOD,修改已经注册的fd的监听事件,EPOLLCTLDEL,从epfd中删除一个fd。
  • fd 表示要监听的fd(一般就是socket函数生成的文件描述符)
  • event 告诉内核要监听什么事

struct epoll_event结构如下:

代码语言:javascript复制
  1. struct epoll_event {
  2. __uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等
  3. epoll_data_t data; //一个联合体,详细介绍见下面
  4. };
  5. typedef union epoll_data
  6. {
  7. void *ptr;
  8. int fd;
  9. uint32_t u32;
  10. uint64_t u64;
  11. }epoll_data_t;
2.2.3 epoll_wait
代码语言:javascript复制
  1. int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

参数说明如下:

  • events 根据events中事件,决定是调用accept回应一个连接,还是调用read或者write读写文件
  • maxevents 告诉内核这个events多大,并且不能大于epoll_create中的size

功能说明:等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。

并且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epollctl(epfd,EPOLLCTLMOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLLCTLADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。当epollwait返回时根据返回值(大于0)调用accept。

2.3 epoll的实现

2.3.1 epoll函数调用过程

socket/bind/listen/epollcreate/epollctl/epoll_wait/accept/read/write/close

2.3.2 代码实现

首先对CTCP类做一下补充,将socket设置为非阻塞:

代码语言:javascript复制
  1. int CTcp::SetNoblock (int nSock)
  2. {
  3. assert (m_nSock != -1);
  4. int nFlags;
  5. if ( nSock == -1 )
  6. {
  7. nSock = m_nSock;
  8. }
  9. if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
  10. return 0;
  11. nFlags = nFlags | O_NONBLOCK;
  12. if (fcntl (nSock, F_SETFL, nFlags) < 0)
  13. return 0;
  14. return 1;
  15. }

然后基于CTCP类,实现CEpollServer类,代码如下:

代码语言:javascript复制
  1. //EpollServer.h
  2. #ifndef __EPOLL_SERVER_H__
  3. #define __EPOLL_SERVER_H__
  4. #include "SxTcp.h"
  5. //Tcp类
  6. class CEpollServer
  7. {
  8. //构造函数
  9. public:
  10. CEpollServer ();
  11. virtual ~CEpollServer ();
  12. //公有成员函数
  13. public:
  14. int CreateEpoll(const char* szIp, int nPort, int nSize);
  15. int ProcessEpoll();
  16. int CloseEpoll();
  17. //私有成员变量
  18. private:
  19. CTcp m_cTcp;
  20. int m_nEpollFd;
  21. };
  22. #endif
代码语言:javascript复制
  1. //EpollServer.cpp
  2. #include "EpollServer.h"
  3. #include <sys/epoll.h>
  4. #include "TypeError.h"
  5. #include <assert.h>
  6. #include <string.h>
  7. #include <unistd.h>
  8. #include <errno.h>
  9. #include <sys/types.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <stdio.h>
  13. CEpollServer::CEpollServer ()
  14. {
  15. m_nEpollFd = -1;
  16. }
  17. CEpollServer::~CEpollServer ()
  18. {
  19. CloseEpoll();
  20. m_cTcp.Close();
  21. }
  22. /*创建epoll句柄
  23. 入参:
  24. szIp 服务器ip地址
  25. nPort 要绑定的端口
  26. nSize 要监听的文件描述符数量
  27. 出参:1: 成功 ; 0: 失败
  28. */
  29. int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
  30. {
  31. assert(szIp != nullptr);
  32. int iRet = 0;
  33. int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
  34. iRet = m_cTcp.Open();
  35. if ( iRet == 0 )
  36. {
  37. return SOCKET_ERROR;
  38. }
  39. iRet = m_cTcp.Bind(szIp, nPort);
  40. if ( iRet == 0 )
  41. {
  42. return BIND_ERROR;
  43. }
  44. iRet = m_cTcp.SetNoblock();
  45. if ( iRet == 0 )
  46. {
  47. return SETSOCKOPT_ERROR;
  48. }
  49. iRet = m_cTcp.Listen(nSize 1);//监听描述符数量要比epoll的多?
  50. if ( iRet == 0)
  51. {
  52. return LISTEN_ERROR;
  53. }
  54. if ( m_nEpollFd != -1 )
  55. {
  56. CloseEpoll();
  57. }
  58. m_nEpollFd = epoll_create(size);
  59. if ( m_nEpollFd == -1)
  60. {
  61. return EPOLL_CREATE_ERROR;
  62. }
  63. return 1;
  64. }
  65. /*处理epoll事件
  66. 出参:1: 成功 ; 0: 失败
  67. */
  68. int CEpollServer::ProcessEpoll()
  69. {
  70. assert(m_nEpollFd != -1);
  71. int nFds = 0;
  72. int connFd = -1, readFd = -1, writeFd = -1;
  73. int n = 0, nSize = 0;
  74. int nListenFd = -1;
  75. char buf[MAX_READ_SIZE] = {0};
  76. struct sockaddr_in clientAddr;
  77. socklen_t clilen;
  78. struct epoll_event ev, events[20];
  79. memset((void*)&ev, 0, sizeof(ev));
  80. nListenFd = m_cTcp.GetHandle();
  81. ev.data.fd = nListenFd;
  82. ev.events = EPOLLIN|EPOLLET;
  83. if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
  84. {
  85. return EPOLL_CTL_ERROR;
  86. }
  87. while(1)
  88. {
  89. n = 0;
  90. nSize = 0;
  91. nFds = epoll_wait(m_nEpollFd, events, 20, 500);
  92. for (int i = 0; i< nFds; i)
  93. {
  94. memset(buf, 0, MAX_READ_SIZE);
  95. if (events[i].data.fd == nListenFd )
  96. {
  97. while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
  98. {
  99. m_cTcp.SetNoblock(connFd); //ET模式需设置为非阻塞的
  100. ev.data.fd = connFd;
  101. ev.events = EPOLLIN|EPOLLET;
  102. if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
  103. {
  104. return EPOLL_CTL_ERROR;
  105. }
  106. }
  107. if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
  108. {
  109. return ACCEPT_ERROR;
  110. }
  111. continue;
  112. }
  113. else if(events[i].events & EPOLLIN)
  114. {
  115. readFd = events[i].data.fd;
  116. if (readFd < 0)
  117. {
  118. continue;
  119. }
  120. //读取数据
  121. while ( (nSize = read(readFd, buf n, MAX_READ_SIZE - 1)) > 0 )
  122. {
  123. n = nSize;
  124. }
  125. //EAGAIN说明读到结尾了
  126. if (nSize == -1 && errno != EAGAIN )
  127. {
  128. fprintf(stderr, "epoll read failedn");
  129. //ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
  130. }
  131. fprintf(stdout, "read data is:%sn", buf);
  132. ev.data.fd = readFd;
  133. ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET)
  134. epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
  135. }
  136. else if(events[i].events & EPOLLOUT)
  137. {
  138. writeFd = events[i].data.fd;
  139. //写数据
  140. strncpy(buf, "hello client", sizeof(buf)-1);
  141. int dataSize = strlen(buf);
  142. n = dataSize;
  143. while(n > 0)
  144. {
  145. nSize = write(writeFd, buf dataSize - n, n);
  146. if (nSize < n)
  147. {
  148. if (nSize == -1 && errno != EAGAIN)
  149. {
  150. break;
  151. }
  152. }
  153. n -= nSize;
  154. }
  155. ev.data.fd = writeFd;
  156. ev.events = EPOLLIN|EPOLLET;
  157. epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
  158. }
  159. }
  160. }
  161. }
  162. /*
  163. 关闭epoll文件描述符
  164. */
  165. int CEpollServer::CloseEpoll()
  166. {
  167. if (m_nEpollFd != -1)
  168. {
  169. close (m_nEpollFd);
  170. m_nEpollFd = -1;
  171. }
  172. return 1;
  173. }

将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile如下:

代码语言:javascript复制
  1. LIB_DIR=./lib
  2. src=$(wildcard *.cpp)
  3. obj=(patsubst %.cpp,%.o,(src))
  4. PIC=-fPIC
  5. LIBSO=-shared
  6. #CC=g -gdwarf-2 -gstrict-dwarf
  7. CC=g -g
  8. %.o:%.cpp
  9. (CC) -c <
  10. network:$(obj)
  11. (CC) -o libnetwork.so ^
  12. cp -f libnetwork.so ../test/lib
  13. clean:
  14. rm -f *.o *.so

然后实现TestEpollServer.cpp,如下:

注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,需要修改!

代码语言:javascript复制
  1. #include "../../readini/ConfigIni.h"
  2. #include <string>
  3. #include "../../network/EpollServer.h"
  4. #include "../../log/SingleLog.h"
  5. CEpollServer g_clEpollServer;
  6. #define FILEDIR "./socket.ini"
  7. //epoll server
  8. int epoll_server_init()
  9. {
  10. int iRet = -1;
  11. string strIp;
  12. int nPort = 0, nEpollNum = 0, nTimeout = 0;
  13. ConfigIni::Init(string(FILEDIR));
  14. strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
  15. if (strIp == "")
  16. {
  17. SingleLog::WriteLog(ERROR,"read server addr failed");
  18. return iRet;
  19. }
  20. nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
  21. if ( nPort == -1 )
  22. {
  23. SingleLog::WriteLog(ERROR,"read server port failed");
  24. return iRet;
  25. }
  26. nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
  27. if ( nEpollNum == -1 )
  28. {
  29. SingleLog::WriteLog(ERROR,"read server epoll num failed");
  30. return iRet;
  31. }
  32. nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
  33. if ( nTimeout == -1 )
  34. {
  35. SingleLog::WriteLog(ERROR,"read server timeout failed");
  36. return iRet;
  37. }
  38. iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
  39. if ( iRet == 0 )
  40. {
  41. SingleLog::WriteLog(ERROR, "epoll create failed");
  42. return -1;
  43. }
  44. return 0;
  45. }
  46. void epoll_server_run()
  47. {
  48. g_clEpollServer.ProcessEpoll();
  49. }
  50. int main()
  51. {
  52. SingleLog::Init();
  53. if (epoll_server_init() == -1)
  54. {
  55. return -1;
  56. }
  57. epoll_server_run();
  58. return 0;
  59. }
代码语言:javascript复制
  1. //TestClient.cpp
  2. #include <stdio.h>
  3. #include <iostream>
  4. #include <string.h>
  5. #include "../../network/SxTcp.h"
  6. using namespace std;
  7. int main()
  8. {
  9. CTcp tcp;
  10. int iRet = 0;
  11. int iFd = 0;
  12. char buf[128] = {0};
  13. iRet = tcp.Open();
  14. if (iRet == 0)
  15. {
  16. perror("socket create failed");
  17. return -1;
  18. }
  19. iRet = tcp.Connect("192.168.233.250", 6666);
  20. if (iRet == 0)
  21. {
  22. perror("socket connect failed");
  23. return -1;
  24. }
  25. while(1)
  26. {
  27. memset(buf, 0, sizeof(buf));
  28. cout << "please input some string:";
  29. cin >> buf;
  30. iRet = tcp.Send(buf, strlen(buf));
  31. if (iRet < -1 && errno != EAGAIN)
  32. {
  33. perror("send failed");
  34. return -1;
  35. }
  36. else if(iRet == 0)
  37. {
  38. perror("connect is closed");
  39. return -1;
  40. }
  41. memset(buf, 0, sizeof(buf));
  42. iRet = tcp.Recv(buf, sizeof(buf));
  43. if (iRet < 0 && errno != EAGAIN)
  44. {
  45. perror("recv failed");
  46. return -1;
  47. }
  48. else if(iRet == 0)
  49. {
  50. perror("socket not connect");
  51. return -1;
  52. }
  53. fprintf(stdout, "recv data is:%sn", buf);
  54. }
  55. return 0;
  56. }

分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。

更多c 及python系列文章,请关注我的公众号:晟夏的叶。

0 人点赞