后台并发模型改进经验分享

2019-08-26 13:02:35 浏览数 (1)

常见的多线程并发模型

异步IO多线程并发模型通常由监听线程组 工作线程组构成,监听线程负责接收新连接,然后把新连接转给工作线程。

这种模式的缺点:以“横向分工”方式来划分线程组的模式会造成线程或CPU负载不均衡,当处理大量TCP长连接时工作线程负载偏重,当应对大量短连接时监听线程负载偏重,为使线程和cpu负载不成为瓶颈,需要规则好监听线程和工作线程的平衡。

常见的多进程并发模型

进程并发模型,最熟悉的就是nginx了,效率很高。

这种模型是这样的:父进程负责监听端口,创建监听文件描述符(socket),fork创建多个子进程(工作进程),所有子进程都共享父进程的监听socket,接收新连接并处理。这种模型很好地解决了负载不均衡问题,并发很高。

但这种模式并不适合需要频繁开启/关闭监听端口的场景,熟悉linux系统进程关系的同学肯定知道,文件描述是属于进程空间的,父进程新创建的文件描述符对已存在的子进程来说是不可用的。因此,在这个模型中,每新开启一个监听端口,父进程为了使子进程能使用新创建的文件描述符,它就要通知子进子程退出,同时重新fork新的子进程,由于父进程先创建文件描述符后创建子进程,此时父进程的文件描述符对子进程是可用的。

并发模型的改进

此改进方案适合多线程和多进程,为了方便描述,暂定为多进程。

本模型类似nginx,各子进程是平等的,并不存在“横向分工”,但解决了频繁fork子进程问题。其过程是:父进程把新创建的监听文件描术符的“内存”副本发送给各子进程,子进程收到这份“内存”副本再重新恢复成自身进程空间的文件描述符,也就是说父进程发送的不是socket这个值 ,而这它对应的“内存”数据,通过这种方式很好地避免了重新fork子进程问题。

注意:只有多进程才需地发送socket内存副本,多线程直接发送socket值就行了。

怎样发送socket内存副本?这个过程是内核完成的,我们只使用sendmsg和recvmsg这两个接口就行了,这两个接口定义在系统头文件include/sys/socket.h,网上也有很多使用sendmsg和recvmsg的例子。

各进程需要创建一对socketpair通信通道,用作进程通信

代码语言:javascript复制
    int fds[2] = {0};
    int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);
    if (-1 == ret) {

        LOG_E("Create unix socket failed: " << strerror(errno));
        return EXCE_FAILE;
    }

    m_rfd = fds[0];
    m_wfd = fds[1];

    // 设置非阻塞
    if ((-1 == fcntl(m_rfd, F_SETFL, O_NONBLOCK))/* || (-1 == fcntl(m_wfd, F_SETFL, O_NONBLOCK))*/) {
        LOG_E("Unix socket, failed to call fcntl(O_NONBLOCK): " << strerror(errno));
        close(m_rfd);
        close(m_wfd);
        return EXCE_FAILE;
    }

父进程发送socket副本:

代码语言:javascript复制
int Task::sendMsg(int fd, void *ptr, size_t nbytes, int sendfd, int flag) {
    struct msghdr msg;  
    struct iovec iov[1];  
    union {
      struct cmsghdr cm;  
      char control[CMSG_SPACE(sizeof(int))];  
    } control_un;

    msg.msg_control = 0;  
    msg.msg_controllen = 0;  

    if (sendfd > 0) {
        msg.msg_control = control_un.control;  
        msg.msg_controllen = sizeof(control_un.control);  
      
        struct cmsghdr  *cmptr;
        cmptr = CMSG_FIRSTHDR(&msg);  
        cmptr->cmsg_len = CMSG_LEN(sizeof(int));
        cmptr->cmsg_level = SOL_SOCKET;
        cmptr->cmsg_type = SCM_RIGHTS;  
        *((int*)CMSG_DATA(cmptr)) = sendfd;  
    }

    msg.msg_name = nullptr;  
    msg.msg_namelen = 0;  
  
    iov[0].iov_base = ptr;  
    iov[0].iov_len = nbytes;  
    msg.msg_iov = iov;  
    msg.msg_iovlen = 1;  
    return sendmsg(fd, &msg, flag);  
}

各子进程接收socket副本:

代码语言:javascript复制
int Task::recvMsg(int sockfd, void *data, size_t size, int *fd, int flag) {  
    int num;  
    struct msghdr msg;  
    struct cmsghdr *cmsgptr;  
    struct iovec vec[1];  
    union {  
        struct cmsghdr cmsg;  
        char control[CMSG_SPACE(sizeof(int))];      
    } control_un;  
      
    msg.msg_control = control_un.control;  
    msg.msg_controllen = sizeof(control_un.control);  
      
    vec[0].iov_base = data;  
    vec[0].iov_len = size;  
      
    msg.msg_name = nullptr;  
    msg.msg_namelen = 0;  
    msg.msg_iov = vec;  
    msg.msg_iovlen = 1;     
      
    if ((num = recvmsg(sockfd, &msg, flag)) == -1){  
        return num;  
    }  
      
    if ((cmsgptr = CMSG_FIRSTHDR(&msg)) != nullptr && cmsgptr->cmsg_len == CMSG_LEN(sizeof(int))) {
        *fd = *((int *)CMSG_DATA(cmsgptr));  
    } else {
        *fd = -1;
    }  
      
    return num;  
} 

通过这两个接口可以把文件描述符发送给其它进程(不要求是父子关系)。

进程通信

最后说一下进程之间怎样收发消息和快速寻址,对异步IO模型, 每个进程都要分配一个消息通道(SOCK_DGRAM类型的socketpair),然后把这个消息通道加入到epoll_wait的监听列表中。

每个进程都维持着一个保存监听端口状态和连接状态的数组(context列表),消息通道的context固定放在首位,index为0

然后把context的index放到event.data.fd,加入到epoll_wait,这里要注意,赋值给event.data.fd的是index,不是socket

代码语言:javascript复制
struct epoll_event event;
    bzero(&event, sizeof(event));
    event.events = EPOLLIN;
    event.data.fd = m_pipeCxt->getIndex();
    if (epoll_ctl(m_epollFd, EPOLL_CTL_ADD, m_rfd, &event) < 0) {
        LOG_E("Failed to call epoll_ctl: " << strerror(errno));
        return;
    }
    struct epoll_event* pEvents = new epoll_event[pollNumMax];
    while(true) {
        // 等待有事件发生, 第四个参数: -1是永久等待, 0是立即返回, 大于0值表示多少毫秒返回
        int iNfds = epoll_wait(m_epollFd, pEvents, pollNumMax, -1);
        if (iNfds == -1) {
            LOG_E("Failed to call epoll_wait " << strerror(errno));
            continue;
        }

        /* 处理所有事件 */
        for (int i = 0; i < iNfds;   i) {
            if (pEvents[i].data.fd == 0) {
                msgProc();
            } else {
                epolleventhandle(pEvents[i].data.fd, pEvents[i].events);
            }
        }
    }

    delete []pEvents;

当进程收到事件知道时,从event.data.fd得到数组索引,快速从数组得到对应的连接上下文(context)。

以上介绍了并发模型的关键点, 本文就写到这,文笔有限,欢迎指正。

0 人点赞