IOCP使用acceptEX进行异步接收

IOCP使用acceptEX进行异步接收示例代码#include<winsock2.h>#include<windows.h>#include<string>#include<iostream>#include<process.h>#include<ws2tcpip.h>#include<mswsock.h>usingnamespacestd;//#pragmacomment(lib,”MSWSOCK.lib”)#pragm

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

示例代码为什么要调用WSAIoctl ()函数

#include <winsock2.h>
#include <windows.h>
#include <string>
#include <iostream>
#include<process.h>
#include <ws2tcpip.h>
#include <mswsock.h>
using namespace std;
//#pragma comment(lib,"MSWSOCK.lib")
#pragma comment(lib,"ws2_32.lib")
#pragma comment(lib,"kernel32.lib")
HANDLE g_hIOCP;
enum IO_OPERATION{ 

IO_READ,
IO_WRITE,
ACCEPT
};
class IO_DATA{ 

public:
OVERLAPPED                  Overlapped;
WSABUF                      wsabuf;
int                         nBytes;
IO_OPERATION                opCode;
char						buffer[1024];
int							BufferLen;
SOCKET                      client;
SOCKET						server;
};
LPFN_ACCEPTEX lpfnAcceptEx = NULL;//AcceptEx函数指针
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
char buffer[1024];
string ValueToIP(const int nValue)
{ 
 
char strTemp[20];
sprintf( strTemp,"%d.%d.%d.%d",
(nValue&0xff000000)>>24,
(nValue&0x00ff0000)>>16,
(nValue&0x0000ff00)>>8,
(nValue&0x000000ff) );
return string(strTemp);
}
unsigned int _stdcall WorkerThread (LPVOID WorkThreadContext) { 

IO_DATA *lpIOContext = NULL; 
DWORD nBytes = 0;
DWORD dwFlags = 0; 
int nRet = 0;
DWORD dwIoSize = 0; 
void * lpCompletionKey = NULL;
LPOVERLAPPED lpOverlapped = NULL;
while(1){ 

bool ret = GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
if ( !ret ) { 

if ( GetLastError() == WAIT_TIMEOUT ) { 

continue;
}
cout << "ret:" << ret << endl;
}
lpIOContext = (IO_DATA *)lpOverlapped;
if(lpIOContext->opCode == IO_READ) // a read operation complete
{ 

cout << "io_read...." << lpIOContext->wsabuf.buf << "end" << endl;
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
strcpy(buffer, "ok.....\n");
lpIOContext->wsabuf.buf = buffer;
lpIOContext->wsabuf.len = strlen(lpIOContext->wsabuf.buf);
lpIOContext->opCode = IO_WRITE;
lpIOContext->nBytes = strlen(buffer);
dwFlags = 0;
nBytes = 0;
nRet = WSASend(
lpIOContext->client,
&lpIOContext->wsabuf, 1, &nBytes,
dwFlags,
&(lpIOContext->Overlapped), NULL);
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 

cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
closesocket(lpIOContext->client);
delete lpIOContext;
continue;
}
memset(buffer, NULL, sizeof(buffer));
}
else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
{ 

cout << "io_write...." << endl;
// Write operation completed, so post Read operation.
lpIOContext->opCode = IO_READ; 
nBytes = 1024;
dwFlags = 0;
lpIOContext->wsabuf.buf = buffer;
lpIOContext->wsabuf.len = nBytes;
lpIOContext->nBytes = nBytes;
ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
nRet = WSARecv(
lpIOContext->client,
&lpIOContext->wsabuf, 1, &nBytes,
&dwFlags,
&lpIOContext->Overlapped, NULL);
if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 

cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
closesocket(lpIOContext->client);
delete lpIOContext;
continue;
} 
cout<<lpIOContext->wsabuf.buf<<endl;
} else if (lpIOContext->opCode == ACCEPT) { 

//sockaddr* client_addr, *server_addr;
//GetAcceptExSockaddrs(lpIOContext->wsabuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &client_addr, (LPINT)sizeof(sockaddr_in), &server_addr, (LPINT)sizeof(sockaddr_in));
//cout << "接收到一个连接 ip:" << ntohs(((sockaddr_in*)client_addr)->sin_port) << " port:" << endl;
SOCKADDR_IN* remote = NULL;
SOCKADDR_IN* local = NULL;
int remoteLen = sizeof(SOCKADDR_IN);
int localLen = sizeof(SOCKADDR_IN);
lpfnGetAcceptExSockAddrs(lpIOContext->wsabuf.buf, 0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);
cout << "接收到一个连接 local ip:" << ValueToIP(local->sin_addr.s_addr) << "local port:" << ntohs(local->sin_port) << "remote ip:" << ValueToIP(remote->sin_addr.s_addr) << 
"remote port:" << ntohs(remote->sin_port) << endl;
CreateIoCompletionPort((HANDLE)lpIOContext->client,
g_hIOCP, (u_long) 0, 0);
IO_DATA* io_data = new IO_DATA;
io_data->opCode = IO_READ;
io_data->wsabuf.buf = io_data->buffer;
io_data->wsabuf.len = io_data->BufferLen = 1024;
io_data->client = lpIOContext->client;
//{WSAID_ACCEPTEX, WSAID_GETACCEPTEXSOCKADDRS}
ZeroMemory(&io_data->Overlapped, sizeof(io_data->Overlapped));
if (WSARecv(io_data->client, &io_data->wsabuf, 1, (DWORD*)&nBytes, &dwFlags, &io_data->Overlapped, NULL) == SOCKET_ERROR) { 

if (WSAGetLastError() == WSA_IO_PENDING) { 

cout << "WSARecv Pending..." << endl;
}
}
lpIOContext->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
lpIOContext->opCode = ACCEPT;
lpIOContext->wsabuf.buf = lpIOContext->buffer;
lpIOContext->wsabuf.len = lpIOContext->BufferLen = 1024;
lpIOContext->nBytes = 0;
memset(&lpIOContext->Overlapped, 0, sizeof(lpIOContext->Overlapped));
while ( lpfnAcceptEx(lpIOContext->server, lpIOContext->client, lpIOContext->wsabuf.buf, 0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&nBytes, &lpIOContext->Overlapped) == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

cout << "lpfnAcceptEx 失败 继续accept" << endl;
}
} else { 

cout << "没有匹配的类型" << endl;
}
}
return 0;
}
unsigned int _stdcall TestThread(LPVOID arg) { 

printf("test thread\n");
for (int i = 0; i < 10; i++) { 

printf("run: %d .....\n", i);
Sleep(1000);
}
return 0;
}
int main ()
{ 

WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);
SOCKET    m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0, g_ThreadCount);
CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);
sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(6000);
server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_socket ,(sockaddr*)&server,sizeof(server));
listen(m_socket, 8);
//CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
for( int i=0;i < 1; ++i){ 

HANDLE  hThread;
hThread = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, 0, 0, NULL);
CloseHandle(hThread);
}
//-------------------------------------------------------------------------------------
DWORD dwBytes = 0;
IO_DATA* per_io_data = new IO_DATA;
per_io_data->wsabuf.buf = per_io_data->buffer;
per_io_data->wsabuf.len = per_io_data->BufferLen = 1024;
per_io_data->opCode = ACCEPT;
memset(&per_io_data->Overlapped, 0, sizeof(per_io_data->Overlapped));
int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx), 
&lpfnAcceptEx, sizeof (lpfnAcceptEx), 
&dwBytes, NULL, NULL);
if (iResult == SOCKET_ERROR) { 

wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
getchar();
closesocket(m_socket);
WSACleanup();
return 1;
}
if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
&dwBytes, NULL, NULL))
{ 

cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
getchar();
if (INVALID_SOCKET != m_socket)
{ 

closesocket(m_socket);
m_socket = INVALID_SOCKET;
}
//goto EXIT_CODE;
return -1;
}
per_io_data->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (per_io_data->client == INVALID_SOCKET) { 

wprintf(L"Create accept socket failed with error: %u\n", WSAGetLastError());
getchar();
closesocket(m_socket);
WSACleanup();
return 1;
}
per_io_data->server = m_socket;
//bool bRetVal = lpfnAcceptEx(m_socket, per_io_data->client, per_io_data->wsabuf.buf,
// per_io_data->wsabuf.len - ((sizeof (SOCKADDR_IN) + 16) * 2),
// sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
// &dwBytes, &per_io_data->Overlapped);
bool bRetVal = lpfnAcceptEx(per_io_data->server, per_io_data->client, per_io_data->wsabuf.buf,
0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&dwBytes, &per_io_data->Overlapped);
if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

