linux网络编程系列(九)--epoll的基本使用

2021-04-16 14:50:08 浏览数 (1)

1. 网络编程中的四种IO模型

  • 阻塞IO模型,默认socket都是阻塞的,就是IO操作都要等待操作完成以后才能返回;
  • 非阻塞IO模型,就是IO操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用fcntl设置socket为非阻塞;
  • 多路复用模型,就是事件驱动IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是select和epoll;
  • 异步IO模型,就是发起IO操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知IO操作已完成;

2. epoll函数

2.1 epoll的两种工作模式

2.1.1 LT模式(又叫水平模式,类似于select/poll):

完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。

2.1.2 ET模式(又叫边缘触发模式,必须将socket设置为非阻塞):

此种情况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会造成丢包的。 这时应用程序需要自己维护一张fds的表格,把从epoll_wait得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的fds进行操作。

2.1.3 水平模式和边沿模式的选择

ET比LT对应用程序的要求更多,需要程序员设计的部分也更多,看上去LT好像要简单很多,但是当我们要求对fd有超时控制时,LT也同样需要对fds进行遍历,此时不如使用本来就要遍历的ET。 而且由于epollwait每次返回的fds数量是有限的,在大并发的模式下,LT将非常的繁忙,所有的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。 而ET只要epollwait返回一次fds之后,这些fds就会从队列中删除,只有当fd重新变为空闲状态时才重新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减少的,这样在大并发的情况下,ET模式将会更加具有优势。

2.2 epoll函数原型

2.2.1 epoll_create
代码语言:javascript复制
  1. int epoll_create(int size); //创建一个epoll的句柄,size用来告诉内核要监听的数目

返回值: >0 返回创建成功的epoll句柄 -1 失败

2.2.2 epoll_ctl
代码语言:javascript复制
  1. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数, 注册要监听的事件类型: 参数说明:

  • epfd epoll_create返回的句柄
  • op 表示动作,用3个宏表示:EPOLLCTLADD 注册新的fd到epfd中,EPOLLCTLMOD,修改已经注册的fd的监听事件,EPOLLCTLDEL,从epfd中删除一个fd。
  • fd 表示要监听的fd(一般就是socket函数生成的文件描述符)
  • event 告诉内核要监听什么事

struct epoll_event结构如下:

代码语言:javascript复制
  1. struct epoll_event {
  2. __uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等
  3. epoll_data_t data; //一个联合体,详细介绍见下面
  4. };
  5. typedef union epoll_data
  6. {
  7. void *ptr;
  8. int fd;
  9. uint32_t u32;
  10. uint64_t u64;
  11. }epoll_data_t;
2.2.3 epoll_wait
代码语言:javascript复制
  1. int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

参数说明如下:

  • events 根据events中事件,决定是调用accept回应一个连接,还是调用read或者write读写文件
  • maxevents 告诉内核这个events多大,并且不能大于epoll_create中的size

功能说明: 等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。 并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epollctl(epfd,EPOLLCTLMOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLLCTLADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。当epollwait返回时根据返回值(大于0)调用accept。

2.3 epoll的实现

2.3.1 epoll函数调用过程

socket/bind/listen/epollcreate/epollctl/epoll_wait/accept/read/write/close

2.3.2 代码实现

首先对CTCP类做一下补充,将socket设置为非阻塞:

代码语言:javascript复制
  1. int CTcp::SetNoblock (int nSock)
  2. {
  3. assert (m_nSock != -1);
  4. int nFlags;
  5. if ( nSock == -1 )
  6. {
  7. nSock = m_nSock;
  8. }
  9. if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0)
  10. return 0;
  11. nFlags = nFlags | O_NONBLOCK;
  12. if (fcntl (nSock, F_SETFL, nFlags) < 0)
  13. return 0;
  14. return 1;
  15. }

然后基于CTCP类,实现CEpollServer类,代码如下:

代码语言:javascript复制
  1. //EpollServer.h
  2. #ifndef __EPOLL_SERVER_H__
  3. #define __EPOLL_SERVER_H__
  4. #include "SxTcp.h"
  5. //Tcp类
  6. class CEpollServer
  7. {
  8. //构造函数
  9. public:
  10. CEpollServer ();
  11. virtual ~CEpollServer ();
  12. //公有成员函数
  13. public:
  14. int CreateEpoll(const char* szIp, int nPort, int nSize);
  15. int ProcessEpoll();
  16. int CloseEpoll();
  17. //私有成员变量
  18. private:
  19. CTcp m_cTcp;
  20. int m_nEpollFd;
  21. };
  22. #endif
