1. 网络编程中的四种IO模型
- 阻塞IO模型,默认socket都是阻塞的,就是IO操作都要等待操作完成以后才能返回;
- 非阻塞IO模型,就是IO操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用fcntl设置socket为非阻塞;
- 多路复用模型,就是事件驱动IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是select和epoll;
- 异步IO模型,就是发起IO操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知IO操作已完成;
2. epoll函数
2.1 epoll的两种工作模式
2.1.1 LT模式(又叫水平模式,类似于select/poll):
完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。
2.1.2 ET模式(又叫边缘触发模式,必须将socket设置为非阻塞):
此种情况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会造成丢包的。 这时应用程序需要自己维护一张fds的表格,把从epoll_wait得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的fds进行操作。
2.1.3 水平模式和边沿模式的选择
ET比LT对应用程序的要求更多,需要程序员设计的部分也更多,看上去LT好像要简单很多,但是当我们要求对fd有超时控制时,LT也同样需要对fds进行遍历,此时不如使用本来就要遍历的ET。 而且由于epollwait每次返回的fds数量是有限的,在大并发的模式下,LT将非常的繁忙,所有的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。 而ET只要epollwait返回一次fds之后,这些fds就会从队列中删除,只有当fd重新变为空闲状态时才重新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减少的,这样在大并发的情况下,ET模式将会更加具有优势。
2.2 epoll函数原型
2.2.1 epoll_create
代码语言:javascript复制
int epoll_create(int size);
//创建一个epoll的句柄,size用来告诉内核要监听的数目
返回值: >0 返回创建成功的epoll句柄 -1 失败
2.2.2 epoll_ctl
代码语言:javascript复制
int epoll_ctl(int epfd,
int op,
int fd,
struct epoll_event *event);
epoll的事件注册函数, 注册要监听的事件类型: 参数说明:
- epfd epoll_create返回的句柄
- op 表示动作,用3个宏表示:EPOLLCTLADD 注册新的fd到epfd中,EPOLLCTLMOD,修改已经注册的fd的监听事件,EPOLLCTLDEL,从epfd中删除一个fd。
- fd 表示要监听的fd(一般就是socket函数生成的文件描述符)
- event 告诉内核要监听什么事
struct epoll_event结构如下:
代码语言:javascript复制
struct epoll_event {
-
__uint32_t events;
//多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等
-
epoll_data_t data;
//一个联合体,详细介绍见下面
};
typedef
union epoll_data
{
-
void
*ptr;
-
int fd;
-
uint32_t u32;
-
uint64_t u64;
}epoll_data_t;
2.2.3 epoll_wait
代码语言:javascript复制
int epoll_wait(int epfd,
struct epoll_event* events,
int maxevents,
int timeout);
参数说明如下:
- events 根据events中事件,决定是调用accept回应一个连接,还是调用read或者write读写文件
- maxevents 告诉内核这个events多大,并且不能大于epoll_create中的size
功能说明: 等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。 并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epollctl(epfd,EPOLLCTLMOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLLCTLADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。当epollwait返回时根据返回值(大于0)调用accept。
2.3 epoll的实现
2.3.1 epoll函数调用过程
socket/bind/listen/epollcreate/epollctl/epoll_wait/accept/read/write/close
2.3.2 代码实现
首先对CTCP类做一下补充,将socket设置为非阻塞:
代码语言:javascript复制
int
CTcp::SetNoblock
(int nSock)
{
assert (m_nSock !=
-1);
-
int nFlags;
-
if
( nSock ==
-1
)
-
{
nSock = m_nSock;
-
}
-
if
((nFlags = fcntl (nSock, F_GETFL,
0))
<
0)
-
return
0;
nFlags = nFlags | O_NONBLOCK;
-
if
(fcntl (nSock, F_SETFL, nFlags)
<
0)
-
return
0;
-
return
1;
}
然后基于CTCP类,实现CEpollServer类,代码如下:
代码语言:javascript复制
//EpollServer.h
#ifndef __EPOLL_SERVER_H__
#define __EPOLL_SERVER_H__
#include
"SxTcp.h"
//Tcp类
class
CEpollServer
{
//构造函数
public:
-
CEpollServer
();
-
virtual
~CEpollServer
();
//公有成员函数
public:
-
int
CreateEpoll(const
char* szIp,
int nPort,
int nSize);
-
int
ProcessEpoll();
-
int
CloseEpoll();
//私有成员变量
private:
-
CTcp m_cTcp;
-
int m_nEpollFd;
};
#endif
#include
"EpollServer.h"
#include
<sys/epoll.h>
#include
"TypeError.h"
#include
<assert.h>
#include
<string.h>
#include
<unistd.h>
#include
<errno.h>
#include
<sys/types.h>
#include
<sys/socket.h>
#include
<netinet/in.h>
#include
<stdio.h>
CEpollServer::CEpollServer
()
{
m_nEpollFd =
-1;
}
CEpollServer::~CEpollServer
()
{
-
CloseEpoll();
m_cTcp.Close();
}
/*创建epoll句柄
入参:
szIp 服务器ip地址
nPort 要绑定的端口
nSize 要监听的文件描述符数量
出参:1: 成功 ; 0: 失败
*/
int
CEpollServer::CreateEpoll(const
char* szIp,
int nPort,
int nSize)
{
assert(szIp !=
nullptr);
-
int iRet =
0;
-
int size =
(nSize >
0
? nSize : DEFAULT_EPOLL_FD_NUM);
iRet = m_cTcp.Open();
-
if
( iRet ==
0
)
-
{
-
return SOCKET_ERROR;
-
}
iRet = m_cTcp.Bind(szIp, nPort);
-
if
( iRet ==
0
)
-
{
-
return BIND_ERROR;
-
}
iRet = m_cTcp.SetNoblock();
-
if
( iRet ==
0
)
-
{
-
return SETSOCKOPT_ERROR;
-
}
iRet = m_cTcp.Listen(nSize 1);//监听描述符数量要比epoll的多?
-
if
( iRet ==
0)
-
{
-
return LISTEN_ERROR;
-
}
-
if
( m_nEpollFd !=
-1
)
-
{
-
CloseEpoll();
-
}
m_nEpollFd = epoll_create(size);
-
if
( m_nEpollFd ==
-1)
-
{
-
return EPOLL_CREATE_ERROR;
-
}
-
return
1;
}
/*处理epoll事件
出参:1: 成功 ; 0: 失败
*/
int
CEpollServer::ProcessEpoll()
{
assert(m_nEpollFd !=
-1);
-
int nFds =
0;
-
int connFd =
-1, readFd =
-1, writeFd =
-1;
-
int n =
0, nSize =
0;
-
int nListenFd =
-1;
-
char buf[MAX_READ_SIZE]
=
{0};
-
struct sockaddr_in clientAddr;
-
socklen_t clilen;
-
struct epoll_event ev, events[20];
memset((void*)&ev,
0,
sizeof(ev));
nListenFd = m_cTcp.GetHandle();
ev.data.fd = nListenFd;
ev.events = EPOLLIN|EPOLLET;
-
if
( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd,
&ev)
==
-1
)
-
{
-
return EPOLL_CTL_ERROR;
-
}
-
while(1)
-
{
n =
0;
nSize =
0;
nFds = epoll_wait(m_nEpollFd, events,
20,
500);
-
for
(int i =
0; i< nFds;
i)
-
{
memset(buf,
0, MAX_READ_SIZE);
-
if
(events[i].data.fd == nListenFd )
-
{
-
while
(
(connFd = accept(nListenFd,
(sockaddr*)&clientAddr,
&clilen))
>
0
)
-
{
m_cTcp.SetNoblock(connFd);
//ET模式需设置为非阻塞的
ev.data.fd = connFd;
ev.events = EPOLLIN|EPOLLET;
-
if
( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd,
&ev)
==
-1
)
-
{
-
return EPOLL_CTL_ERROR;
-
}
-
}
-
if
( connFd ==
-1
&& errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR )
-
{
-
return ACCEPT_ERROR;
-
}
-
continue;
-
}
-
else
if(events[i].events & EPOLLIN)
-
{
readFd = events[i].data.fd;
-
if
(readFd <
0)
-
{
-
continue;
-
}
-
//读取数据
-
while
(
(nSize = read(readFd, buf n, MAX_READ_SIZE -
1))
>
0
)
-
{
n = nSize;
-
}
-
//EAGAIN说明读到结尾了
-
if
(nSize ==
-1
&& errno != EAGAIN )
-
{
fprintf(stderr,
"epoll read failedn");
-
//ngleLog::WriteLog(ERROR, "%s", "epoll read fialed");
-
}
fprintf(stdout,
"read data is:%sn", buf);
ev.data.fd = readFd;
ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET)
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd,
&ev);
-
}
-
else
if(events[i].events & EPOLLOUT)
-
{
writeFd = events[i].data.fd;
-
//写数据
strncpy(buf,
"hello client",
sizeof(buf)-1);
-
int dataSize = strlen(buf);
n = dataSize;
-
while(n >
0)
-
{
nSize = write(writeFd, buf dataSize - n, n);
-
if
(nSize < n)
-
{
-
if
(nSize ==
-1
&& errno != EAGAIN)
-
{
-
break;
-
}
-
}
n -= nSize;
-
}
ev.data.fd = writeFd;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd,
&ev);
-
}
-
}
-
}
}
/*
关闭epoll文件描述符
*/
int
CEpollServer::CloseEpoll()
{
-
if
(m_nEpollFd !=
-1)
-
{
close (m_nEpollFd);
m_nEpollFd =
-1;
-
}
-
return
1;
}
将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile如下:
代码语言:javascript复制
LIB_DIR=./lib
src=$(wildcard *.cpp)
- obj=(patsubst %.cpp,%.o,(src))
PIC=-fPIC
LIBSO=-shared
#CC=g -gdwarf-2 -gstrict-dwarf
CC=g
-g
%.o:%.cpp
- (CC) -c <
network:$(obj)
- (CC) -o libnetwork.so ^
cp -f libnetwork.so ../test/lib
clean:
rm -f *.o *.so
然后实现TestEpollServer.cpp,如下: 注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,需要修改!
代码语言:javascript复制
#include
"../../readini/ConfigIni.h"
#include
<string>
#include
"../../network/EpollServer.h"
#include
"../../log/SingleLog.h"
CEpollServer g_clEpollServer;
#define FILEDIR "./socket.ini"
//epoll server
int epoll_server_init()
{
-
int iRet =
-1;
string strIp;
-
int nPort =
0, nEpollNum =
0, nTimeout =
0;
-
ConfigIni::Init(string(FILEDIR));
strIp =
ConfigIni::ReadStr(string("SERVER"), string("Addr"));
-
if
(strIp ==
"")
-
{
-
SingleLog::WriteLog(ERROR,"read server addr failed");
-
return iRet;
-
}
nPort =
ConfigIni::ReadInt(string("SERVER"), string("Port"));
-
if
( nPort ==
-1
)
-
{
-
SingleLog::WriteLog(ERROR,"read server port failed");
-
return iRet;
-
}
nEpollNum =
ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum"));
-
if
( nEpollNum ==
-1
)
-
{
-
SingleLog::WriteLog(ERROR,"read server epoll num failed");
-
return iRet;
-
}
nTimeout =
ConfigIni::ReadInt(string("SERVER"), string("Timeout"));
-
if
( nTimeout ==
-1
)
-
{
-
SingleLog::WriteLog(ERROR,"read server timeout failed");
-
return iRet;
-
}
iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum);
-
if
( iRet ==
0
)
-
{
-
SingleLog::WriteLog(ERROR,
"epoll create failed");
-
return
-1;
-
}
-
return
0;
}
void epoll_server_run()
{
g_clEpollServer.ProcessEpoll();
}
int main()
{
-
SingleLog::Init();
-
if
(epoll_server_init()
==
-1)
-
{
-
return
-1;
-
}
epoll_server_run();
-
return
0;
}
//TestClient.cpp
#include
<stdio.h>
#include
<iostream>
#include
<string.h>
#include
"../../network/SxTcp.h"
using
namespace std;
int main()
{
-
CTcp tcp;
-
int iRet =
0;
-
int iFd =
0;
-
char buf[128]
=
{0};
iRet = tcp.Open();
-
if
(iRet ==
0)
-
{
perror("socket create failed");
-
return
-1;
-
}
iRet = tcp.Connect("192.168.233.250",
6666);
-
if
(iRet ==
0)
-
{
perror("socket connect failed");
-
return
-1;
-
}
-
while(1)
-
{
memset(buf,
0,
sizeof(buf));
cout <<
"please input some string:";
cin >> buf;
iRet = tcp.Send(buf, strlen(buf));
-
if
(iRet <
-1
&& errno != EAGAIN)
-
{
perror("send failed");
-
return
-1;
-
}
-
else
if(iRet ==
0)
-
{
perror("connect is closed");
-
return
-1;
-
}
memset(buf,
0,
sizeof(buf));
iRet = tcp.Recv(buf,
sizeof(buf));
-
if
(iRet <
0
&& errno != EAGAIN)
-
{
perror("recv failed");
-
return
-1;
-
}
-
else
if(iRet ==
0)
-
{
perror("socket not connect");
-
return
-1;
-
}
fprintf(stdout,
"recv data is:%sn", buf);
-
}
-
return
0;
}
分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。