IOCP一:AcceptEx「建议收藏」

2022-09-06 17:12:50 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

IOCP底层机理还没有透彻的理解,现将部分内容记录如下 2014.7.22 16:50

把完成端口理解为完成队列。

投递的异步IO请求完成后会携带三参数返回。

异步IO请求分为:连接、接收、发送,分别对应AcceptEx、WSARecv、WSASend。

三参数:单句柄数据结构、单IO数据结构、传输字节数。

用两种自定义结构:单句柄数据结构和单IO数据结构

单句柄数据结构与特定socket关联,在该socket上完成的所有类型的所有异步请求完成后都会返回该结构。单句柄数据结构的故事是这样的:把socket关联到完成端口时允许带一个整数,过后在该socket上完成的所有异步请求完成后都会返回该整数。

单IO数据机构与具体异步请求关联,每次投递请求时都要带一个单IO数据结构,请求A完成后会携带与A关联的单IO数据结构返回。

还原故事发生的大体情节:用

代码语言:javascript复制
	HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if(hcp == NULL)
	{
		std::cout<<"Create Completion Port Failed : "<<GetLastError()<<std::endl;
		return -1;
	}

创建完成端口,现在假设socket s上需要进行异步接收,用

代码语言:javascript复制
		CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0);

将s关联到完成端口hcp,同时带一整数pphd。pphd为单句柄数据结构的指针,该结构用new在堆上分配。故传入的是单句柄数据结构指针。用

代码语言:javascript复制
			LPPER_IO_DATA ppiod = new PER_IO_DATA;
			ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
			ppiod->operationType = OP_RECV;
			WSABUF databuf;
			databuf.buf = ppiod->buf;
			databuf.len = BUF_LEN;

			DWORD dwRecv = 0;
			DWORD dwFlags = 0;
			WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);

在s上投递请求。所有的工作线程阻塞在完成端口等待请求完成的到来

代码语言:javascript复制
	DWORD dwNum = 0;
	LPPER_HANDLE_DATA pphd;
	LPPER_IO_DATA ppiod;
	bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);

请求到来,唤醒一个工作线程,该线程从GetQueuedCompletionStatus函数返回,dwNum、pphd、ppiod三参数出参:

dwNum被赋值本次IO传输字节数,

pphd为指针,GetQueuedCompletionStatus中再取地址,是为指针地址,目的为指针赋值,所赋值为CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0)传入的pphd,

ppiod同pphd,被赋值WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL)中的ppiod的地址。

自定义结构可以任意设置,唯一的要求就是单IO数据结构的第一个成员必须是OVERLAPPED。

代码语言:javascript复制
#include <WinSock2.h>
#include <Windows.h>
#include <iostream>
#include <process.h>
#include <string>
#include <MSWSock.h>
#include <set>

#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Kernel32.lib")
#pragma comment(lib, "Mswsock.lib")

#define BUF_LEN 1024

enum OperateType
{
	OP_RECV,
	OP_SEND,
	OP_ACCEPT,
};

