IO 模型_netty reactor模型

IO 模型_netty reactor模型//IOCP2.cpp:Definestheentrypointfortheconsoleapplication.//#include”stdafx.h”#include<WinSock2.h>#include<MSWSock.h>#include<Windows.h>#include&lt…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

// IOCP2.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <WinSock2.h>
#include <MSWSock.h>
#include <Windows.h>
#include <process.h>
#pragma comment(lib, "WS2_32.lib")

#define MAX_BUFFER    256
#define MAX_TIMEOUT 1000
#define MAX_SOCKET  1024
#define MAX_THREAD    64
#define MAX_ACCEPT  5

typedef enum _OPERATION_INFO_
{
    OP_NULL,
    OP_ACCEPT,
    OP_READ,
    OP_WRITE
}OPERATIONINFO;

typedef struct _PER_HANDLE_DATA_
{
public:
    _PER_HANDLE_DATA_()
    {
        clean();
    }
    ~_PER_HANDLE_DATA_()
    {
        clean();
    }
protected:
    void clean()
    {
        sock = INVALID_SOCKET;
        memset(&addr, 0, sizeof(addr));
        addr.sin_addr.S_un.S_addr = INADDR_ANY;
        addr.sin_port = htons(0);
        addr.sin_family = AF_INET;
    }
public:    
    SOCKET sock;
    SOCKADDR_IN addr;
}PERHANDLEDATA, *PPERHANDLEDATA;

typedef struct _PER_IO_DTATA_
{
public: 
    _PER_IO_DTATA_()
    {
        clean();
    }
    ~_PER_IO_DTATA_()
    {
        clean();
    }
    void clean()
    {
        ZeroMemory(&ol, sizeof(ol));
        memset(buf, 0, sizeof(buf));
        sAccept = INVALID_SOCKET;
        sListen = INVALID_SOCKET;
        wsaBuf.buf = buf;
        wsaBuf.len = MAX_BUFFER;
        opType =  OP_NULL;
    }
public:
    WSAOVERLAPPED ol;
    SOCKET sAccept; // Only valid with AcceptEx
    SOCKET sListen; // Only valid with AcceptEx
    WSABUF wsaBuf;
    char buf[MAX_BUFFER];
    OPERATIONINFO opType;
}PERIODATA, *PPERIODATA;

HANDLE hThread[MAX_THREAD] = {
   
   0};
PERIODATA* pAcceptData[MAX_ACCEPT] = {
   
   0};
int g_nThread = 0;
BOOL g_bExitThread = FALSE;
LPFN_ACCEPTEX lpfnAcceptEx = NULL;
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

unsigned __stdcall ThreadProc(LPVOID lParam);
BOOL PostAccept(PERIODATA* pIoData);

