IOCP使用acceptEX进行异步接收

IOCP使用acceptEX进行异步接收示例代码#include<winsock2.h>#include<windows.h>#include<string>#include<iostream>#include<process.h>#include<ws2tcpip.h>#include<mswsock.h>usingnamespacestd;//#pragmacomment(lib,”MSWSOCK.lib”)#pragm

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

示例代码为什么要调用WSAIoctl ()函数

#include <winsock2.h>
#include <windows.h>
#include <string>
#include <iostream>
#include<process.h>
#include <ws2tcpip.h>
#include <mswsock.h>
using namespace std;

//#pragma comment(lib,"MSWSOCK.lib")
#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"kernel32.lib")

HANDLE g_hIOCP;

enum IO_OPERATION{ 
   
	IO_READ,
	IO_WRITE,
	ACCEPT
};

class IO_DATA{ 
   
public:
    OVERLAPPED                  Overlapped;
    WSABUF                      wsabuf;
    int                         nBytes;
    IO_OPERATION                opCode;
	char						buffer[1024];
	int							BufferLen;
    SOCKET                      client;
	SOCKET						server;
};

LPFN_ACCEPTEX lpfnAcceptEx = NULL;//AcceptEx函数指针
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
char buffer[1024];

string ValueToIP(const int nValue)
{ 
    
	char strTemp[20];
	sprintf( strTemp,"%d.%d.%d.%d",
		(nValue&0xff000000)>>24,
		(nValue&0x00ff0000)>>16,
		(nValue&0x0000ff00)>>8,
		(nValue&0x000000ff) );

	return string(strTemp);
}