typedef struct PER_HANDLE_DATA
{
	SOCKET s;			//记录是哪个socket上的请求
	SOCKADDR_IN addr;	//记录该socket对应的客户端地址和端口
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

typedef struct PER_IO_DATA
{
	OVERLAPPED overlapped;	//第一项必须为OVERLAPPED
	SOCKET cs;				//记录客户端socket
	char buf[BUF_LEN];		//发送:此buf存储待发送数据,接收:此buf存储到来的数据
	int operationType;		//记录完成的请求类型:是接收?是发送? 还是连接?
}PER_IO_DATA, *LPPER_IO_DATA;

异步连接

为什么监听连接用异步?

网上很多IOCP模型:主线程循环accept等待连接的到来,然后将Client socket加入IOCP,同时在Client socket上投递WSARecv等待数据到来。其他worker线程GetQueuedCompletionStatus阻塞在IOCP上等待请求的完成。

缺点:阻塞式accept效率不高,且接收连接单独占用一个线程,让原本的一个worker线程专门来等待连接,压榨了一个worker的潜能。

异步连接让所有运行的线程均为worker线程,且MSDN说AcceptEx比accpet连接进行得更快,可以用少量的线程处理大量的Client连接

整体过程:

1.创建listenSocket,与本地地址绑定,开始监听;

2.将listenSocket添加到IOCP;

3.用AcceptEx在listenSocket上投递连接请求。

如何投递异步连接请求?

代码语言:javascript复制
bool PostAccept(SOCKET listenSocket)
{
	/************************************************
	为即将到来的Client连接事先创建好Socket:
	阻塞式连接中accept的返回值即为新进连接创建的Socket,
	异步连接需要事先将此Socket备下,再行连接
	**************************************************/
	SOCKET cs = socket(AF_INET, SOCK_STREAM, 0);
	if(INVALID_SOCKET == cs)
	{
		std::cout<<"Create Socket Failed : "<<GetLastError()<<std::endl;
		return false;
	}

	/*每一个异步请求必须一个PER_IO_DATA结构*/
	LPPER_IO_DATA ppiod = new PER_IO_DATA;
	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));	//PER_IO_DATA在使用前必须清空OVERLAPPED成员
	ppiod->operationType = OP_ACCEPT;	//待会从GetQueuedCompletionStatus返回,通过查看operationType就知道是接收完成?发送完成?还是连接完成
	ppiod->cs = cs;	//连接完成后要在新进连接上投递异步发送或接收,所以要事先把Client Socket记录下来以备后续使用

	/***********************
	投递异步连接请求:
	函数:AcceptEx
	参数:
	一参本地监听Socket
	二参为即将到来的客人准备好的Socket
	三参接收缓冲区:
		一存客人发来的第一份数据、二存Server本地地址、三存Client远端地址
		地址包括IP和端口,
	四参定三参数据区长度,0表只连不接收、连接到来->请求完成,否则连接到来 任意长数据到来->请求完成
	五参定三参本地地址区长度,至少sizeof(sockaddr_in)   16
	六参定三参远端地址区长度,至少sizeof(sockaddr_in)   16
	七八两参不用管
	***************************************/
	DWORD dwRecv;
	int len = sizeof(sockaddr_in)   16;
	bool ret = AcceptEx(listenSocket, ppiod->cs, ppiod->buf, 0, len, len, &dwRecv, &ppiod->overlapped);
	if(false == ret && ERROR_IO_PENDING != GetLastError())
	{
		std::cout<<"AcceptEx Failed : "<<GetLastError()<<std::endl;
		return false;
	}

	return true;
}

异步接收

代码语言:javascript复制
bool PostRecv(SOCKET s)
{
	/*每一个异步请求必须一个PER_IO_DATA结构*/
	LPPER_IO_DATA ppiod = new PER_IO_DATA;
	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
	ppiod->operationType = OP_RECV;	//请求类型是接收
	memset(ppiod->buf, 0, BUF_LEN);	//清空接收缓存

	WSABUF databuf;
	databuf.buf = ppiod->buf;	//接收缓冲区首地址
	databuf.len = BUF_LEN;		//接收缓冲区长度

	/*********************** 
    投递异步接收请求: 
    函数:WSARecv 
    参数: 
    一参接收Socket 
    二参WSABUF指针,接收缓冲数组
    三参说二参数组元素个数、接收缓冲个数
    四五六七不用管
    ***************************************/ 
	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	int ret = WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
		return false;

	return true;
}

异步发送

代码语言:javascript复制
bool PostSend(SOCKET s, const char *buf, int len)
{
	/*每一个异步请求必须一个PER_IO_DATA结构*/
	LPPER_IO_DATA ppiod = new PER_IO_DATA;
	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
	ppiod->operationType = OP_SEND;
	memset(ppiod->buf, 0, BUF_LEN);
	memcpy(ppiod->buf, buf, len);

	WSABUF databuf;
	databuf.buf = ppiod->buf;	//发送缓冲首地址
	databuf.len = len;			//发送数据长度,定多少就发多少:多定多发,少定少发,不定不发

	/*同WSARecv*/
	DWORD dwRecv = 0;
	DWORD dwFlags = 0;
	WSASend(s, &databuf, 1, &dwRecv, dwFlags, &ppiod->overlapped, NULL);

	return true;
}