printf("AcceptEx failed with error: %u\n", WSAGetLastError());
getchar();
closesocket(per_io_data->client);
closesocket(m_socket);
WSACleanup();
return 1;
}
//*****************************************************************************
m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
CreateIoCompletionPort((HANDLE) m_socket, g_hIOCP, (u_long) 0, 0);
server.sin_family = AF_INET;
server.sin_port = htons(6020);
server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
bind(m_socket ,(sockaddr*)&server,sizeof(server));
listen(m_socket, 8);
IO_DATA* per_io_data2 = new IO_DATA;
per_io_data2->wsabuf.buf = per_io_data2->buffer;
per_io_data2->wsabuf.len = per_io_data2->BufferLen = 1024;
per_io_data2->opCode = ACCEPT;
memset(&per_io_data2->Overlapped, 0, sizeof(per_io_data2->Overlapped));
per_io_data2->client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
per_io_data2->server = m_socket;
//int iResult = WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER,
// &GuidAcceptEx, sizeof (GuidAcceptEx), 
// &lpfnAcceptEx, sizeof (lpfnAcceptEx), 
// &dwBytes, NULL, NULL);
//if (iResult == SOCKET_ERROR) { 

// wprintf(L"WSAIoctl failed with error: %u iResult:%d\n", WSAGetLastError(), iResult);
// getchar();
// closesocket(m_socket);
// WSACleanup();
// return 1;
//}
//if (SOCKET_ERROR == WSAIoctl(m_socket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs,
// sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs),
// &dwBytes, NULL, NULL))
//{ 