unsigned int _stdcall WorkerThread (LPVOID WorkThreadContext) { 
   
    IO_DATA *lpIOContext = NULL; 
    DWORD nBytes = 0;
    DWORD dwFlags = 0; 
    int nRet = 0;

    DWORD dwIoSize = 0; 
    void * lpCompletionKey = NULL;
    LPOVERLAPPED lpOverlapped = NULL;

    while(1){ 
   
        bool ret = GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
		if ( !ret ) { 
   
			if ( GetLastError() == WAIT_TIMEOUT ) { 
   
				continue;
			}
			cout << "ret:" << ret << endl;
        }

        lpIOContext = (IO_DATA *)lpOverlapped;

        if(lpIOContext->opCode == IO_READ) // a read operation complete
        { 
   
			cout << "io_read...." << lpIOContext->wsabuf.buf << "end" << endl;
            ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
			strcpy(buffer, "ok.....\n");
			lpIOContext->wsabuf.buf = buffer;
			lpIOContext->wsabuf.len = strlen(lpIOContext->wsabuf.buf);
            lpIOContext->opCode = IO_WRITE;
            lpIOContext->nBytes = strlen(buffer);
            dwFlags = 0;
            nBytes = 0;
            nRet = WSASend(
                lpIOContext->client,
                &lpIOContext->wsabuf, 1, &nBytes,
                dwFlags,
                &(lpIOContext->Overlapped), NULL);
            if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 
   
                cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
                closesocket(lpIOContext->client);
                delete lpIOContext;
                continue;
            }
            memset(buffer, NULL, sizeof(buffer));
        }
        else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
        { 
   
			cout << "io_write...." << endl;
            // Write operation completed, so post Read operation.
            lpIOContext->opCode = IO_READ; 
            nBytes = 1024;
            dwFlags = 0;
            lpIOContext->wsabuf.buf = buffer;
            lpIOContext->wsabuf.len = nBytes;
            lpIOContext->nBytes = nBytes;
            ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));

            nRet = WSARecv(
                lpIOContext->client,
                &lpIOContext->wsabuf, 1, &nBytes,
                &dwFlags,
                &lpIOContext->Overlapped, NULL);
            if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 
   
                cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
                closesocket(lpIOContext->client);
                delete lpIOContext;
                continue;
            } 
            cout<<lpIOContext->wsabuf.buf<<endl;
		} else if (lpIOContext->opCode == ACCEPT) { 
   
			//sockaddr* client_addr, *server_addr;
			//GetAcceptExSockaddrs(lpIOContext->wsabuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &client_addr, (LPINT)sizeof(sockaddr_in), &server_addr, (LPINT)sizeof(sockaddr_in));
			//cout << "接收到一个连接 ip:" << ntohs(((sockaddr_in*)client_addr)->sin_port) << " port:" << endl;

			SOCKADDR_IN* remote = NULL;
			SOCKADDR_IN* local = NULL;
			int remoteLen = sizeof(SOCKADDR_IN);
			int localLen = sizeof(SOCKADDR_IN);
			lpfnGetAcceptExSockAddrs(lpIOContext->wsabuf.buf, 0,
				sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);

			cout << "接收到一个连接 local ip:" << ValueToIP(local->sin_addr.s_addr) << "local port:" << ntohs(local->sin_port) << "remote ip:" << ValueToIP(remote->sin_addr.s_addr) << 
				"remote port:" << ntohs(remote->sin_port) << endl;



			CreateIoCompletionPort((HANDLE)lpIOContext->client,
				g_hIOCP, (u_long) 0, 0);

			IO_DATA* io_data = new IO_DATA;
			io_data->opCode = IO_READ;
			io_data->wsabuf.buf = io_data->buffer;
			io_data->wsabuf.len = io_data->BufferLen = 1024;
			io_data->client = lpIOContext->client;
			//{WSAID_ACCEPTEX, WSAID_GETACCEPTEXSOCKADDRS}
			ZeroMemory(&io_data->Overlapped, sizeof(io_data->Overlapped));
			if (WSARecv(io_data->client, &io_data->wsabuf, 1, (DWORD*)&nBytes, &dwFlags, &io_data->Overlapped, NULL) == SOCKET_ERROR) { 
   
				if (WSAGetLastError() == WSA_IO_PENDING) { 
   
					cout << "WSARecv Pending..." << endl;
				}
			}

			lpIOContext->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
			lpIOContext->opCode = ACCEPT;
			lpIOContext->wsabuf.buf = lpIOContext->buffer;
			lpIOContext->wsabuf.len = lpIOContext->BufferLen = 1024;
			lpIOContext->nBytes = 0;
			memset(&lpIOContext->Overlapped, 0, sizeof(lpIOContext->Overlapped));
			while ( lpfnAcceptEx(lpIOContext->server, lpIOContext->client, lpIOContext->wsabuf.buf, 0,
				sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
				&nBytes, &lpIOContext->Overlapped) == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 
   
				cout << "lpfnAcceptEx 失败 继续accept" << endl;
			}

		} else { 
   
			cout << "没有匹配的类型" << endl;
		}
    }
    return 0;
}

unsigned int _stdcall TestThread(LPVOID arg) { 
   
	printf("test thread\n");
	for (int i = 0; i < 10; i++) { 
   
		printf("run: %d .....\n", i);
		Sleep(1000);
	}
	return 0;
}

