(八)高性能服务器架构设计总结3——以flamingo服务器代码为例

2018-04-04 15:22:25 浏览数 (1)

再看 FileZilla,一款 FTP 工具的服务器端,它采用的是 Windows 的 WSAAsyncSelect 模型(代码下载地址:https://github.com/baloonwj/filezilla):

代码语言:C++
//Processes event notifications sent by the sockets or the layers  
    /**
     * Window procedure of the CAsyncSocketExHelperWindow helper window.
     *
     * The CAsyncSocketExHelperWindow instance is fetched from the window's
     * GWLP_USERDATA slot. Messages are dispatched as follows:
     *
     *  - message >= WM_SOCKETEX_NOTIFY: socket event notification.
     *    (message - WM_SOCKETEX_NOTIFY) is the index into
     *    m_pAsyncSocketExWindowData, wParam is the socket handle, the low
     *    16 bits of lParam are the FD_* event and the remaining high bits
     *    are the error code.
     *  - WM_USER: notification sent by a layer. wParam is the socket index,
     *    lParam points to a t_LayerNotifyMsg which is deleted here.
     *  - WM_USER + 1: WSAAsyncGetHostByName reply. wParam is the request
     *    handle, the high bits of lParam carry the error code.
     *  - WM_USER + 2: pending layer callbacks of the socket at index wParam.
     *  - WM_TIMER (timer id 1): re-posts an FD_CLOSE notification for one
     *    socket queued in m_pThreadData->layerCloseNotify.
     *
     * NOTE(review): this ordering presumes WM_SOCKETEX_NOTIFY is defined
     * above WM_USER + 2; otherwise the WM_USER branches would never be
     * reached - confirm against the header.
     *
     * @return 0 for every message handled here, otherwise the result of
     *         DefWindowProc.
     */
    static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
    {
        if (message>=WM_SOCKETEX_NOTIFY)
        {
            //Verify parameters
            ASSERT(hWnd);
            CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
            ASSERT(pWnd);
            if (!pWnd)
                return 0;

            if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY + pWnd->m_nWindowDataSize)) //Index is within socket storage
            {
                //Lookup socket and verify if it's valid
                CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket;
                SOCKET hSocket = wParam;
                if (!pSocket)
                    return 0;
                if (hSocket == INVALID_SOCKET)
                    return 0;
                //The slot may hold a different socket than the one the
                //notification was issued for; drop the notification then.
                if (pSocket->m_SocketData.hSocket != hSocket)
                    return 0;

                //Low 16 bits: FD_* event, remaining high bits: error code
                int nEvent = lParam & 0xFFFF;
                int nErrorCode = lParam >> 16;

                //Dispatch notification
                if (!pSocket->m_pFirstLayer) {
                    //Dispatch to CAsyncSocketEx instance
                    switch (nEvent)
                    {
                    case FD_READ:
#ifndef NOSOCKETSTATES
                        //While still connecting, remember the event; it is
                        //replayed from the FD_CONNECT handler below.
                        if (pSocket->GetState() == connecting && !nErrorCode)
                        {
                            pSocket->m_nPendingEvents |= FD_READ;
                            break;
                        }
                        else if (pSocket->GetState() == attached)
                            pSocket->SetState(connected);
                        if (pSocket->GetState() != connected)
                            break;

                        // Ignore further FD_READ events after FD_CLOSE has been received
                        if (pSocket->m_SocketData.onCloseCalled)
                            break;
#endif //NOSOCKETSTATES

#ifndef NOSOCKETSTATES
                        if (nErrorCode)
                            pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                        if (pSocket->m_lEvent & FD_READ) {
                            pSocket->OnReceive(nErrorCode);
                        }
                        break;
                    case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
                        if (pSocket->GetState() == connecting && !nErrorCode)
                        {
                            pSocket->m_nPendingEvents |= FD_FORCEREAD;
                            break;
                        }
                        else if (pSocket->GetState() == attached)
                            pSocket->SetState(connected);
                        if (pSocket->GetState() != connected)
                            break;
#endif //NOSOCKETSTATES
                        if (pSocket->m_lEvent & FD_READ)
                        {
#ifndef NOSOCKETSTATES
                            if (nErrorCode)
                                pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                            pSocket->OnReceive(nErrorCode);
                        }
                        break;
                    case FD_WRITE:
#ifndef NOSOCKETSTATES
                        if (pSocket->GetState() == connecting && !nErrorCode)
                        {
                            pSocket->m_nPendingEvents |= FD_WRITE;
                            break;
                        }
                        else if (pSocket->GetState() == attached && !nErrorCode)
                            pSocket->SetState(connected);
                        if (pSocket->GetState() != connected)
                            break;
#endif //NOSOCKETSTATES
                        if (pSocket->m_lEvent & FD_WRITE)
                        {
#ifndef NOSOCKETSTATES
                            if (nErrorCode)
                                pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                            pSocket->OnSend(nErrorCode);
                        }
                        break;
                    case FD_CONNECT:
#ifndef NOSOCKETSTATES
                        if (pSocket->GetState() == connecting)
                        {
                            //On failure, try the next candidate address
                            //before reporting the result to the socket.
                            if (nErrorCode && pSocket->m_SocketData.nextAddr)
                            {
                                if (pSocket->TryNextProtocol())
                                    break;
                            }
                            pSocket->SetState(connected);
                        }
                        else if (pSocket->GetState() == attached && !nErrorCode)
                            pSocket->SetState(connected);
#endif //NOSOCKETSTATES
                        if (pSocket->m_lEvent & FD_CONNECT)
                            pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
                        //Replay the events that were deferred while connecting
                        if (!nErrorCode)
                        {
                            if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
                                pSocket->OnReceive(0);
                            if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
                                pSocket->OnReceive(0);
                            if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
                                pSocket->OnSend(0);
                        }
                        pSocket->m_nPendingEvents = 0;
#endif
                        break;
                    case FD_ACCEPT:
#ifndef NOSOCKETSTATES
                        if (pSocket->GetState() != listening && pSocket->GetState() != attached)
                            break;
#endif //NOSOCKETSTATES
                        if (pSocket->m_lEvent & FD_ACCEPT)
                            pSocket->OnAccept(nErrorCode);
                        break;
                    case FD_CLOSE:
#ifndef NOSOCKETSTATES
                        if (pSocket->GetState() != connected && pSocket->GetState() != attached)
                            break;

                        // If there are still bytes left to read, call OnReceive instead of
                        // OnClose and trigger a new OnClose
                        DWORD nBytes = 0;
                        if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
                        {
                            if (nBytes > 0)
                            {
                                // Just repeat message.
                                pSocket->ResendCloseNotify();
                                pSocket->m_SocketData.onCloseCalled = true;
                                pSocket->OnReceive(WSAESHUTDOWN);
                                break;
                            }
                        }

                        pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
                        pSocket->OnClose(nErrorCode);
                        break;
                    }
                }
                else //Dispatch notification to the lowest layer
                {
                    if (nEvent == FD_READ)
                    {
                        // Ignore further FD_READ events after FD_CLOSE has been received
                        if (pSocket->m_SocketData.onCloseCalled)
                            return 0;

                        //The byte count itself is not used; only a failing
                        //IOCtl is turned into the error code that is passed on.
                        DWORD nBytes;
                        if (!pSocket->IOCtl(FIONREAD, &nBytes))
                            nErrorCode = WSAGetLastError();
                        if (pSocket->m_pLastLayer)
                            pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
                    }
                    else if (nEvent == FD_CLOSE)
                    {
                        // If there are still bytes left to read, call OnReceive instead of
                        // OnClose and trigger a new OnClose
                        DWORD nBytes = 0;
                        if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
                        {
                            if (nBytes > 0)
                            {
                                // Just repeat message.
                                pSocket->ResendCloseNotify();
                                if (pSocket->m_pLastLayer)
                                    pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
                                return 0;
                            }
                        }
                        pSocket->m_SocketData.onCloseCalled = true;
                        if (pSocket->m_pLastLayer)
                            pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
                    }
                    else if (pSocket->m_pLastLayer)
                        pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
                }
            }
            return 0;
        }
        else if (message == WM_USER) //Notification event sent by a layer
        {
            //Verify parameters, lookup socket and notification message
            ASSERT(hWnd);
            CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
            ASSERT(pWnd);
            if (!pWnd)
                return 0;

            if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is outside of socket storage
            {
                return 0;
            }

            CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
            CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
            if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket)
            {
                //The message is released here even when it is not dispatched
                delete pMsg;
                return 0;
            }
            //Low 16 bits: FD_* event, remaining high bits: error code
            int nEvent=pMsg->lEvent&0xFFFF;
            int nErrorCode=pMsg->lEvent>>16;

            //Dispatch to layer
            if (pMsg->pLayer)
                pMsg->pLayer->CallEvent(nEvent, nErrorCode);
            else
            {
                //Dispatch to CAsyncSocketEx instance
                switch (nEvent)
                {
                case FD_READ:
#ifndef NOSOCKETSTATES
                    if (pSocket->GetState() == connecting && !nErrorCode)
                    {
                        pSocket->m_nPendingEvents |= FD_READ;
                        break;
                    }
                    else if (pSocket->GetState() == attached && !nErrorCode)
                        pSocket->SetState(connected);
                    if (pSocket->GetState() != connected)
                        break;
#endif //NOSOCKETSTATES
                    if (pSocket->m_lEvent & FD_READ)
                    {
#ifndef NOSOCKETSTATES
                        if (nErrorCode)
                            pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                        pSocket->OnReceive(nErrorCode);
                    }
                    break;
                case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
                    if (pSocket->GetState() == connecting && !nErrorCode)
                    {
                        pSocket->m_nPendingEvents |= FD_FORCEREAD;
                        break;
                    }
                    else if (pSocket->GetState() == attached && !nErrorCode)
                        pSocket->SetState(connected);
                    if (pSocket->GetState() != connected)
                        break;
#endif //NOSOCKETSTATES
                    if (pSocket->m_lEvent & FD_READ)
                    {
#ifndef NOSOCKETSTATES
                        if (nErrorCode)
                            pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                        pSocket->OnReceive(nErrorCode);
                    }
                    break;
                case FD_WRITE:
#ifndef NOSOCKETSTATES
                    if (pSocket->GetState() == connecting && !nErrorCode)
                    {
                        pSocket->m_nPendingEvents |= FD_WRITE;
                        break;
                    }
                    else if (pSocket->GetState() == attached && !nErrorCode)
                        pSocket->SetState(connected);
                    if (pSocket->GetState() != connected)
                        break;
#endif //NOSOCKETSTATES
                    if (pSocket->m_lEvent & FD_WRITE)
                    {
#ifndef NOSOCKETSTATES
                        if (nErrorCode)
                            pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
                        pSocket->OnSend(nErrorCode);
                    }
                    break;
                case FD_CONNECT:
#ifndef NOSOCKETSTATES
                    if (pSocket->GetState() == connecting)
                        pSocket->SetState(connected);
                    else if (pSocket->GetState() == attached && !nErrorCode)
                        pSocket->SetState(connected);
#endif //NOSOCKETSTATES
                    if (pSocket->m_lEvent & FD_CONNECT)
                        pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
                    //Replay the events that were deferred while connecting
                    if (!nErrorCode)
                    {
                        if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
                            pSocket->OnReceive(0);
                        if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
                            pSocket->OnReceive(0);
                        if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE))
                            pSocket->OnSend(0);
                    }
                    pSocket->m_nPendingEvents = 0;
