常见的多线程并发模型
异步IO多线程并发模型通常由监听线程组 工作线程组构成,监听线程负责接收新连接,然后把新连接转给工作线程。
这种模式的缺点:以“横向分工”方式来划分线程组的模式会造成线程或CPU负载不均衡,当处理大量TCP长连接时工作线程负载偏重,当应对大量短连接时监听线程负载偏重,为使线程和cpu负载不成为瓶颈,需要规则好监听线程和工作线程的平衡。
常见的多进程并发模型
进程并发模型,最熟悉的就是nginx了,效率很高。
这种模型是这样的:父进程负责监听端口,创建监听文件描述符(socket),fork创建多个子进程(工作进程),所有子进程都共享父进程的监听socket,接收新连接并处理。这种模型很好地解决了负载不均衡问题,并发很高。
但这种模式并不适合需要频繁开启/关闭监听端口的场景,熟悉linux系统进程关系的同学肯定知道,文件描述是属于进程空间的,父进程新创建的文件描述符对已存在的子进程来说是不可用的。因此,在这个模型中,每新开启一个监听端口,父进程为了使子进程能使用新创建的文件描述符,它就要通知子进子程退出,同时重新fork新的子进程,由于父进程先创建文件描述符后创建子进程,此时父进程的文件描述符对子进程是可用的。
并发模型的改进
此改进方案适合多线程和多进程,为了方便描述,暂定为多进程。
本模型类似nginx,各子进程是平等的,并不存在“横向分工”,但解决了频繁fork子进程问题。其过程是:父进程把新创建的监听文件描术符的“内存”副本发送给各子进程,子进程收到这份“内存”副本再重新恢复成自身进程空间的文件描述符,也就是说父进程发送的不是socket这个值 ,而这它对应的“内存”数据,通过这种方式很好地避免了重新fork子进程问题。
注意:只有多进程才需地发送socket内存副本,多线程直接发送socket值就行了。
怎样发送socket内存副本?这个过程是内核完成的,我们只使用sendmsg和recvmsg这两个接口就行了,这两个接口定义在系统头文件include/sys/socket.h,网上也有很多使用sendmsg和recvmsg的例子。
各进程需要创建一对socketpair
通信通道,用作进程通信
int fds[2] = {0};
int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, fds);
if (-1 == ret) {
LOG_E("Create unix socket failed: " << strerror(errno));
return EXCE_FAILE;
}
m_rfd = fds[0];
m_wfd = fds[1];
// 设置非阻塞
if ((-1 == fcntl(m_rfd, F_SETFL, O_NONBLOCK))/* || (-1 == fcntl(m_wfd, F_SETFL, O_NONBLOCK))*/) {
LOG_E("Unix socket, failed to call fcntl(O_NONBLOCK): " << strerror(errno));
close(m_rfd);
close(m_wfd);
return EXCE_FAILE;
}
父进程发送socket副本:
代码语言:javascript复制int Task::sendMsg(int fd, void *ptr, size_t nbytes, int sendfd, int flag) {
struct msghdr msg;
struct iovec iov[1];
union {
struct cmsghdr cm;
char control[CMSG_SPACE(sizeof(int))];
} control_un;
msg.msg_control = 0;
msg.msg_controllen = 0;
if (sendfd > 0) {
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
struct cmsghdr *cmptr;
cmptr = CMSG_FIRSTHDR(&msg);
cmptr->cmsg_len = CMSG_LEN(sizeof(int));
cmptr->cmsg_level = SOL_SOCKET;
cmptr->cmsg_type = SCM_RIGHTS;
*((int*)CMSG_DATA(cmptr)) = sendfd;
}
msg.msg_name = nullptr;
msg.msg_namelen = 0;
iov[0].iov_base = ptr;
iov[0].iov_len = nbytes;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
return sendmsg(fd, &msg, flag);
}
各子进程接收socket副本:
代码语言:javascript复制int Task::recvMsg(int sockfd, void *data, size_t size, int *fd, int flag) {
int num;
struct msghdr msg;
struct cmsghdr *cmsgptr;
struct iovec vec[1];
union {
struct cmsghdr cmsg;
char control[CMSG_SPACE(sizeof(int))];
} control_un;
msg.msg_control = control_un.control;
msg.msg_controllen = sizeof(control_un.control);
vec[0].iov_base = data;
vec[0].iov_len = size;
msg.msg_name = nullptr;
msg.msg_namelen = 0;
msg.msg_iov = vec;
msg.msg_iovlen = 1;
if ((num = recvmsg(sockfd, &msg, flag)) == -1){
return num;
}
if ((cmsgptr = CMSG_FIRSTHDR(&msg)) != nullptr && cmsgptr->cmsg_len == CMSG_LEN(sizeof(int))) {
*fd = *((int *)CMSG_DATA(cmsgptr));
} else {
*fd = -1;
}
return num;
}
通过这两个接口可以把文件描述符发送给其它进程(不要求是父子关系)。
进程通信
最后说一下进程之间怎样收发消息和快速寻址,对异步IO模型, 每个进程都要分配一个消息通道(SOCK_DGRAM类型的socketpair
),然后把这个消息通道加入到epoll_wait的监听列表中。
每个进程都维持着一个保存监听端口状态和连接状态的数组(context列表),消息通道的context固定放在首位,index为0
然后把context的index放到event.data.fd,加入到epoll_wait,这里要注意,赋值给event.data.fd的是index,不是socket
代码语言:javascript复制struct epoll_event event;
bzero(&event, sizeof(event));
event.events = EPOLLIN;
event.data.fd = m_pipeCxt->getIndex();
if (epoll_ctl(m_epollFd, EPOLL_CTL_ADD, m_rfd, &event) < 0) {
LOG_E("Failed to call epoll_ctl: " << strerror(errno));
return;
}
struct epoll_event* pEvents = new epoll_event[pollNumMax];
while(true) {
// 等待有事件发生, 第四个参数: -1是永久等待, 0是立即返回, 大于0值表示多少毫秒返回
int iNfds = epoll_wait(m_epollFd, pEvents, pollNumMax, -1);
if (iNfds == -1) {
LOG_E("Failed to call epoll_wait " << strerror(errno));
continue;
}
/* 处理所有事件 */
for (int i = 0; i < iNfds; i) {
if (pEvents[i].data.fd == 0) {
msgProc();
} else {
epolleventhandle(pEvents[i].data.fd, pEvents[i].events);
}
}
}
delete []pEvents;
当进程收到事件知道时,从event.data.fd得到数组索引,快速从数组得到对应的连接上下文(context)。
以上介绍了并发模型的关键点, 本文就写到这,文笔有限,欢迎指正。