Reactor 模型

2024-05-20 22:47:36 浏览数 (5)

Reactor 模型

reactor 是一种管理网络 I/O 的模型,我们知道,内核对于网络 I/O 的管理方式是用的 select/poll epoll ,那么应用程序之间可能也需要一种管理 I/O 的方式,reactor 模型就此诞生。

应用程序应用程序

客户端发送数据,内核接受数据返回给应用层,这就好比去会所都要有个老鸨接待,而应用程序也需要一个接待处,可以假想成在应用程序和内核之间加入了一层接待处。

代码实现

reactor 模型,我们既然要管理 I/O ,那我们要怎么管理,如何管理。

首先我们需要确认的是,我们做的是应用层的网络 I/O 管理,除此之外,我们也无法改变内核如何管理 I/O ,因此我们做这个模型,需要借助内核 epoll 的管理方式。

而我们直到 epoll 会将 socket 的描述符加入红黑树 epfd

代码语言:c复制
int epfd = epoll_create(1);

然后产生消息的事件会生成就绪队列,我们首先需要一个维护红黑树关系的数据结构,另外需要对每个 socket 描述符进行封装。

数据结构定义

socket 描述符可以封装成一个事件,而红黑树负责管理事件。

代码语言:c复制
#define BUFFER_LENGTH    4096
struct ntyevent{
	int fd;
	int events;                //文件符事件
	void *arg;
	int (*callback)(int fd, int events, void* arg);  //回调函数

	int status;                           //状态为 1 表示修改,0 为添加
	char buffer[BUFFER_LENGTH];   //缓存
	int length;
	long last_active;
};

而需要管理这些封装的事件就是反应堆模型。

代码语言:c复制
struct ntyreactor{
	int epfd;                         //红黑树管理
	struct ntyevent *events //可以使用链表管理事件,也可以使用数组
};

对数据结构的操作

首先就是对 ntyevent 结构体的操作,首先就是对事件类似增删查改的函数。

结构体肯定需要初始化,而我们把这个函数表示为设置。

代码语言:c复制
//事件设置
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
     //对events赋值
     ev->fd = fd;
	 //回调函数
	 ev->callback = callback;
	 ev->events = 0;
	 ev->arg = arg;
	 //当前的事件时间
	 ev->last_active = time(NULL);

	 return ;
}

这个函数类似很多初始化函数,当然他也承担一部分改的功能。

然后就是一个增一个删的函数,为什么没有查函数,因为只要声明了结构体指针然后直接引用函数内容即可。因此对于结构体来说增删是最需要的。

代码语言:c复制
//主体功能将事件添加到 epfd 的红黑树
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
    //创建 epoll_event
    struct epoll_event ep_ev = {0, {0}};
	//将事件赋值给 epoll_event 和 ev
	ep_ev.events = ev->events = events;

	int op;
	//状态为 1 op 设置为可更改
	if(ev->status == 1){
		op = EPOLL_CTL_MOD;
	}else{
	    op = EPOLL_CTL_ADD;
		ev->status = 1;
	}

    //epoll 事件的管理
	if(epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0){
		printf("event add failed[fd = %d], events[%d]n", ev->fd,events);
		return -1;
	}

	return 0;
}

而删除函数为:

代码语言:c复制
int nty_event_del(int epfd, struct ntyevent *ev){
    struct epoll_event ep_ev = {0, {0}};

	if(ev->status != 1){  //状态不是修改
		return -1;
	}

	ep_ev.data.ptr = ev;  //将ev添加到 epoll 结构体中
	ev->status = 0;
	epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);

	return 0;
}

说完对与 ntyevent 结构体的管理,那么即使 reactor 结构体管理。

首先就是初始化结构体。

代码语言:c复制
#define MAX_EPOLL_EVENTS 1024
int ntyreactor_init(struct ntyreactor *reactor) {

	if (reactor == NULL) return -1;
	memset(reactor, 0, sizeof(struct ntyreactor));

    //创建一个 epfd 
	reactor->epfd = epoll_create(1);
	if (reactor->epfd <= 0) {
		printf("create epfd in %s err %sn", __func__, strerror(errno));
		return -2;
	}

    //创建事件数组  也可以使用链表进行管理
	reactor->events = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
	if (reactor->events == NULL) {
		printf("create epfd in %s err %sn", __func__, strerror(errno));
		close(reactor->epfd);
		return -3;
	}
}