#endif //NOSOCKETSTATES
                    break;
                case FD_ACCEPT:
#ifndef NOSOCKETSTATES
                    if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT))
#endif //NOSOCKETSTATES
                    {
                        pSocket->OnAccept(nErrorCode);
                    }
                    break;
                case FD_CLOSE:
#ifndef NOSOCKETSTATES
                    if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE))
                    {
                        pSocket->SetState(nErrorCode?aborted:closed);
#else
                    {
#endif //NOSOCKETSTATES
                        pSocket->OnClose(nErrorCode);
                    }
                    break;
                }
            }
            delete pMsg;
            return 0;
        }
        else if (message == WM_USER + 1)
        {
            // WSAAsyncGetHostByName reply

            // Verify parameters
            ASSERT(hWnd);
            CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
            ASSERT(pWnd);
            if (!pWnd)
                return 0;

            // Find the socket that owns the lookup identified by wParam.
            // pSocket is only assigned on a real match, so a reply that
            // belongs to no socket can never be applied to an unrelated one.
            CAsyncSocketEx *pSocket = NULL;
            for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) {
                CAsyncSocketEx *pCandidate = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket;
                if (pCandidate && pCandidate->m_hAsyncGetHostByNameHandle &&
                    pCandidate->m_hAsyncGetHostByNameHandle == (HANDLE)wParam &&
                    pCandidate->m_pAsyncGetHostByNameBuffer) {
                    pSocket = pCandidate;
                    break;
                }
            }
            if (!pSocket)
                return 0;

            int nErrorCode = lParam >> 16;
            if (nErrorCode) {
                pSocket->OnConnect(nErrorCode);
                return 0;
            }

            // Build an IPv4 address from the first entry of the resolved
            // host record and the port stored on the socket.
            SOCKADDR_IN sockAddr{};
            sockAddr.sin_family = AF_INET;
            sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr;

            sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort);

            BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr));
            // The lookup is finished: release its buffer and forget the handle
            delete [] pSocket->m_pAsyncGetHostByNameBuffer;
            pSocket->m_pAsyncGetHostByNameBuffer = 0;
            pSocket->m_hAsyncGetHostByNameHandle = 0;

            // WSAEWOULDBLOCK is not reported as a connect failure
            if (!res)
                if (GetLastError() != WSAEWOULDBLOCK)
                    pSocket->OnConnect(GetLastError());
            return 0;
        }
        else if (message == WM_USER + 2)
        {
            //Verify parameters, lookup socket and notification message
            if (!hWnd)
                return 0;

            CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
            if (!pWnd)
                return 0;

            if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is outside of socket storage
                return 0;

            CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
            if (!pSocket)
                return 0;

            // Process pending callbacks. The list is swapped out first, so
            // the socket's own list is empty while OnLayerCallback runs.
            std::list<t_callbackMsg> tmp;
            tmp.swap(pSocket->m_pendingCallbacks);
            pSocket->OnLayerCallback(tmp);

            for (auto & cb : tmp) {
                delete [] cb.str;
            }
            // Handled here; return like every other handled message
            return 0;
        }
        else if (message == WM_TIMER)
        {
            //Only timer id 1 is handled here
            if (wParam != 1)
                return 0;

            ASSERT(hWnd);
            CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
            ASSERT(pWnd && pWnd->m_pThreadData);
            if (!pWnd || !pWnd->m_pThreadData)
                return 0;

            if (pWnd->m_pThreadData->layerCloseNotify.empty())
            {
                KillTimer(hWnd, 1);
                return 0;
            }
            //Take one queued socket per timer tick; stop the timer as soon
            //as the queue is drained.
            CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front();
            pWnd->m_pThreadData->layerCloseNotify.pop_front();
            if (pWnd->m_pThreadData->layerCloseNotify.empty())
                KillTimer(hWnd, 1);

            //Re-post FD_CLOSE as a regular socket notification for that slot
            if (socket)
                PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE);
            return 0;
        }
        return DefWindowProc(hWnd, message, wParam, lParam);
    }

关于单个服务程序的框架,我已经介绍完了,如果你能完全理解我要表达的意思,我相信你也能构建出一套高性能服务程序来。

由于微信公众号文章字数的限制,本篇文章未完,下一篇是《服务器端编程心得(八)——高性能服务器架构设计总结4——以flamingo服务器代码为例》。

0 人点赞