处理完成的请求 bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);

全体worker线程循环调用GetQueuedCompletionStatus,阻塞在该函数调用中,等待从IOCP传来请求完成的通知。没有任何请求完成时,IOCP让worker沉睡;当请求到来时,IOCP唤醒最后入睡的worker线程起来执行处理。

GetQueuedCompletionStatus成功返回true,失败返回false。

实验结果如下:

1.连接到来, ret = true && dwNum = 0 && ppiod->operationType = OP_ACCEPT

2.连接断开:

A.Client调用closesocket,ret = true && dwNum = 0

B.Client直接退出,ret = false && dwNum = 0

C.Client暴力中断,如断电,2014.7.23 18:49 用两台电脑测试结果表明对于暴力断电,IOCP无任何反应,故心跳检测 必不可少

3.投递请求的线程退出,ret = false && dwNum = 0 && GetLastError() = 995(995:由于线程退出或应用程序请求,已放弃 I/O 操作)

附加:Client关闭连接或直接退出,在对应socket上投递的所有请求均返回。假设在socket 1上投递了三个WSARecv, 当Client关闭连接时,会有三个连接断开返回,不要重复释放空间。

用户控制退出

PostQueuedCompletionStatus(hcp, -1, NULL, NULL);

向IOCP投一个IO完成包,会有一个worker线程从GetQueuedCompletionStatus返回,同时它得到的三参数依次是dwNum = -1, pphd = NULL, ppiod = NULL,正是我们传入PostQueuedCompletionStatus的后三个参数。

PostQueuedCompletionStatus(hcp, X, Y, Z),则返回的worker线程得到的三参数依次是dwNum = X, pphd = Y, ppiod = Z

正常情况下dwNum只有两种取值:正整数和0,所以worker线程返回后通过查看dwNum == -1是否成立就可以判断是否到了结束的时间了。

理想情况下,以上述格式每投递一个完成包就有一个worker退出,所以我们应该投递workerNum个完成包来使所有worker退出。在非理想情况下我们应该防止还有worker没有退出,完美方式

代码语言:javascript复制
void NetModule::Stop()
{
	/*std::set<HANDLE> setWorkers存储所有worker线程句柄*/
	for(size_t i = 0; i < setWorkers.size(); i  )
		PostQueuedCompletionStatus(hcp, -1, NULL, NULL);

	for(auto iter = setWorkers.begin(); iter != setWorkers.end();)
	{
		int ret = WaitForSingleObject(*iter, 100);
		if(WAIT_OBJECT_0 == ret)
			setWorkers.erase(iter  );
		else
		{
			PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
			continue;
		}
	}
}

所以检查GetQueuedCompletionStatus的返回值的标准方式

代码语言:javascript复制
		bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);

		//线程退出控制,没有释放申请的堆空间,还不完善
		if(-1 == dwNum)
		{
			std::cout<<"Thread Exit"<<std::endl;
			_endthreadex(0);
			return 0;
		}

		int type = ppiod->operationType;
		if(0 == dwNum && OP_ACCEPT != type)
		{
			//投递请求的线程退出
			if(GetLastError() == 995)
			{
				std::cout<<"Thread Exit Cause Request Returned"<<std::endl;
			}
			//连接断开
			else
			{
				std::cout<<"Peer Close The Connection"<<std::endl;

				//在一个socket上投递多个WSARecv需要考虑连接被Client断开时所有异步WSARecv均返回不会重复delete PER_HANDLE_DATA
				AutoLock lock(pphd->mutex);
				if(pphd->flag == true)
				{
					closesocket(pphd->s);
					delete pphd;
					pphd->flag = false;
				}
			}

			delete ppiod;
			continue;
		}

		//错误发生
		if(false == ret)
		{
			std::cout<<"An Error Occurs : "<<GetLastError()<<std::endl;
			delete ppiod;
			continue;
		}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/155531.html原文链接:https://javaforall.cn

0 人点赞