大家好,又见面了,我是你们的朋友全栈君。
IOCP底层机理还没有透彻的理解,现将部分内容记录如下 2014.7.22 16:50
把完成端口理解为完成队列。
投递的异步IO请求完成后会携带三参数返回。
异步IO请求分为:连接、接收、发送,分别对应AcceptEx、WSARecv、WSASend。
三参数:单句柄数据结构、单IO数据结构、传输字节数。
用两种自定义结构:单句柄数据结构和单IO数据结构
单句柄数据结构与特定socket关联,在该socket上完成的所有类型的所有异步请求完成后都会返回该结构。单句柄数据结构的故事是这样的:把socket关联到完成端口时允许带一个整数,过后在该socket上完成的所有异步请求完成后都会返回该整数。
单IO数据机构与具体异步请求关联,每次投递请求时都要带一个单IO数据结构,请求A完成后会携带与A关联的单IO数据结构返回。
还原故事发生的大体情节:用
代码语言:javascript复制 HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if(hcp == NULL)
{
std::cout<<"Create Completion Port Failed : "<<GetLastError()<<std::endl;
return -1;
}
创建完成端口,现在假设socket s上需要进行异步接收,用
代码语言:javascript复制 CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0);
将s关联到完成端口hcp,同时带一整数pphd。pphd为单句柄数据结构的指针,该结构用new在堆上分配。故传入的是单句柄数据结构指针。用
代码语言:javascript复制 LPPER_IO_DATA ppiod = new PER_IO_DATA;
ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
ppiod->operationType = OP_RECV;
WSABUF databuf;
databuf.buf = ppiod->buf;
databuf.len = BUF_LEN;
DWORD dwRecv = 0;
DWORD dwFlags = 0;
WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
在s上投递请求。所有的工作线程阻塞在完成端口等待请求完成的到来
代码语言:javascript复制 DWORD dwNum = 0;
LPPER_HANDLE_DATA pphd;
LPPER_IO_DATA ppiod;
bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
请求到来,唤醒一个工作线程,该线程从GetQueuedCompletionStatus函数返回,dwNum、pphd、ppiod三参数出参:
dwNum被赋值本次IO传输字节数,
pphd为指针,GetQueuedCompletionStatus中再取地址,是为指针地址,目的为指针赋值,所赋值为CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0)传入的pphd,
ppiod同pphd,被赋值WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL)中的ppiod的地址。
自定义结构可以任意设置,唯一的要求就是单IO数据结构的第一个成员必须是OVERLAPPED。
代码语言:javascript复制#include <WinSock2.h>
#include <Windows.h>
#include <iostream>
#include <process.h>
#include <string>
#include <MSWSock.h>
#include <set>
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Kernel32.lib")
#pragma comment(lib, "Mswsock.lib")
#define BUF_LEN 1024
enum OperateType
{
OP_RECV,
OP_SEND,
OP_ACCEPT,
};
typedef struct PER_HANDLE_DATA
{
SOCKET s; //记录是哪个socket上的请求
SOCKADDR_IN addr; //记录该socket对应的客户端地址和端口
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
typedef struct PER_IO_DATA
{
OVERLAPPED overlapped; //第一项必须为OVERLAPPED
SOCKET cs; //记录客户端socket
char buf[BUF_LEN]; //发送:此buf存储待发送数据,接收:此buf存储到来的数据
int operationType; //记录完成的请求类型:是接收?是发送? 还是连接?
}PER_IO_DATA, *LPPER_IO_DATA;
异步连接
为什么监听连接用异步?
网上很多IOCP模型:主线程循环accept等待连接的到来,然后将Client socket加入IOCP,同时在Client socket上投递WSARecv等待数据到来。其他worker线程GetQueuedCompletionStatus阻塞在IOCP上等待请求的完成。
缺点:阻塞式accept效率不高,且接收连接单独占用一个线程,让原本的一个worker线程专门来等待连接,压榨了一个worker的潜能。
异步连接让所有运行的线程均为worker线程,且MSDN说AcceptEx比accpet连接进行得更快,可以用少量的线程处理大量的Client连接
整体过程:
1.创建listenSocket,与本地地址绑定,开始监听;
2.将listenSocket添加到IOCP;
3.用AcceptEx在listenSocket上投递连接请求。
如何投递异步连接请求?
代码语言:javascript复制bool PostAccept(SOCKET listenSocket)
{
/************************************************
为即将到来的Client连接事先创建好Socket:
阻塞式连接中accept的返回值即为新进连接创建的Socket,
异步连接需要事先将此Socket备下,再行连接
**************************************************/
SOCKET cs = socket(AF_INET, SOCK_STREAM, 0);
if(INVALID_SOCKET == cs)
{
std::cout<<"Create Socket Failed : "<<GetLastError()<<std::endl;
return false;
}
/*每一个异步请求必须一个PER_IO_DATA结构*/
LPPER_IO_DATA ppiod = new PER_IO_DATA;
ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED)); //PER_IO_DATA在使用前必须清空OVERLAPPED成员
ppiod->operationType = OP_ACCEPT; //待会从GetQueuedCompletionStatus返回,通过查看operationType就知道是接收完成?发送完成?还是连接完成
ppiod->cs = cs; //连接完成后要在新进连接上投递异步发送或接收,所以要事先把Client Socket记录下来以备后续使用
/***********************
投递异步连接请求:
函数:AcceptEx
参数:
一参本地监听Socket
二参为即将到来的客人准备好的Socket
三参接收缓冲区:
一存客人发来的第一份数据、二存Server本地地址、三存Client远端地址
地址包括IP和端口,
四参定三参数据区长度,0表只连不接收、连接到来->请求完成,否则连接到来 任意长数据到来->请求完成
五参定三参本地地址区长度,至少sizeof(sockaddr_in) 16
六参定三参远端地址区长度,至少sizeof(sockaddr_in) 16
七八两参不用管
***************************************/
DWORD dwRecv;
int len = sizeof(sockaddr_in) 16;
bool ret = AcceptEx(listenSocket, ppiod->cs, ppiod->buf, 0, len, len, &dwRecv, &ppiod->overlapped);
if(false == ret && ERROR_IO_PENDING != GetLastError())
{
std::cout<<"AcceptEx Failed : "<<GetLastError()<<std::endl;
return false;
}
return true;
}
异步接收
代码语言:javascript复制bool PostRecv(SOCKET s)
{
/*每一个异步请求必须一个PER_IO_DATA结构*/
LPPER_IO_DATA ppiod = new PER_IO_DATA;
ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
ppiod->operationType = OP_RECV; //请求类型是接收
memset(ppiod->buf, 0, BUF_LEN); //清空接收缓存
WSABUF databuf;
databuf.buf = ppiod->buf; //接收缓冲区首地址
databuf.len = BUF_LEN; //接收缓冲区长度
/***********************
投递异步接收请求:
函数:WSARecv
参数:
一参接收Socket
二参WSABUF指针,接收缓冲数组
三参说二参数组元素个数、接收缓冲个数
四五六七不用管
***************************************/
DWORD dwRecv = 0;
DWORD dwFlags = 0;
int ret = WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
return false;
return true;
}
异步发送
代码语言:javascript复制bool PostSend(SOCKET s, const char *buf, int len)
{
/*每一个异步请求必须一个PER_IO_DATA结构*/
LPPER_IO_DATA ppiod = new PER_IO_DATA;
ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
ppiod->operationType = OP_SEND;
memset(ppiod->buf, 0, BUF_LEN);
memcpy(ppiod->buf, buf, len);
WSABUF databuf;
databuf.buf = ppiod->buf; //发送缓冲首地址
databuf.len = len; //发送数据长度,定多少就发多少:多定多发,少定少发,不定不发
/*同WSARecv*/
DWORD dwRecv = 0;
DWORD dwFlags = 0;
WSASend(s, &databuf, 1, &dwRecv, dwFlags, &ppiod->overlapped, NULL);
return true;
}
处理完成的请求 bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
全体worker线程循环调用GetQueuedCompletionStatus,阻塞在该函数调用中,等待从IOCP传来请求完成的通知。没有任何请求完成时,IOCP让worker沉睡;当请求到来时,IOCP唤醒最后入睡的worker线程起来执行处理。
GetQueuedCompletionStatus成功返回true,失败返回false。
实验结果如下:
1.连接到来, ret = true && dwNum = 0 && ppiod->operationType = OP_ACCEPT
2.连接断开:
A.Client调用closesocket,ret = true && dwNum = 0
B.Client直接退出,ret = false && dwNum = 0
C.Client暴力中断,如断电,2014.7.23 18:49 用两台电脑测试结果表明对于暴力断电,IOCP无任何反应,故心跳检测 必不可少
3.投递请求的线程退出,ret = false && dwNum = 0 && GetLastError() = 995(995:由于线程退出或应用程序请求,已放弃 I/O 操作)
附加:Client关闭连接或直接退出,在对应socket上投递的所有请求均返回。假设在socket 1上投递了三个WSARecv, 当Client关闭连接时,会有三个连接断开返回,不要重复释放空间。
用户控制退出
PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
向IOCP投一个IO完成包,会有一个worker线程从GetQueuedCompletionStatus返回,同时它得到的三参数依次是dwNum = -1, pphd = NULL, ppiod = NULL,正是我们传入PostQueuedCompletionStatus的后三个参数。
PostQueuedCompletionStatus(hcp, X, Y, Z),则返回的worker线程得到的三参数依次是dwNum = X, pphd = Y, ppiod = Z
正常情况下dwNum只有两种取值:正整数和0,所以worker线程返回后通过查看dwNum == -1是否成立就可以判断是否到了结束的时间了。
理想情况下,以上述格式每投递一个完成包就有一个worker退出,所以我们应该投递workerNum个完成包来使所有worker退出。在非理想情况下我们应该防止还有worker没有退出,完美方式
代码语言:javascript复制void NetModule::Stop()
{
/*std::set<HANDLE> setWorkers存储所有worker线程句柄*/
for(size_t i = 0; i < setWorkers.size(); i )
PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
for(auto iter = setWorkers.begin(); iter != setWorkers.end();)
{
int ret = WaitForSingleObject(*iter, 100);
if(WAIT_OBJECT_0 == ret)
setWorkers.erase(iter );
else
{
PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
continue;
}
}
}
所以检查GetQueuedCompletionStatus的返回值的标准方式
代码语言:javascript复制 bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
//线程退出控制,没有释放申请的堆空间,还不完善
if(-1 == dwNum)
{
std::cout<<"Thread Exit"<<std::endl;
_endthreadex(0);
return 0;
}
int type = ppiod->operationType;
if(0 == dwNum && OP_ACCEPT != type)
{
//投递请求的线程退出
if(GetLastError() == 995)
{
std::cout<<"Thread Exit Cause Request Returned"<<std::endl;
}
//连接断开
else
{
std::cout<<"Peer Close The Connection"<<std::endl;
//在一个socket上投递多个WSARecv需要考虑连接被Client断开时所有异步WSARecv均返回不会重复delete PER_HANDLE_DATA
AutoLock lock(pphd->mutex);
if(pphd->flag == true)
{
closesocket(pphd->s);
delete pphd;
pphd->flag = false;
}
}
delete ppiod;
continue;
}
//错误发生
if(false == ret)
{
std::cout<<"An Error Occurs : "<<GetLastError()<<std::endl;
delete ppiod;
continue;
}
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/155531.html原文链接:https://javaforall.cn