初始化好 reactor 结构体,我们然后要确认,在网络 I/O 过程中,我们期望这个结构体能干什么事情。

在一般的网络模型,服务端就是生成 socket, bind 端口,然后listen,有客户端连接需要 accept,同时接受数据,发送数据。listen 是一个状态需要管理,而 accept 表示未接收数据的一个状态,这个状态都是需要管理的,而且状态只会改变,比方说从 listen 变成 accept,因此 reactor 管理跟普通的管理不太一样

创建 socket ,bind 和 listen基本不存在双方交互的一个情况,而另外四个方式都涉及与客户端的交互。

,那么首先就是监听事件添加

代码语言:c复制
//添加监听事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {  

	if (reactor == NULL) return -1;
	if (reactor->events == NULL) return -1;

	nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor); //设置事件
	nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]); //将事件添加到 epoll 红黑树  EPOLLIN 表示socket描述符为可读事件
 
	return 0;
}

然后即使监听事件的不同状态的转换

代码语言:c复制
int ntyreactor_run(struct ntyreactor *reactor) {
	if (reactor == NULL) return -1;
	if (reactor->epfd < 0) return -1;
	if (reactor->events == NULL) return -1;
	
    //epoll事件
	struct epoll_event events[MAX_EPOLL_EVENTS 1];
	
	int checkpos = 0, i;

	while (1) {

		long now = time(NULL);  //当前时间
		for (i = 0;i < 100; i  , checkpos  ) {  //每次检查 100个 socket 描述符
			if (checkpos == MAX_EPOLL_EVENTS) {  //当循环到时间是最大时间
				checkpos = 0;
			}

			if (reactor->events[checkpos].status != 1) { //表示状态不是添加,时间是修改
				continue;
			}

			long duration = now - reactor->events[checkpos].last_active;//时间上次活跃时间

			if (duration >= 60) {  //两个时间间隔超过 60s
				close(reactor->events[checkpos].fd);  //关闭 socket 描述符
				printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);
				nty_event_del(reactor->epfd, &reactor->events[checkpos]); //将socket描述符删除
			}
		}


		int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);  //从红黑树中获取就绪数据数
		if (nready < 0) {  //没有就绪时间
			printf("epoll_wait error, exitn");
			continue;
		}

		for (i = 0;i < nready;i   ) {

			struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr; // epoll 的时间

			if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { //事件为可读
				ev->callback(ev->fd, events[i].events, ev->arg);  //使用回调函数
			}
			if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { //事件为可写
				ev->callback(ev->fd, events[i].events, ev->arg); //使用对应回调回到应用程序处理事件
			}
			
		}

	}
}

然后就是数据结构的销毁。

代码语言:c复制
int ntyreactor_destory(struct ntyreactor *reactor) {

	close(reactor->epfd);
	free(reactor->events);

}

比较简单,就是关闭 epoll 的描述符,然后释放内存。

回调函数

回调函数使用内核将事件复制到应用程序,我们需要处理的逻辑,就网络 I/O 来说,从服务器来说,我们从监听开始,因此第一个函数就是 accept 函数。

代码语言:c复制
typedef int NCALLBACK(int, int, void*);//基础函数模型
//找出所有的可读的 accept 事件,然后将事件下一个回调设置为 recv 然后将事件添加到红黑树中
int accept_cb(int cb, int events, void*arg)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg; //通过 arg 表示reactor 数据结构体
	if(reactor == NULL) return -1;

	struct sockaddr_in client_addr;   //初始化接收需要的数据结构
	socklen_t len = sizeof(client_addr);

	int clientfd;

	if((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1){  //接受函数
		if(errno != EAGAIN && errno != ENTER){  //错误处理
			printf("accept: %sn", strerror(errno));
			return -1;
		}
	}

	int i = 0;
	do{
		for(i = 0; i < MAX_EPOLL_EVENTS; i  ){
			if(reactor->events[i].status == 0){  找到 epoll 事件中第一个事件为 EPOLLIN 可读事件
				break;
			}
		}

		
        if (i == MAX_EPOLL_EVENTS) { //没有相关的事件
			printf("%s: max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS);
			break;
		}

		int flag = 0;
		if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {  //设置为非阻塞 I/O
			printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS);
			break;
		}

		nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);  //accept 之后要将事件下一个状态设置为recv
		nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]); //将事件添加到 epfd 中

	} while (0);

	printf("new connect [%s:%d][time:%ld], pos[%d]n", 
		inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), reactor->events[i].last_active, i);

	return 0;

}

