Reactor 模型
reactor 是一种管理网络 I/O 的模型,我们知道,内核对于网络 I/O 的管理方式是用的 select/poll epoll ,那么应用程序之间可能也需要一种管理 I/O 的方式,reactor 模型就此诞生。
客户端发送数据,内核接受数据返回给应用层,这就好比去会所都要有个老鸨接待,而应用程序也需要一个接待处,可以假想成在应用程序和内核之间加入了一层接待处。
代码实现
reactor 模型,我们既然要管理 I/O ,那我们要怎么管理,如何管理。
首先我们需要确认的是,我们做的是应用层的网络 I/O 管理,除此之外,我们也无法改变内核如何管理 I/O ,因此我们做这个模型,需要借助内核 epoll 的管理方式。
而我们直到 epoll 会将 socket 的描述符加入红黑树 epfd
代码语言:c复制int epfd = epoll_create(1);
然后产生消息的事件会生成就绪队列,我们首先需要一个维护红黑树关系的数据结构,另外需要对每个 socket 描述符进行封装。
数据结构定义
socket 描述符可以封装成一个事件,而红黑树负责管理事件。
代码语言:c复制#define BUFFER_LENGTH 4096
struct ntyevent{
int fd;
int events; //文件符事件
void *arg;
int (*callback)(int fd, int events, void* arg); //回调函数
int status; //状态为 1 表示修改,0 为添加
char buffer[BUFFER_LENGTH]; //缓存
int length;
long last_active;
};
而需要管理这些封装的事件就是反应堆模型。
代码语言:c复制struct ntyreactor{
int epfd; //红黑树管理
struct ntyevent *events //可以使用链表管理事件,也可以使用数组
};
对数据结构的操作
首先就是对 ntyevent 结构体的操作,首先就是对事件类似增删查改的函数。
结构体肯定需要初始化,而我们把这个函数表示为设置。
代码语言:c复制//事件设置
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
//对events赋值
ev->fd = fd;
//回调函数
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
//当前的事件时间
ev->last_active = time(NULL);
return ;
}
这个函数类似很多初始化函数,当然他也承担一部分改的功能。
然后就是一个增一个删的函数,为什么没有查函数,因为只要声明了结构体指针然后直接引用函数内容即可。因此对于结构体来说增删是最需要的。
代码语言:c复制//主体功能将事件添加到 epfd 的红黑树
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
//创建 epoll_event
struct epoll_event ep_ev = {0, {0}};
//将事件赋值给 epoll_event 和 ev
ep_ev.events = ev->events = events;
int op;
//状态为 1 op 设置为可更改
if(ev->status == 1){
op = EPOLL_CTL_MOD;
}else{
op = EPOLL_CTL_ADD;
ev->status = 1;
}
//epoll 事件的管理
if(epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0){
printf("event add failed[fd = %d], events[%d]n", ev->fd,events);
return -1;
}
return 0;
}
而删除函数为:
代码语言:c复制int nty_event_del(int epfd, struct ntyevent *ev){
struct epoll_event ep_ev = {0, {0}};
if(ev->status != 1){ //状态不是修改
return -1;
}
ep_ev.data.ptr = ev; //将ev添加到 epoll 结构体中
ev->status = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
return 0;
}
说完对与 ntyevent 结构体的管理,那么即使 reactor 结构体管理。
首先就是初始化结构体。
代码语言:c复制#define MAX_EPOLL_EVENTS 1024
int ntyreactor_init(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
//创建一个 epfd
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0) {
printf("create epfd in %s err %sn", __func__, strerror(errno));
return -2;
}
//创建事件数组 也可以使用链表进行管理
reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (reactor->events == NULL) {
printf("create epfd in %s err %sn", __func__, strerror(errno));
close(reactor->epfd);
return -3;
}
}
初始化好 reactor 结构体,我们然后要确认,在网络 I/O 过程中,我们期望这个结构体能干什么事情。
在一般的网络模型,服务端就是生成 socket, bind 端口,然后listen,有客户端连接需要 accept,同时接受数据,发送数据。listen 是一个状态需要管理,而 accept 表示未接收数据的一个状态,这个状态都是需要管理的,而且状态只会改变,比方说从 listen 变成 accept,因此 reactor 管理跟普通的管理不太一样
创建 socket ,bind 和 listen基本不存在双方交互的一个情况,而另外四个方式都涉及与客户端的交互。
,那么首先就是监听事件添加
代码语言:c复制//添加监听事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
if (reactor == NULL) return -1;
if (reactor->events == NULL) return -1;
nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor); //设置事件
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]); //将事件添加到 epoll 红黑树 EPOLLIN 表示socket描述符为可读事件
return 0;
}
然后即使监听事件的不同状态的转换
代码语言:c复制int ntyreactor_run(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->events == NULL) return -1;
//epoll事件
struct epoll_event events[MAX_EPOLL_EVENTS 1];
int checkpos = 0, i;
while (1) {
long now = time(NULL); //当前时间
for (i = 0;i < 100; i , checkpos ) { //每次检查 100个 socket 描述符
if (checkpos == MAX_EPOLL_EVENTS) { //当循环到时间是最大时间
checkpos = 0;
}
if (reactor->events[checkpos].status != 1) { //表示状态不是添加,时间是修改
continue;
}
long duration = now - reactor->events[checkpos].last_active;//时间上次活跃时间
if (duration >= 60) { //两个时间间隔超过 60s
close(reactor->events[checkpos].fd); //关闭 socket 描述符
printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);
nty_event_del(reactor->epfd, &reactor->events[checkpos]); //将socket描述符删除
}
}
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000); //从红黑树中获取就绪数据数
if (nready < 0) { //没有就绪时间
printf("epoll_wait error, exitn");
continue;
}
for (i = 0;i < nready;i ) {
struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr; // epoll 的时间
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //事件为可读
ev->callback(ev->fd, events[i].events, ev->arg); //使用回调函数
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //事件为可写
ev->callback(ev->fd, events[i].events, ev->arg); //使用对应回调回到应用程序处理事件
}
}
}
}
然后就是数据结构的销毁。
代码语言:c复制int ntyreactor_destory(struct ntyreactor *reactor) {
close(reactor->epfd);
free(reactor->events);
}
比较简单,就是关闭 epoll 的描述符,然后释放内存。
回调函数
回调函数使用内核将事件复制到应用程序,我们需要处理的逻辑,就网络 I/O 来说,从服务器来说,我们从监听开始,因此第一个函数就是 accept 函数。
代码语言:c复制typedef int NCALLBACK(int, int, void*);//基础函数模型
//找出所有的可读的 accept 事件,然后将事件下一个回调设置为 recv 然后将事件添加到红黑树中
int accept_cb(int cb, int events, void*arg)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg; //通过 arg 表示reactor 数据结构体
if(reactor == NULL) return -1;
struct sockaddr_in client_addr; //初始化接收需要的数据结构
socklen_t len = sizeof(client_addr);
int clientfd;
if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1){ //接受函数
if(errno != EAGAIN && errno != ENTER){ //错误处理
printf("accept: %sn", strerror(errno));
return -1;
}
}
int i = 0;
do{
for(i = 0; i < MAX_EPOLL_EVENTS; i ){
if(reactor->events[i].status == 0){ 找到 epoll 事件中第一个事件为 EPOLLIN 可读事件
break;
}
}
if (i == MAX_EPOLL_EVENTS) { //没有相关的事件
printf("%s: max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS);
break;
}
int flag = 0;
if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) { //设置为非阻塞 I/O
printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS);
break;
}
nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor); //accept 之后要将事件下一个状态设置为recv
nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]); //将事件添加到 epfd 中
} while (0);
printf("new connect [%s:%d][time:%ld], pos[%d]n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);
return 0;
}
accept 之后就是 recv 函数,recv 的回调函数为
代码语言:c复制int recv_cb(int fd, int events, void * args)
{
struct ntyreactor *reactor = (struct ntyreactor *)arg; //传入 reactor
struct ntyevent *ev = reactor->events fd; //传入 events 的位置,因为使用数组,链表就要进行遍历了
int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0); //接受数据
nty_event_del(reactor->epfd, ev); //将事件从红黑树中删除
if(len > 0){ //接受到数据
ev->length = len;
ev->buffer[len] = '