// cerr << "WSAIoctl failed with error code: " << WSAGetLastError() << endl;
// getchar();
// if (INVALID_SOCKET != m_socket)
// { 

// closesocket(m_socket);
// m_socket = INVALID_SOCKET;
// }
// //goto EXIT_CODE;
// return -1;
//}
bRetVal = lpfnAcceptEx(per_io_data2->server, per_io_data2->client, per_io_data2->wsabuf.buf,
0,
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, 
&dwBytes, &per_io_data2->Overlapped);
if (bRetVal == FALSE && WSAGetLastError() != ERROR_IO_PENDING) { 

printf("AcceptEx failed with error2222: %u\n", WSAGetLastError());
getchar();
closesocket(per_io_data2->client);
closesocket(m_socket);
WSACleanup();
return 1;
}
//---------------------------------------------------------------------------------------
//SOCKET client = accept( m_socket, NULL, NULL );
//cout << "Client connected." << endl;
//if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){ 

// cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
// closesocket(client);
//}
//else { //post a recv request
// IO_DATA * data = new IO_DATA;
// memset(buffer, NULL ,1024);
// memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
// data->opCode = IO_READ;
// data->nBytes = 0;
// data->wsabuf.buf = buffer;
// data->wsabuf.len = sizeof(buffer);
// data->client = client;
// DWORD nBytes= 1024 ,dwFlags=0;
// int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,
// &dwFlags,
// &data->Overlapped, NULL);
// if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){ 

// cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
// closesocket(client);
// delete data;
// }
// cout<<data->wsabuf.buf<<endl;
//}
HANDLE m_oHandle = CreateEvent(NULL, true, false, NULL);
WaitForSingleObject(m_oHandle, INFINITE);
closesocket(m_socket);
WSACleanup();
return 0;
}

代码部分疑惑说明
说明:①WSAAcceptEx函数作用是投递accept操作到完成端口内核,只有该函数可以完成此功能

②WSAAcceptEx是扩展函数,本身没有定义在winsock2中,所以需要类似动态加载的方式获取函数指针,Windows提供了函数WSAIoctl和参数SIO_GET_EXTENSION_FUNCTION_POINTER来获取函数指针,宏WSAID_ACCEPTEX是该函数对应的GUID号,完成端口的其他几个功能函数也是用这种方式取得,WSAloctl的第一个参数是一个有效socket,不是特定socket。

