(八)高性能服务器架构设计总结2——以flamigo服务器代码为例

2018-04-04 15:21:57 浏览数 (1)

说了这么多,我们来以flamingo的服务器程序的网络框架设计为例来验证上述介绍的理论。flamingo的网络框架是基于陈硕的muduo库,改成C 11的版本,并修改了一些bug。在此感谢原作者陈硕。flamingo的源码可以在这里下载:https://github.com/baloonwj/flamingo,打不开github的可以移步csdn:http://download.csdn.net/detail/analogous_love/9805797。

上文介绍的核心线程函数的while循环位于eventloop.cpp中:

代码语言:javascript复制
void EventLoop::loop()  
{  
    assert(!looping_);  
    assertInLoopThread();  
    looping_ = true;  
    quit_ = false;  // FIXME: what if someone calls quit() before loop() ?  
    LOG_TRACE << "EventLoop " << this << " start looping";  
  
    while (!quit_)  
    {  
        activeChannels_.clear();  
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);  
          iteration_;  
        if (Logger::logLevel() <= Logger::TRACE)  
        {  
            printActiveChannels();  
        }  
        // TODO sort channel by priority  
        eventHandling_ = true;  
        for (ChannelList::iterator it = activeChannels_.begin();  
            it != activeChannels_.end();   it)  
        {  
            currentActiveChannel_ = *it;  
            currentActiveChannel_->handleEvent(pollReturnTime_);  
        }  
        currentActiveChannel_ = NULL;  
        eventHandling_ = false;  
        doPendingFunctors();  
  
        if (frameFunctor_)  
        {  
            frameFunctor_();  
        }         
    }  
  
    LOG_TRACE << "EventLoop " << this << " stop looping";  
    looping_ = false;  
} 

poller_->poll利用epoll分离网络事件,然后接着处理分离出来的网络事件,每一个客户端socket对应一个连接,即一个TcpConnection和Channel通道对象。currentActiveChannel_->handleEvent(pollReturnTime_)根据是可读、可写、出错事件来调用对应的处理函数,这些函数都是回调函数,程序初始化阶段设置进来的:

代码语言:javascript复制
void Channel::handleEvent(Timestamp receiveTime)  
{  
    std::shared_ptr<void> guard;  
    if (tied_)  
    {  
        guard = tie_.lock();  
        if (guard)  
        {  
            handleEventWithGuard(receiveTime);  
        }  
    }  
    else  
    {  
        handleEventWithGuard(receiveTime);  
    }  
}  
  
void Channel::handleEventWithGuard(Timestamp receiveTime)  
{  
    eventHandling_ = true;  
    LOG_TRACE << reventsToString();  
    if ((revents_ & POLLHUP) && !(revents_ & POLLIN))  
    {  
        if (logHup_)  
        {  
            LOG_WARN << "Channel::handle_event() POLLHUP";  
        }  
        if (closeCallback_) closeCallback_();  
    }  
  
    if (revents_ & POLLNVAL)  
    {  
        LOG_WARN << "Channel::handle_event() POLLNVAL";  
    }  
  
    if (revents_ & (POLLERR | POLLNVAL))  
    {  
        if (errorCallback_) errorCallback_();  
    }  
    if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))  
    {  
        //当是侦听socket时,readCallback_指向Acceptor::handleRead  
        //当是客户端socket时,调用TcpConnection::handleRead   
        if (readCallback_) readCallback_(receiveTime);  
    }  
    if (revents_ & POLLOUT)  
    {  
        //如果是连接状态服的socket,则writeCallback_指向Connector::handleWrite()  
        if (writeCallback_) writeCallback_();  
    }  
    eventHandling_ = false;  
}  

当然,这里利用了Channel对象的“多态性”,如果是普通socket,可读事件就会调用预先设置的回调函数;但是如果是侦听socket,则调用Aceptor对象的handleRead()来接收新连接:

代码语言:javascript复制
void Acceptor::handleRead()  
{  
    loop_->assertInLoopThread();  
    InetAddress peerAddr;  
    //FIXME loop until no more  
    int connfd = acceptSocket_.accept(&peerAddr);  
    if (connfd >= 0)  
    {  
        // string hostport = peerAddr.toIpPort();  
        // LOG_TRACE << "Accepts of " << hostport;  
        //newConnectionCallback_实际指向TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)  
        if (newConnectionCallback_)  
        {  
            newConnectionCallback_(connfd, peerAddr);  
        }  
        else  
        {  
            sockets::close(connfd);  
        }  
    }  
    else  
    {  
        LOG_SYSERR << "in Acceptor::handleRead";  
        // Read the section named "The special problem of  
        // accept()ing when you can't" in libev's doc.  
        // By Marc Lehmann, author of livev.  
        if (errno == EMFILE)  
        {  
            ::close(idleFd_);  
            idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);  
            ::close(idleFd_);  
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);  
        }  
    }  
}  

主循环里面的业务逻辑处理对应:

代码语言:javascript复制
doPendingFunctors();  
  
if (frameFunctor_)  
{  
   frameFunctor_();  
}         
代码语言:javascript复制
void EventLoop::doPendingFunctors()  
{  
    std::vector<Functor> functors;  
    callingPendingFunctors_ = true;  
  
    {  
        std::unique_lock<std::mutex> lock(mutex_);  
        functors.swap(pendingFunctors_);  
    }  
  
    for (size_t i = 0; i < functors.size();   i)  
    {  
        functors[i]();  
    }  
    callingPendingFunctors_ = false;  
}  

这里增加业务逻辑是增加执行任务的函数指针的,增加的任务保存在成员变量pendingFunctors_中,这个变量是一个函数指针数组(vector对象),执行的时候,调用每个函数就可以了。上面的代码先利用一个栈变量将成员变量pendingFunctors_里面的函数指针换过来,接下来对这个栈变量进行操作就可以了,这样减少了锁的粒度。因为成员变量pendingFunctors_在增加任务的时候,也会被用到,设计到多个线程操作,所以要加锁,增加任务的地方是:

代码语言:javascript复制
void EventLoop::queueInLoop(const Functor& cb)  
{  
    {  
        std::unique_lock<std::mutex> lock(mutex_);  
        pendingFunctors_.push_back(cb);  
    }  
  
    if (!isInLoopThread() || callingPendingFunctors_)  
    {  
        wakeup();  
    }  
}  

而frameFunctor_就更简单了,就是通过设置一个函数指针就可以了。当然这里有个技巧性的东西,即增加任务的时候,为了能够立即执行,使用唤醒机制,通过往一个fd里面写入简单的几个字节,来唤醒epoll,使其立刻返回,因为此时没有其它的socke有事件,这样接下来就执行刚才添加的任务了。

我们看一下数据收取的逻辑:

代码语言:javascript复制
void TcpConnection::handleRead(Timestamp receiveTime)  
{  
    loop_->assertInLoopThread();  
    int savedErrno = 0;  
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);  
    if (n > 0)  
    {  
        //messageCallback_指向CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)  
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);  
    }  
    else if (n == 0)  
    {  
        handleClose();  
    }  
    else  
    {  
        errno = savedErrno;  
        LOG_SYSERR << "TcpConnection::handleRead";  
        handleError();  
    }  
}  

将收到的数据放到接收缓冲区里面,将来我们来解包:

代码语言:javascript复制
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)  
{  
    while (true)  
    {  
        //不够一个包头大小  
        if (pBuffer->readableBytes() < (size_t)sizeof(msg))  
        {  
            LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);  
            return;  
        }  
  
        //不够一个整包大小  
        msg header;  
        memcpy(&header, pBuffer->peek(), sizeof(msg));  
        if (pBuffer->readableBytes() < (size_t)header.packagesize   sizeof(msg))  
            return;  
  
        pBuffer->retrieve(sizeof(msg));  
        std::string inbuf;  
        inbuf.append(pBuffer->peek(), header.packagesize);  
        pBuffer->retrieve(header.packagesize);  
        if (!Process(conn, inbuf.c_str(), inbuf.length()))  
        {  
            LOG_WARN << "Process error, close TcpConnection";  
            conn->forceClose();  
        }  
    }// end while-loop  
  
}  