int main ()
{ 
   
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2,2), &wsaData);

    SOCKET    m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);

	SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
	g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0, g_ThreadCount);
	CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);

    sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(6000);
    server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

    bind(m_socket ,(sockaddr*)&server,sizeof(server));

    listen(m_socket, 8);

    //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);

    for( int i=0;i < 1; ++i){ 
   
        HANDLE  hThread;
        hThread = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, 0, 0, NULL);
        CloseHandle(hThread);
    }

	//-------------------------------------------------------------------------------------
	DWORD dwBytes = 0;
	IO_DATA* per_io_data = new IO_DATA;
	per_io_data->wsabuf.buf = per_io_data->buffer;
	per_io_data->wsabuf.len = per_io_data->BufferLen = 1024;
	per_io_data->opCode = ACCEPT;
	memset(&per_io_data->Overlapped, 0, sizeof(per_io_data->Overlapped));
    int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
             &GuidAcceptEx, sizeof (GuidAcceptEx), 
             &lpfnAcceptEx, sizeof (lpfnAcceptEx), 
             &dwBytes, NULL, NULL);

    if (iResult == SOCKET_ERROR) { 
   
		wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
		getchar();
        closesocket(m_socket);
        WSACleanup();
        return 1;
    }


	if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
		sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
		&dwBytes, NULL, NULL))
	{ 
   
		cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
		getchar();
		if (INVALID_SOCKET != m_socket)
		{ 
   
			closesocket(m_socket);
			m_socket = INVALID_SOCKET;
		}
		//goto EXIT_CODE;
		return -1;
	}



	per_io_data->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (per_io_data->client == INVALID_SOCKET) { 
   
		wprintf(L"Create accept socket failed with error: %u\n", WSAGetLastError());
		getchar();
		closesocket(m_socket);
		WSACleanup();
		return 1;
	}

	per_io_data->server = m_socket;
	//bool bRetVal = lpfnAcceptEx(m_socket, per_io_data->client, per_io_data->wsabuf.buf,
	// per_io_data->wsabuf.len - ((sizeof (SOCKADDR_IN) + 16) * 2),
	// sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
	// &dwBytes, &per_io_data->Overlapped);

	bool bRetVal = lpfnAcceptEx(per_io_data->server, per_io_data->client, per_io_data->wsabuf.buf,
		0,
		sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
		&dwBytes, &per_io_data->Overlapped);
	if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 
   
		printf("AcceptEx failed with error: %u\n", WSAGetLastError());
		getchar();
		closesocket(per_io_data->client);
		closesocket(m_socket);
		WSACleanup();
		return 1;
	}
//*****************************************************************************
	m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
	CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);
	server.sin_family = AF_INET;
	server.sin_port = htons(6020);
	server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

	bind(m_socket ,(sockaddr*)&server,sizeof(server));
	listen(m_socket, 8);

	IO_DATA* per_io_data2 = new IO_DATA;
	per_io_data2->wsabuf.buf = per_io_data2->buffer;
	per_io_data2->wsabuf.len = per_io_data2->BufferLen = 1024;
	per_io_data2->opCode = ACCEPT;
	memset(&per_io_data2->Overlapped, 0, sizeof(per_io_data2->Overlapped));
	per_io_data2->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
	per_io_data2->server = m_socket;

	//int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
	// &GuidAcceptEx, sizeof (GuidAcceptEx), 
	// &lpfnAcceptEx, sizeof (lpfnAcceptEx), 
	// &dwBytes, NULL, NULL);

	//if (iResult == SOCKET_ERROR) { 
   
	// wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
	// getchar();
	// closesocket(m_socket);
	// WSACleanup();
	// return 1;
	//}


	//if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
	// sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
	// &dwBytes, NULL, NULL))
	//{ 
   
	// cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
	// getchar();
	// if (INVALID_SOCKET != m_socket)
	// { 
   
	// closesocket(m_socket);
	// m_socket = INVALID_SOCKET;
	// }
	// //goto EXIT_CODE;
	// return -1;
	//}

	bRetVal = lpfnAcceptEx(per_io_data2->server, per_io_data2->client, per_io_data2->wsabuf.buf,
		0,
		sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
		&dwBytes, &per_io_data2->Overlapped);

	if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 
   
		printf("AcceptEx failed with error2222: %u\n", WSAGetLastError());
		getchar();
		closesocket(per_io_data2->client);
		closesocket(m_socket);
		WSACleanup();
		return 1;
	}

	//---------------------------------------------------------------------------------------
	//SOCKET client = accept( m_socket, NULL, NULL );
	//cout << "Client connected." << endl;


	//if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){ 
   
	// cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
	// closesocket(client);
	//}
	//else { //post a recv request
	// IO_DATA * data = new IO_DATA;
	// memset(buffer, NULL ,1024);
	// memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
	// data->opCode = IO_READ;
	// data->nBytes = 0;
	// data->wsabuf.buf = buffer;
	// data->wsabuf.len = sizeof(buffer);
	// data->client = client;
	// DWORD nBytes= 1024 ,dwFlags=0;
	// int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,
	// &dwFlags,
	// &data->Overlapped, NULL);
	// if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){ 
   
	// cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
	// closesocket(client);
	// delete data;
	// }
	// cout<<data->wsabuf.buf<<endl;
	//}

	HANDLE m_oHandle = CreateEvent(NULL, true, false, NULL);
	WaitForSingleObject(m_oHandle, INFINITE);
    closesocket(m_socket);
    WSACleanup();
	return 0;
}