③acceptex 参数说明:1)监听socket;2)预先定义的连接socket,等同于其他模式accept函数返回的socket,但是这里的socket是提前申请好的,所以在高并发连接时不会有新建socket的开销,提供了性能;3)该参数为0,表示连接即返回,不为0时传入一个缓存,连接并收到第一份数据时返回,缓存内是收到的数据,高并发时设置缓存,可以等到真实数据发送过来时返回,降低完成端口处理的并发数;4)缓存长度,因为收到数据时,会把远端地址信息和本地地址信息写入缓存,所以长度要减去两个地址信息的长度,也即参数5)+6)的和;5)和6):远端地址、本地地址信息长度,16是预留的16个字节,目前内核没有用到,但是空间要保留,所以必须加16;7)实际接收的数据,没用;8)很关键,后文详细说明(见注①)。

6)此时监听socket的accept操作就被投递给了完成端口内核,接下来就是不断询问完成端口是否有操作抵达

参考
https://blog.csdn.net/blwinner/article/details/52882954

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/187868.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • JMH基准测试

    JMH基准测试一、基准测试 基准测试是什么 基准测试是指通过设计科学的测试方法、测试工具和测试系统,实现对一类测试对象的某项性能指标进行定量的和可对比的测试。 例如,对计算机CPU进行浮点运算、数据访问的带宽和延迟等指标的基准测试,可以使用户清楚地了解每一款CPU的运算性能及作业吞吐能力是否满足应用程序的要求 再如对数据库管理系统的ACID(Atomicity,Consistency,Isolation,Durability,原子性、一致性、独立性和持久性)、查询时

  • TypeLoadException: 找不到 Windows 运行时类型“Windows.UI.Xaml.Controls.Binding

    TypeLoadException: 找不到 Windows 运行时类型“Windows.UI.Xaml.Controls.Binding

  • IT人力外包越来越流行的原因剖析[通俗易懂]

    IT人力外包越来越流行的原因剖析[通俗易懂]近年来,互联网快速深入我们生活工作的每个角落,it人才成为各大企业争先抢夺的香饽饽,而通过it人力外包引进互联网软件人才已经成为一种趋势,那么越来越多的企业选择与it人力外包公司合作的原因是什么呢?首先选择与it人力外包公司合作,用人单位不需要聘请专门人员或者成立专门部门对it外包人员进行人力资源管理,这些it外包人员的聘用、工资、奖金的发放、社会保险等都是由it人力外包公司负责完成,这使得企业有更多的精力来经营其他业务。其次,it人力外包的用人方式非常灵活,可以化解人员编制限制与业务快速.

  • Linux之traceroute命令[通俗易懂]

    Linux之traceroute命令[通俗易懂]显示数据包到主机间的路径,traceroute命令用于追踪数据包在网络上的传输时的全部路径,它默认发送的数据包大小是40字节。通过traceroute我们可以知道信息从你的计算机到互联网另一端的主机是走的什么路径。当然每次数据包由某一同样的出发点(source)到达某一同样的目的地(destination)走的路径可能会不一样,但基本上来说大部分时候所走的路由是相同的。traceroute通过发送小的数据包到目的设备直到其返回,来测量其需要多长时间。一条路径上的每个设备traceroute要测.

  • FindWindowEx()函数详解

    FindWindowEx()函数详解 函数功能:该函数获得一个窗口的句柄,该窗口的类名和窗口名与给定的字符串相匹配。这个函数查找子窗口,从排在给定的子窗口后面的下一个子窗口开始。在查找时不区分大小写。    函数原型:HWNDFindWindowEx(HWNDhwndParent,HWNDhwndChildAfter,LPCTSTRlpszClass,LPCTSTRlpszWindow);    参数;    hwnd

  • 大疆网上测评题库_【大疆待定面试】发了在线测评,感觉题目挺新的。-看准网…

    大疆网上测评题库_【大疆待定面试】发了在线测评,感觉题目挺新的。-看准网…写面经,攒人品。大疆服务运营培训生。1.大疆网上笔试题(比较独创,很有趣,也有歇跟大疆相关的题,要比较熟悉大疆),笔试过后,有岗位笔试作业。2.大疆服务运营培训生笔试作业题目。三道大题,开放性题目,专业和岗位相关,涉及报告类题目。规定期限内提交,审核,通过后进入面试环节。3.一面,微信视频面试。提前约定时间,到点准时打来,直奔主题。自我介绍,针对个人经历开始提问,最后会用英文简单问答一下看英语能力…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号