代码语言:javascript复制
  1. #include "EpollServer.h"
  2. #include <sys/epoll.h>
  3. #include "TypeError.h"
  4. #include <assert.h>
  5. #include <string.h>
  6. #include <unistd.h>
  7. #include <errno.h>
  8. #include <sys/types.h>
  9. #include <sys/socket.h>
  10. #include <netinet/in.h>
  11. #include <stdio.h>
  12. CEpollServer::CEpollServer ()
  13. {
  14. m_nEpollFd = -1;
  15. }
  16. CEpollServer::~CEpollServer ()
  17. {
  18. CloseEpoll();
  19. m_cTcp.Close();
  20. }
  21. /*创建epoll句柄
  22. 入参:
  23. szIp 服务器ip地址
  24. nPort 要绑定的端口
  25. nSize 要监听的文件描述符数量
  26. 出参:1: 成功 ; 0: 失败
  27. */
  28. int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize)
  29. {
  30. assert(szIp != nullptr);
  31. int iRet = 0;
  32. int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM);
  33. iRet = m_cTcp.Open();
  34. if ( iRet == 0 )
  35. {
  36. return SOCKET_ERROR;
  37. }
  38. iRet = m_cTcp.Bind(szIp, nPort);
  39. if ( iRet == 0 )
  40. {
  41. return BIND_ERROR;
  42. }
  43. iRet = m_cTcp.SetNoblock();
  44. if ( iRet == 0 )
  45. {
  46. return SETSOCKOPT_ERROR;
  47. }
  48. iRet = m_cTcp.Listen(nSize 1);//监听描述符数量要比epoll的多?
  49. if ( iRet == 0)
  50. {
  51. return LISTEN_ERROR;
  52. }
  53. if ( m_nEpollFd != -1 )
  54. {
  55. CloseEpoll();
  56. }
  57. m_nEpollFd = epoll_create(size);
  58. if ( m_nEpollFd == -1)
  59. {
  60. return EPOLL_CREATE_ERROR;
  61. }
  62. return 1;
  63. }
  64. /*处理epoll事件
  65. 出参:1: 成功 ; 0: 失败
  66. */
  67. int CEpollServer::ProcessEpoll()
  68. {
  69. assert(m_nEpollFd != -1);
  70. int nFds = 0;
  71. int connFd = -1, readFd = -1, writeFd = -1;
  72. int n = 0, nSize = 0;
  73. int nListenFd = -1;
  74. char buf[MAX_READ_SIZE] = {0};
  75. struct sockaddr_in clientAddr;
  76. socklen_t clilen;
  77. struct epoll_event ev, events[20];
  78. memset((void*)&ev, 0, sizeof(ev));
  79. nListenFd = m_cTcp.GetHandle();
  80. ev.data.fd = nListenFd;
  81. ev.events = EPOLLIN|EPOLLET;
  82. if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 )
  83. {
  84. return EPOLL_CTL_ERROR;
  85. }
  86. while(1)
  87. {
  88. n = 0;
  89. nSize = 0;
  90. nFds = epoll_wait(m_nEpollFd, events, 20, 500);
  91. for (int i = 0; i< nFds; i)
  92. {
  93. memset(buf, 0, MAX_READ_SIZE);
  94. if (events[i].data.fd == nListenFd )
  95. {
  96. while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 )
  97. {
  98. m_cTcp.SetNoblock(connFd); //ET模式需设置为非阻塞的
  99. ev.data.fd = connFd;
  100. ev.events = EPOLLIN|EPOLLET;
  101. if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 )
  102. {
  103. return EPOLL_CTL_ERROR;
  104. }
  105. }
  106. if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
  107. {
  108. return ACCEPT_ERROR;
  109. }
  110. continue;
  111. }
  112. else if(events[i].events & EPOLLIN)
  113. {
  114. readFd = events[i].data.fd;
  115. if (readFd < 0)
  116. {
  117. continue;
  118. }
  119. //读取数据
  120. while ( (nSize = read(readFd, buf n, MAX_READ_SIZE - 1)) > 0 )
  121. {
  122. n = nSize;
  123. }
  124. //EAGAIN说明读到结尾了
  125. if (nSize == -1 && errno != EAGAIN )
  126. {
  127. fprintf(stderr, "epoll read failedn");
  128. //ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
  129. }
  130. fprintf(stdout, "read data is:%sn", buf);
  131. ev.data.fd = readFd;
  132. ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET)
  133. epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev);
  134. }
  135. else if(events[i].events & EPOLLOUT)
  136. {
  137. writeFd = events[i].data.fd;
  138. //写数据
  139. strncpy(buf, "hello client", sizeof(buf)-1);
  140. int dataSize = strlen(buf);
  141. n = dataSize;
  142. while(n > 0)
  143. {
  144. nSize = write(writeFd, buf dataSize - n, n);
  145. if (nSize < n)
  146. {
  147. if (nSize == -1 && errno != EAGAIN)
  148. {
  149. break;
  150. }
  151. }
  152. n -= nSize;
  153. }
  154. ev.data.fd = writeFd;
  155. ev.events = EPOLLIN|EPOLLET;
  156. epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev);
  157. }
  158. }
  159. }
  160. }
  161. /*
  162. 关闭epoll文件描述符
  163. */
  164. int CEpollServer::CloseEpoll()
  165. {
  166. if (m_nEpollFd != -1)
  167. {
  168. close (m_nEpollFd);
  169. m_nEpollFd = -1;
  170. }
  171. return 1;
  172. }

