一、Reactor网络模型简介
什么是并发:网络并发,通俗的讲就是服务器可以承载的客户端数量,即服务器可以稳定保证客户端同时接入的数量。
Reactor模型开发效率比直接使用IO多路复用要高,它一般是单线程的,设计目标是希望一个线程使用CPU的全部资源;带来的优点是,在每个事件处理中很多时候不需要考虑共享资源的互斥访问。
Reactor模式是处理并发IO比较常见的模式,用于同步IO,核心思想是将所有要处理的IO事件注册到一个中心IO多路复用器上,同时主线程或进程阻塞在IO多路复用器上;一旦有事件到来或准备就绪,多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
二、Reactor的优点
1、响应快;不必为单个同步事件阻塞,虽然Reactor本身依然是同步的。 2、编程相对简单;可以最大程度的避免复杂的多线程及同步问题,尽可能的避免多线程、多进程的切换开销。 3、可扩展性;可通过增加Reactor实例个数,充分利用CPU资源。 4、高复用性;Reactor模型本身与事件处理逻辑无关,具有很高的复用性。
三、实现过程
step 1:定义Reactor模型相关结构体
reactor数据结构设计图如下:
结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于1000024=784。这样就可以找到fd对应的数据存放地址item。
数据结构的代码实现如下:
代码语言:javascript复制struct ntyevnt{
int fd;//事件fd
char buffer[BUFFER_LENGTH];//缓冲区
int length;//缓存长度
int status;//状态
int events;//事件
void *arg;//callback的参数
int(*callback)(int fd, int events, void* arg);//回调函数
};
struct eventblock{
struct *sock_items;//事件集合
struct eventblock *next;//指向下一个内存块
};
struct reactor{
int epfd;//epoll的文件描述符
int blkcnt;//事件块的数量
struct eventblock *evtblk;//事件块的起始地址
};
step 2:实现Reactor容器初始化功能
我们这里使用epoll作为IO多路复用器。 思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。
代码语言:javascript复制int ntyreactor_init(struct ntyreactor *reactor)
{
if (reactor == NULL)
return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
//创建epoll,作为IO多路复用器
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0)
{
printf("create epfd in %s error %sn", __func__, strerror(errno));
return -2;
}
// 创建事件集
struct ntyevnt *events = (struct ntyevnt *)malloc(MAX_EPOLL_EVENTS * sizeof(struct ntyevnt));
if (events == NULL)
{
printf("create ntyevnt in %s error %sn", __func__, strerror(errno));
close(reactor->epfd);
return -3;
}
memset(events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
//创建事件内存块
struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
if (block == NULL)
{
printf("create eventblock in %s error %sn", __func__, strerror(errno));
free(events);
close(reactor->epfd);
return -4;
}
block->events = events;
block->next = NULL;
// reactor初始化赋值
reactor->evblks=block;
reactor->blkcnt = 1;
return 0;
}
step 3:实现socket初始化功能
定义成一个函数,方便初始化多个监听端口。
代码语言:javascript复制int init_sock(short port)
{
int ret = 0;
int fd = socket(AF_INET, SOCK_STREAM, 0);//创建套字接
if (fd == -1)
{
printf("create socket in %s error %sn", __func__, strerror(errno));
return -1;
}
ret=fcntl(fd, F_SETFL, O_NONBLOCK);//设置非阻塞
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %sn", __func__, strerror(errno));
return -1;
}
// 设置属性
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;// IPV4
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
// 绑定
ret = bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
printf("bind() in %s error %sn", __func__, strerror(errno));
return -1;
}
//监听
ret = listen(fd, 20);
if (ret < 0)
{
printf("listen failed : %sn", strerror(errno));
return -1;
}
printf("listen server port : %dn", port);
return fd;
}
step 4:实现Reactor动态扩容功能
为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。 核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。
代码语言:javascript复制int ntyreactor_alloc(struct ntyreactor *reactor)
{
if (reactor == NULL)
return -1;
if (reactor->evblks == NULL)
return -1;
//找到链表末端
struct eventblock *blk = reactor->evblks;
while (blk->next != NULL)
blk = blk->next;
// 创建事件集
struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL)
{
printf("ntyreactor_alloc ntyevent failedn");
return -2;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
// 创建事件块
struct eventblock *block = (struct eventblock*)malloc(sizeof(struct eventblock));
if (block == NULL)
{
printf("ntyreactor_alloc eventblock failedn");
return -3;
}
block->events = evs;
block->next = NULL;
//实现扩容
blk->next = block;
reactor->blkcnt ;
return 0;
}
step 5:实现Reactor索引功能
思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。 例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于1000024=784。这样就可以找到fd对应的数据存放地址item。
代码语言:javascript复制struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd)
{
if (reactor == NULL)
return NULL;
if (reactor->evblks == NULL)
return NULL;
// fd所在block序号
int blkidx = sockfd / MAX_EPOLL_EVENTS;
while (blkidx >= reactor->blkcnt)
{
// 扩容
ntyreactor_alloc(reactor);
}
//找到fd对应block的位置
int i = 0;
struct eventblock *blk = reactor->evblks;
while (i != blkidx && blk != NULL)
{
blk = blk->next;
}
// 返回item 地址
return &blk->events[sockfd%MAX_EPOLL_EVENTS];
}
step 6:实现设置事件信息功能
将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。
代码语言:javascript复制void nty_event_set(struct ntyevent *ev,int fd,NCALLBACK callback,void *arg)
{
ev->fd = fd;
ev->events = 0;
ev->callback = callback;
ev->arg = arg;
}
step 7:实现IO事件监听功能
这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。 思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。
代码语言:javascript复制int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
// 设置epoll事件信息
struct epoll_event ep_ev = {
0,{
0} };
ep_ev.data.ptr = ev;
ep_ev.events = ev->events = events;
// 判断,设置epfd的操作模式
int op;
if (ev->status == 1)
op = EPOLL_CTL_MOD;
else
{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
// 设置epoll
int ret = epoll_ctl(epfd, op, ev->fd, &ep_ev);
if (ret < 0)
{
printf("event add failed [fd=%d], events[%d],ret:%dn", ev->fd, events,ret);
printf("event add failed in %s error %sn", __func__, strerror(errno));
return -1;
}
return 0;
}
step 8:实现IO事件移除功能
由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。
代码语言:javascript复制int nty_event_del(int epfd, struct ntyevent *event)
{
if (event->status != 1)
return -1;
struct epoll_event ep_ev = {
0,{
0} };
ep_ev.data.ptr = event;
event->status = 0;
// 移除fd的监听
epoll_ctl(epfd, EPOLL_CTR_DEL, &ep_ev);
return 0;
}
step 9:实现Reactor事件监听功能
思路:设置fd的事件信息,添加事件到epoll监听。
代码语言:javascript复制int ntyreactor_addlistener(struct ntyreactor *reactor,int sockfd,NCALLBACK *acceptor)
{
if (reactor == NULL)
return -1;
if (reactor->evblks == NULL)
return -1;
// 找到fd对应的event地址
struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
if (event == NULL)
return -1;
// 设置fd的事件信息
nty_event_set(event, sockfd, acceptor, reactor);
// 添加事件到epoll监听
nty_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}
step 10:实现recv回调函数
思路:找到fd对应的信息内存块;使用recv接收数据;暂时移除该事件的监听;如果接收成功,设置监听事件为是否可写,添加到IO多路复用器(epoll)中;返回收到的数据长度。
代码语言:javascript复制int recv_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg;
if (reactor == NULL)
return -1;
// 找到fd对应的event地址
struct ntyevent *event = ntyreactor_idx(reactor, fd);
if (event == NULL)
return -1;
// 接收数据
int len = recv(fd, event->buffer, BUFFER_LENGTH, 0);
// 暂时移除监听
nty_event_del(reactor->epfd, event);
if (len > 0)
{
event->length = len;
event->buffer[len] = '