accept 之后就是 recv 函数,recv 的回调函数为

代码语言:c复制
int recv_cb(int fd, int events, void * args)
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;  //传入 reactor
	struct ntyevent *ev = reactor->events   fd; //传入 events 的位置,因为使用数组,链表就要进行遍历了

	int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0);  //接受数据
	nty_event_del(reactor->epfd, ev);  //将事件从红黑树中删除

	if(len > 0){ //接受到数据
		ev->length = len;
		ev->buffer[len] = '';

		printf("C[%d]:%sn", fd, ev->buffer); //打印接受数据
		nty_event_set(ev, fd, send_cb, reactor);  //设置数据为send 发送事件
		nty_event_add(reactor->epfd, EPOLLOUT, ev); //添加事件为可写事件
	}else if(len === 0){//没有数据
		close(ev->fd);
		printf("[fd=%d] pos[%d], closedn", fd, ev->reactor->events);
	}else{//错误
		close(ev->fd);
		printf("recv[fd=%d] error[%d]:%sn", fd, errno, strerror(errno));
	}

	return len;
}

最后就是发送的回调函数。

代码语言:c复制
int send_cb(int fd, int events, void * args)  //发送回调函数
{
    struct ntyreactor *reactor = (struct ntyreactor *)arg;
	struct ntyevent *ev = reactor->events   fd;

	int len = send(fd, ev->buffer, ev->length, 0);  //发送数据
	if(len > 0){
		printf("send[fd=%d], [%d]%sn", fd, len, ev->buffer);

	    nty_event_del(reactor->epfd, ev);  //socket描述符删除
	    nty_event_set(ev, fd, recv_cb, reactor); //因为不确定是否接受完毕,所以还要接收
	    nty_event_add(reactor->epfd, EPOLLOUT, ev);  //添加到epoll 描述符中
	}else{//接受完毕或者错误,直接删除数据
		close(ev->fd);
		nty_event_del(reactor->epfd, ev);
		printf("send[fd=%d] error %sn", fd, strerror(errno));
	}

	return len;
}

从服务端最基础做起

上述就是利用 Linux 内核 epoll 对网络 I/O 进行管理的状态基本已经做完,而作为服务端,最开始就是要创建 socket 这些老套的东西。

代码语言:c复制
int init_sock(short port) {

	int fd = socket(AF_INET, SOCK_STREAM, 0); //创建socket
	fcntl(fd, F_SETFL, O_NONBLOCK); //设置非阻塞 I/O

	struct sockaddr_in server_addr;
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = htons(port);

	bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); //绑定socket

	if (listen(fd, 20) < 0) { //监听接受队列的长度
		printf("listen failed : %sn", strerror(errno));
	}

	return fd;  //返回连接符
}

最后

最后就是一个总体函数使用,main 函数来了。

代码语言:c复制
int main(int argc, char *argv[]) {

	unsigned short port = SERVER_PORT;
	if (argc == 2) {
		port = atoi(argv[1]);
	}

	int sockfd = init_sock(port);  //获取连接符

    //分配内存
	struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
	ntyreactor_init(reactor);

	//设置其中数据然后将 socket 添加到相应的管理中
	ntyreactor_addlistener(reactor, sockfd, accept_cb);
	ntyreactor_run(reactor);

	ntyreactor_destory(reactor);
	close(sockfd);
	

	return 0;
}

综上所述,其实不管是什么模型,其实就是定义数据结构后,首先对数据结构进行管理和业务逻辑,然后跟我们操作系统机制进行交互。当然对于 Java 这种封装比较完善的语言,可能就主要就是数据结构管理然后加上业务逻辑。和操作系统管理可能就是跟另一个数据结构进行交互了。

1 人点赞