先判断接收缓冲区里面的数据是否够一个包头大小,如果够再判断够不够包头指定的包体大小,如果还是够的话,接着在Process函数里面处理该包。

再看看发送数据的逻辑:

代码语言:javascript复制
void TcpConnection::sendInLoop(const void* data, size_t len)  
{  
    loop_->assertInLoopThread();  
    ssize_t nwrote = 0;  
    size_t remaining = len;  
    bool faultError = false;  
    if (state_ == kDisconnected)  
    {  
        LOG_WARN << "disconnected, give up writing";  
        return;  
    }  
    // if no thing in output queue, try writing directly  
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)  
    {  
        nwrote = sockets::write(channel_->fd(), data, len);  
        if (nwrote >= 0)  
        {  
            remaining = len - nwrote;  
            if (remaining == 0 && writeCompleteCallback_)  
            {  
                loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));  
            }  
        }  
        else // nwrote < 0  
        {  
            nwrote = 0;  
            if (errno != EWOULDBLOCK)  
            {  
                LOG_SYSERR << "TcpConnection::sendInLoop";  
                if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?  
                {  
                    faultError = true;  
                }  
            }  
        }  
    }  
  
    assert(remaining <= len);  
    if (!faultError && remaining > 0)  
    {  
        size_t oldLen = outputBuffer_.readableBytes();  
        if (oldLen   remaining >= highWaterMark_                          
            && oldLen < highWaterMark_  
            && highWaterMarkCallback_)  
        {  
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen   remaining));  
        }  
        outputBuffer_.append(static_cast<const char*>(data) nwrote, remaining);  
        if (!channel_->isWriting())  
        {  
            channel_->enableWriting();  
        }  
    }  
}  

如果剩余的数据remaining大于则调用channel_->enableWriting();开始监听可写事件,可写事件处理如下:

代码语言:javascript复制
void TcpConnection::handleWrite()  
{  
    loop_->assertInLoopThread();  
    if (channel_->isWriting())  
    {  
        ssize_t n = sockets::write(channel_->fd(),  
            outputBuffer_.peek(),  
            outputBuffer_.readableBytes());  
        if (n > 0)  
        {  
            outputBuffer_.retrieve(n);  
            if (outputBuffer_.readableBytes() == 0)  
            {  
                channel_->disableWriting();  
                if (writeCompleteCallback_)  
                {  
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));  
                }  
                if (state_ == kDisconnecting)  
                {  
                    shutdownInLoop();  
                }  
            }  
        }  
        else  
        {  
            LOG_SYSERR << "TcpConnection::handleWrite";  
            // if (state_ == kDisconnecting)  
            // {  
            //   shutdownInLoop();  
            // }  
        }  
    }  
    else  
    {  
        LOG_TRACE << "Connection fd = " << channel_->fd()  
            << " is down, no more writing";  
    }  
} 

如果发送完数据以后调用channel_->disableWriting();移除监听可写事件。

很多读者可能一直想问,文中不是说解包数据并处理逻辑是业务代码而非网络通信的代码,你这里貌似都混在一起了,其实没有,这里实际的业务代码处理都是框架曾提供的回调函数里面处理的,具体怎么处理,由框架使用者——业务层自己定义。

总结起来,实际上就是一个线程函数里一个loop那么点事情,不信你再看我曾经工作上的一个交易系统项目代码:

代码语言:javascript复制
void CEventDispatcher::Run()  
{  
    m_bShouldRun = true;  
    while(m_bShouldRun)  
    {  
        DispatchIOs();        
        SyncTime();  
        CheckTimer();  
        DispatchEvents();  
    }  
}  
代码语言:javascript复制
void CEpollReactor::DispatchIOs()  
{  
    DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;  
    if (HandleOtherTask())  
    {  
        dwSelectTimeOut = 0;  
    }  
  
    struct epoll_event ev;  
    CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();  
    for(; itor!=m_mapEventHandlerId.end(); itor  )  
    {  
        CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;  
        if(pEventHandler == NULL){  
            continue;  
        }  
        ev.data.ptr = pEventHandler;  
        ev.events = 0;  
        int nReadID, nWriteID;  
        pEventHandler->GetIds(&nReadID, &nWriteID);    
        if (nReadID > 0)  
        {  
            ev.events |= EPOLLIN;  
        }  
        if (nWriteID > 0)  
        {  
            ev.events |= EPOLLOUT;  
        }  
          
        epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);  
    }  
      
    struct epoll_event events[EPOLL_MAX_EVENTS];  
  
    int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);  
  
    for (int i=0; i<nfds; i  )  
    {  
        struct epoll_event &evref = events[i];  
        CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;  
        if ((evref.events|EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
        {  
            pEventHandler->HandleInput();  
        }  
        if ((evref.events|EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())  
        {  
            pEventHandler->HandleOutput();  
        }  
    }     
}  
代码语言:javascript复制
void CEventDispatcher::DispatchEvents()  
{  
    CEvent event;  
    CSyncEvent *pSyncEvent;  
    while(m_queueEvent.PeekEvent(event))  
    {  
        int nRetval;  
  
        if(event.pEventHandler != NULL)  
        {  
            nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);  
        }  
        else  
        {  
            nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);  
        }  
          
        if(event.pAdd != NULL)      //同步消息  
        {  
            pSyncEvent=(CSyncEvent *)event.pAdd;  
            pSyncEvent->nRetval = nRetval;  
            pSyncEvent->sem.UnLock();  
        }  
    }  
}  

再看看蘑菇街开源的TeamTalk的源码(代码下载地址:https://github.com/baloonwj/TeamTalk):

代码语言:javascript复制
void CEventDispatch::StartDispatch(uint32_t wait_timeout)  
{  
    fd_set read_set, write_set, excep_set;  
    timeval timeout;  
    timeout.tv_sec = 0;  
    timeout.tv_usec = wait_timeout * 1000;  // 10 millisecond  
  
    if(running)  
        return;  
    running = true;  
      
    while (running)  
   {  
        _CheckTimer();  
        _CheckLoop();  
  
        if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)  
        {  
            Sleep(MIN_TIMER_DURATION);  
            continue;  
        }  
  
        m_lock.lock();  
        memcpy(&read_set, &m_read_set, sizeof(fd_set));  
        memcpy(&write_set, &m_write_set, sizeof(fd_set));  
        memcpy(&excep_set, &m_excep_set, sizeof(fd_set));  
        m_lock.unlock();  
  
        int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);  
  
        if (nfds == SOCKET_ERROR)  
        {  
            log("select failed, error code: %d", GetLastError());  
            Sleep(MIN_TIMER_DURATION);  
            continue;           // select again  
        }  
  
        if (nfds == 0)  
        {  
            continue;  
        }  
  
        for (u_int i = 0; i < read_set.fd_count; i  )  
        {  
            //log("select return read count=%dn", read_set.fd_count);  
            SOCKET fd = read_set.fd_array[i];  
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
            if (pSocket)  
            {  
                pSocket->OnRead();  
                pSocket->ReleaseRef();  
            }  
        }  
  
        for (u_int i = 0; i < write_set.fd_count; i  )  
        {  
            //log("select return write count=%dn", write_set.fd_count);  
            SOCKET fd = write_set.fd_array[i];  
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
            if (pSocket)  
            {  
                pSocket->OnWrite();  
                pSocket->ReleaseRef();  
            }  
        }  
  
        for (u_int i = 0; i < excep_set.fd_count; i  )  
        {  
            //log("select return exception count=%dn", excep_set.fd_count);  
            SOCKET fd = excep_set.fd_array[i];  
            CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);  
            if (pSocket)  
            {  
                pSocket->OnClose();  
                pSocket->ReleaseRef();  
            }  
        }  
  
    }  
}  

由于微信公众号文章字数的限制,本篇文章未完,下一篇是《服务器端编程心得(八)——高性能服务器架构设计总结3——以flamigo服务器代码为例》。

0 人点赞