将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile如下:

代码语言:javascript复制
  1. LIB_DIR=./lib
  2. src=$(wildcard *.cpp)
  3. obj=(patsubst %.cpp,%.o,(src))
  4. PIC=-fPIC
  5. LIBSO=-shared
  6. #CC=g -gdwarf-2 -gstrict-dwarf
  7. CC=g -g
  8. %.o:%.cpp
  9. (CC) -c <
  10. network:$(obj)
  11. (CC) -o libnetwork.so ^
  12. cp -f libnetwork.so ../test/lib
  13. clean:
  14. rm -f *.o *.so

然后实现TestEpollServer.cpp,如下: 注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,需要修改!

代码语言:javascript复制
  1. #include "../../readini/ConfigIni.h"
  2. #include <string>
  3. #include "../../network/EpollServer.h"
  4. #include "../../log/SingleLog.h"
  5. CEpollServer g_clEpollServer;
  6. #define FILEDIR "./socket.ini"
  7. //epoll server
  8. int epoll_server_init()
  9. {
  10. int iRet = -1;
  11. string strIp;
  12. int nPort = 0, nEpollNum = 0, nTimeout = 0;
  13. ConfigIni::Init(string(FILEDIR));
  14. strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr"));
  15. if (strIp == "")
  16. {
  17. SingleLog::WriteLog(ERROR,"read server addr failed");
  18. return iRet;
  19. }
  20. nPort = ConfigIni::ReadInt(string("SERVER"), string("Port"));
  21. if ( nPort == -1 )
  22. {
  23. SingleLog::WriteLog(ERROR,"read server port failed");
  24. return iRet;
  25. }
  26. nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
  27. if ( nEpollNum == -1 )
  28. {
  29. SingleLog::WriteLog(ERROR,"read server epoll num failed");
  30. return iRet;
  31. }
  32. nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
  33. if ( nTimeout == -1 )
  34. {
  35. SingleLog::WriteLog(ERROR,"read server timeout failed");
  36. return iRet;
  37. }
  38. iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
  39. if ( iRet == 0 )
  40. {
  41. SingleLog::WriteLog(ERROR, "epoll create failed");
  42. return -1;
  43. }
  44. return 0;
  45. }
  46. void epoll_server_run()
  47. {
  48. g_clEpollServer.ProcessEpoll();
  49. }
  50. int main()
  51. {
  52. SingleLog::Init();
  53. if (epoll_server_init() == -1)
  54. {
  55. return -1;
  56. }
  57. epoll_server_run();
  58. return 0;
  59. }
代码语言:javascript复制
  1. //TestClient.cpp
  2. #include <stdio.h>
  3. #include <iostream>
  4. #include <string.h>
  5. #include "../../network/SxTcp.h"
  6. using namespace std;
  7. int main()
  8. {
  9. CTcp tcp;
  10. int iRet = 0;
  11. int iFd = 0;
  12. char buf[128] = {0};
  13. iRet = tcp.Open();
  14. if (iRet == 0)
  15. {
  16. perror("socket create failed");
  17. return -1;
  18. }
  19. iRet = tcp.Connect("192.168.233.250", 6666);
  20. if (iRet == 0)
  21. {
  22. perror("socket connect failed");
  23. return -1;
  24. }
  25. while(1)
  26. {
  27. memset(buf, 0, sizeof(buf));
  28. cout << "please input some string:";
  29. cin >> buf;
  30. iRet = tcp.Send(buf, strlen(buf));
  31. if (iRet < -1 && errno != EAGAIN)
  32. {
  33. perror("send failed");
  34. return -1;
  35. }
  36. else if(iRet == 0)
  37. {
  38. perror("connect is closed");
  39. return -1;
  40. }
  41. memset(buf, 0, sizeof(buf));
  42. iRet = tcp.Recv(buf, sizeof(buf));
  43. if (iRet < 0 && errno != EAGAIN)
  44. {
  45. perror("recv failed");
  46. return -1;
  47. }
  48. else if(iRet == 0)
  49. {
  50. perror("socket not connect");
  51. return -1;
  52. }
  53. fprintf(stdout, "recv data is:%sn", buf);
  54. }
  55. return 0;
  56. }

分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。

0 人点赞