代码部分疑惑说明
说明:①WSAAcceptEx函数作用是投递accept操作到完成端口内核,只有该函数可以完成此功能

②WSAAcceptEx是扩展函数,本身没有定义在winsock2中,所以需要类似动态加载的方式获取函数指针,Windows提供了函数WSAIoctl和参数SIO_GET_EXTENSION_FUNCTION_POINTER来获取函数指针,宏WSAID_ACCEPTEX是该函数对应的GUID号,完成端口的其他几个功能函数也是用这种方式取得,WSAloctl的第一个参数是一个有效socket,不是特定socket。

③acceptex 参数说明:1)监听socket;2)预先定义的连接socket,等同于其他模式accept函数返回的socket,但是这里的socket是提前申请好的,所以在高并发连接时不会有新建socket的开销,提供了性能;3)该参数为0,表示连接即返回,不为0时传入一个缓存,连接并收到第一份数据时返回,缓存内是收到的数据,高并发时设置缓存,可以等到真实数据发送过来时返回,降低完成端口处理的并发数;4)缓存长度,因为收到数据时,会把远端地址信息和本地地址信息写入缓存,所以长度要减去两个地址信息的长度,也即参数5)+6)的和;5)和6):远端地址、本地地址信息长度,16是预留的16个字节,目前内核没有用到,但是空间要保留,所以必须加16;7)实际接收的数据,没用;8)很关键,后文详细说明(见注①)。

6)此时监听socket的accept操作就被投递给了完成端口内核,接下来就是不断询问完成端口是否有操作抵达

参考
https://blog.csdn.net/blwinner/article/details/52882954

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/187868.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • mysql 错误10038_如何解决MySql10038错误

    第一种方法:第一步:先看报错窗口2003can’tconnecttoMySQLserveron’127.0.0.1′(10038).第二步:原因是:远程3306端口未对外开放操作。第三步:首先远程连接服务器,点击”开始”–>”管理工具”–>”高级安全Windows防火墙”。第四步:在打开的窗口中,左边选中”入站规则”,右边点击”新建规则”来建立一个入站规则。第五步:…

  • method什么意思_method的值有哪些

    method什么意思_method的值有哪些这个报错只有IDEA会有问题,在eclipse并不会出现这个问题。有哪位前辈可以帮我指点迷津吗?publicclassServletDemo02extendsHttpServlet{privatestaticfinallongserialVersionUID=1L;publicvoiddoPost(HttpServletRequestrequest,HttpServl…

  • fflush与fsync

    fflush与fsync区别:1、头文件不同fflush包含在头文件中;fsync包含在头文件中;2、参数不同fflush函数原型是:intfflush(FILE*fp);   即,fflush的参数时文件指针。fsync函数原型是:intfsync(intfd);   即,fsync的参数时文件描述符。2、应用层次不同fflush函数应用于用户层,将C语言函数库中的函数提

  • 数据库防注入_Spring中依赖注入的四种方式

    数据库防注入_Spring中依赖注入的四种方式方法一:摘自https://www.cnblogs.com/yuanchaoyong/p/7243492.htmlFilter拦截包装request->创建过滤器->添加过滤器通过扩展HttpServletRequestWrapper,对HttpServletRequest进行二次包装,覆盖其publicString[]getParameterValues(String…

    2022年10月27日
  • S3C2440移植uboot之支持NANDFLASH操作

    S3C2440移植uboot之支持NANDFLASH操作上一节我们移植了uboot,S3C2440移植uboot之支持NORFLASH。这节我们继续移植,支持NANDFLASH。之前由于nand部分报错,直接注释了 u-boot-2012.04.01\include\configs\smdk2440.h中的#defineCONFIG_CMD_NAND。现在我们去掉注释,重新编译。报错如下是我们没有定义CONFIG_S3C2410导致的可以…

  • 如何自动发送短信给女朋友

    如何自动发送短信给女朋友

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号