int _tmain(int argc, _TCHAR* argv[])
{
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSADATA wsaData;
    if(0 != WSAStartup(wVersionRequested, &wsaData))
    {
        printf("WSAStartup failed with error code: %d/n", GetLastError());
        return EXIT_FAILURE;
    }

    if(2 != HIBYTE(wsaData.wVersion) || 2 != LOBYTE(wsaData.wVersion))
    {
        printf("Socket version not supported./n");
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create IOCP
    HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
    if(NULL == hIOCP)
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        WSACleanup();
        return EXIT_FAILURE;
    }

    // Create worker thread
    SYSTEM_INFO si = {
   
   0};
    GetSystemInfo(&si);
    for(int i = 0; i < (int)si.dwNumberOfProcessors+2; i++)
    {
        hThread[g_nThread] = (HANDLE)_beginthreadex(NULL, 0, ThreadProc, (LPVOID)hIOCP, 0, NULL);
        if(NULL == hThread[g_nThread])
        {
            printf("_beginthreadex failed with error code: %d/n", GetLastError());
            continue;
        }
        ++g_nThread;

        if(g_nThread > MAX_THREAD)
        {
            break;
        }
    }

    // Create listen SOCKET
    SOCKET sListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == sListen)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        goto EXIT_CODE;
    }

    // Associate SOCKET with IOCP
    if(NULL == CreateIoCompletionPort((HANDLE)sListen, hIOCP, NULL, 0))
    {
        printf("CreateIoCompletionPort failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Bind SOCKET
    SOCKADDR_IN addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(5050);
    if(SOCKET_ERROR == bind(sListen, (LPSOCKADDR)&addr, sizeof(addr)))
    {
        printf("bind failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Start Listen
    if(SOCKET_ERROR == listen(sListen, 200))
    {
        printf("listen failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    printf("Server start, wait for client to connect .../n");

    DWORD dwBytes = 0;
    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &lpfnAcceptEx,
        sizeof(lpfnAcceptEx), &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    if(SOCKET_ERROR == WSAIoctl(sListen, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs, 
        sizeof(GuidGetAcceptExSockAddrs), &lpfnGetAcceptExSockAddrs, sizeof(lpfnGetAcceptExSockAddrs), 
        &dwBytes, NULL, NULL))
    {
        printf("WSAIoctl failed with error code: %d/n", WSAGetLastError());
        if(INVALID_SOCKET != sListen)
        {
            closesocket(sListen);
            sListen = INVALID_SOCKET;
        }
        goto EXIT_CODE;
    }

    // Post MAX_ACCEPT accept
    for(int i=0; i<MAX_ACCEPT; i++)
    {
        pAcceptData[i] = new PERIODATA;
        pAcceptData[i]->sListen = sListen;
        PostAccept(pAcceptData[i]);
    }
    // After 1 hour later, Server shutdown.
    Sleep(1000 * 60 *60);

EXIT_CODE:
    g_bExitThread = TRUE;

    PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL);
    WaitForMultipleObjects(g_nThread, hThread, TRUE, INFINITE);
    for(int i = 0; i < g_nThread; i++)
    {
        CloseHandle(hThread[g_nThread]);
    }

    for(int i=0; i<MAX_ACCEPT; i++)
    {
        if(pAcceptData[i])
        {
            delete pAcceptData[i];
            pAcceptData[i] = NULL;
        }
    }

    if(INVALID_SOCKET != sListen)
    {
        closesocket(sListen);
        sListen = INVALID_SOCKET;
    }
    CloseHandle(hIOCP); // Close IOCP

    WSACleanup();
    return 0;
}

BOOL PostAccept(PERIODATA* pIoData)
{
    if(INVALID_SOCKET == pIoData->sListen)
    {
        return FALSE;
    }

    DWORD dwBytes = 0;
    pIoData->opType = OP_ACCEPT;
    pIoData->sAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(INVALID_SOCKET == pIoData->sAccept)
    {
        printf("WSASocket failed with error code: %d/n", WSAGetLastError());
        return FALSE;
    }

    if(FALSE == lpfnAcceptEx(pIoData->sListen, pIoData->sAccept, pIoData->wsaBuf.buf, pIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2), 
        sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &(pIoData->ol)))
    {
        if(WSA_IO_PENDING != WSAGetLastError())
        {
            printf("lpfnAcceptEx failed with error code: %d/n", WSAGetLastError());

            return FALSE;
        }
    }
    return TRUE;
}

unsigned __stdcall ThreadProc(LPVOID lParam)
{
    HANDLE hIOCP = (HANDLE)lParam;

    PERHANDLEDATA* pPerHandleData = NULL;
    PERIODATA* pPerIoData = NULL;
    WSAOVERLAPPED* lpOverlapped = NULL;
    DWORD dwTrans = 0;
    DWORD dwFlags = 0;
    while(!g_bExitThread)
    {
        BOOL bRet = GetQueuedCompletionStatus(hIOCP, &dwTrans, (PULONG_PTR)&pPerHandleData, &lpOverlapped, MAX_TIMEOUT);
        if(!bRet)
        {
            // Timeout and exit thread
            if(WAIT_TIMEOUT == GetLastError())
            {
                continue;
            }
            // Error
            printf("GetQueuedCompletionStatus failed with error: %d/n", GetLastError());
            continue;
        }
        else
        {
            pPerIoData = CONTAINING_RECORD(lpOverlapped, PERIODATA, ol);
            if(NULL == pPerIoData)
            {
                // Exit thread
                break;
            }

            if((0 == dwTrans) && (OP_READ == pPerIoData->opType || OP_WRITE == pPerIoData->opType))
            {
                // Client leave.
                printf("Client: <%s : %d> leave./n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port));
                closesocket(pPerHandleData->sock);
                delete pPerHandleData;
                delete pPerIoData;
                continue;
            }
            else
            {
                switch(pPerIoData->opType)
                {
                case OP_ACCEPT: // Accept
                    {    
                        SOCKADDR_IN* remote = NULL;
                        SOCKADDR_IN* local = NULL;
                        int remoteLen = sizeof(SOCKADDR_IN);
                        int localLen = sizeof(SOCKADDR_IN);
                        lpfnGetAcceptExSockAddrs(pPerIoData->wsaBuf.buf, pPerIoData->wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
                            sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&local, &localLen, (LPSOCKADDR*)&remote, &remoteLen);
                        printf("Client <%s : %d> come in./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port));
                        printf("Recv Data: <%s : %d> %s./n", inet_ntoa(remote->sin_addr), ntohs(remote->sin_port), pPerIoData->wsaBuf.buf);

                        if(NULL != pPerHandleData)
                        {
                            delete pPerHandleData;
                            pPerHandleData = NULL;
                        }
                        pPerHandleData = new PERHANDLEDATA;
                        pPerHandleData->sock = pPerIoData->sAccept;

                        PERHANDLEDATA* pPerHandle = new PERHANDLEDATA;
                        pPerHandle->sock = pPerIoData->sAccept;
                        PERIODATA* pPerIo = new PERIODATA;
                        pPerIo->opType = OP_WRITE;
                        strcpy_s(pPerIo->buf, MAX_BUFFER, pPerIoData->buf);
                        DWORD dwTrans = strlen(pPerIo->buf);
                        memcpy(&(pPerHandleData->addr), remote, sizeof(SOCKADDR_IN));
                        // Associate with IOCP
                        if(NULL == CreateIoCompletionPort((HANDLE)(pPerHandleData->sock), hIOCP, (ULONG_PTR)pPerHandleData, 0))
                        {
                            printf("CreateIoCompletionPort failed with error code: %d/n", GetLastError());
                            closesocket(pPerHandleData->sock);
                            delete pPerHandleData;
                            continue;
                        }

                        // Post Accept
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        PostAccept(pPerIoData);

                        // Post Receive                        
                        DWORD dwFlags = 0;
                        if(SOCKET_ERROR == WSASend(pPerHandle->sock, &(pPerIo->wsaBuf), 1, 
                            &dwTrans, dwFlags, &(pPerIo->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSASend failed with error code: %d/n", WSAGetLastError());
                                closesocket(pPerHandle->sock);
                                delete pPerHandle;
                                delete pPerIo;
                                continue;
                            }
                        }
                    }
                    break;

                case OP_READ: // Read
                    printf("recv client <%s : %d> data: %s/n", inet_ntoa(pPerHandleData->addr.sin_addr), ntohs(pPerHandleData->addr.sin_port), pPerIoData->buf);
                    pPerIoData->opType = OP_WRITE;
                    memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                    if(SOCKET_ERROR == WSASend(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, dwFlags, &(pPerIoData->ol), NULL))
                    {
                        if(WSA_IO_PENDING != WSAGetLastError())
                        {
                            printf("WSASend failed with error code: %d./n", WSAGetLastError());
                            continue;
                        }
                    }
                    break;

                case OP_WRITE: // Write
                    {
                        pPerIoData->opType = OP_READ;
                        dwFlags = 0;
                        memset(&(pPerIoData->ol), 0, sizeof(pPerIoData->ol));
                        memset(pPerIoData->buf, 0, sizeof(pPerIoData->buf));
                        pPerIoData->wsaBuf.buf = pPerIoData->buf;
                        dwTrans = pPerIoData->wsaBuf.len = MAX_BUFFER;
                        if(SOCKET_ERROR == WSARecv(pPerHandleData->sock, &(pPerIoData->wsaBuf), 1, &dwTrans, &dwFlags, &(pPerIoData->ol), NULL))
                        {
                            if(WSA_IO_PENDING != WSAGetLastError())
                            {
                                printf("WSARecv failed with error code: %d./n", WSAGetLastError());
                                continue;
                            }
                        }
                    }
                    break;

                default:
                    break;
                }
            }
        }
    }
    return 0;
}

 

转载于:https://www.cnblogs.com/thbCode/p/4434848.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/188840.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号