Haproxy事件循环

2022-10-27 09:40:41 浏览数 (2)

在haproxy启动的时候,main方法会在socket建立连接之后调用run_poll_loop方法进行事件循环处理:

代码语言:javascript复制
static void run_poll_loop()
{
	int next, wake;

	tv_update_date(0,1);
	while (1) {
		wake_expired_tasks();

		/* Process a few tasks */
		process_runnable_tasks();

		/* check if we caught some signals and process them in the
		 first thread */
		if (tid == 0)
			signal_process_queue();

		/* stop when there's nothing left to do */
		if ((jobs - unstoppable_jobs) == 0)
			break;

		/* also stop  if we failed to cleanly stop all tasks */
		if (killed > 1)
			break;

		/* expire immediately if events are pending */
		wake = 1;
		if (thread_has_tasks())
			activity[tid].wake_tasks  ;
		else if (signal_queue_len && tid == 0)
			activity[tid].wake_signal  ;
		else {
			_HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit);
			__ha_barrier_atomic_store();
			if ((global_tasks_mask & tid_bit) || thread_has_tasks()) {
				activity[tid].wake_tasks  ;
				_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
			} else
				wake = 0;
		}

		/* If we have to sleep, measure how long */
		next = wake ? TICK_ETERNITY : next_timer_expiry();

		/* The poller will ensure it returns around <next> */
		cur_poller.poll(&cur_poller, next, wake);

		activity[tid].loops  ;
	}
}

处理信号

haproxy 封装了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。signal_register_fct,signal_register_task接口提供了注册函数回调和任务类型回调两种方式。在程序运行到signal_process_queue() 时处理所有位于信号队列中的信号。

代码语言:javascript复制
void __signal_process_queue()
{
	int sig, cur_pos = 0;
	struct signal_descriptor *desc;
	sigset_t old_sig;

	/* block signal delivery during processing */
	ha_sigmask(SIG_SETMASK, &blocked_sig, &old_sig);

	/* It is important that we scan the queue forwards so that we can
	 * catch any signal that would have been queued by another signal
	 * handler. That allows real signal handlers to redistribute signals
	 * to tasks subscribed to signal zero.
	 */
	for (cur_pos = 0; cur_pos < signal_queue_len; cur_pos  ) {
		sig  = signal_queue[cur_pos];
		desc = &signal_state[sig];
		if (desc->count) {
			struct sig_handler *sh, *shb;
			list_for_each_entry_safe(sh, shb, &desc->handlers, list) {
				if ((sh->flags & SIG_F_TYPE_FCT) && sh->handler)
					((void (*)(struct sig_handler *))sh->handler)(sh);
				else if ((sh->flags & SIG_F_TYPE_TASK) && sh->handler)
					task_wakeup(sh->handler, TASK_WOKEN_SIGNAL);
			}
			desc->count = 0;
		}
	}
	signal_queue_len = 0;

	/* restore signal delivery */
	ha_sigmask(SIG_SETMASK, &old_sig, NULL);
}

信号注册时注册SIG_F_TYPE_FCT标识则直接调用信号回调处理;SIG_F_TYPE_TASK标识说明注册时回调函数是一个task指针,这时需要唤醒task,并指明任务状态为TASK_WOKEN_SIGNAL,此后对应处理函数将在task管理下处理。下面来看看task管理。

唤醒超时任务

haproxy 的顶层处理逻辑是 task,task 上存储着要处理的任务的全部信息。task 的管理是采用ebtree树形队列方式,同时分为 wait queue 和 run queue。顾名思义,wait queue 是需要等 待一定时间的 task 的集合,而 run queue 则代表需要立即执行的 task 的集合。

该函数就是检查 wait queue 中那些超时的任务,并将其放到 run queue 中。haproxy 在 执行的过程中,会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中。

代码语言:javascript复制
while (1) {
lookup_next_local:
	eb = eb32_lookup_ge(&tt->timers, now_ms - TIMER_LOOK_BACK);
	if (!eb) {
		/* we might have reached the end of the tree, typically because
		* <now_ms> is in the first half and we're first scanning the last
		* half. Let's loop back to the beginning of the tree now.
		*/
		eb = eb32_first(&tt->timers);
		if (likely(!eb))
			break;
	}

	if (tick_is_lt(now_ms, eb->key))
		break;

	/* timer looks expired, detach it from the queue */
	task = eb32_entry(eb, struct task, wq);
	__task_unlink_wq(task);

	/* It is possible that this task was left at an earlier place in the
	 * tree because a recent call to task_queue() has not moved it. This
	 * happens when the new expiration date is later than the old one.
	 * Since it is very unlikely that we reach a timeout anyway, it's a
	 * lot cheaper to proceed like this because we almost never update
	 * the tree. We may also find disabled expiration dates there. Since
	 * we have detached the task from the tree, we simply call task_queue
	 * to take care of this. Note that we might occasionally requeue it at
	 * the same place, before <eb>, so we have to check if this happens,
	 * and adjust <eb>, otherwise we may skip it which is not what we want.
	 * We may also not requeue the task (and not point eb at it) if its
	 * expiration time is not set.
	 */
	if (!tick_is_expired(task->expire, now_ms)) {
		if (tick_isset(task->expire))
			__task_queue(task, &tt->timers);
		goto lookup_next_local;
	}
	task_wakeup(task, TASK_WOKEN_TIMER);
}

处理可运行的任务

处理位于 run queue 中的任务。

前面提到,wake_expired_tasks 可能将一些超时的任务放到 run queue 中。此外,haproxy 执行的过程中,还有可能通过调用 task_wakeup 直接讲某个 task 放到 run queue 中,这代表程序希望该任务下次尽可能快的被执行。

对于TCP或者HTTP业务流量的处理,该函数最终通过调用 process_session 来完成,包括解析已经接收到的数据, 并执行一系列 load balance 的特性,但不负责从 socket 收发数据,数据收发由poll完成。同时,也会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中,实现上在任务回调处理时返回非空任务则会把任务重新加入wait queue。

haproxy 中用 jobs 记录当前要处理的任务总数, 如果 jobs 为 0 的话,通常意味着 haproxy 要退出了,因为连 listener 都要释放了。jobs 的数值通常在 process_session 时更新。

poll消息驱动

haproxy 启动阶段,会检测当前系统可以启用那种异步处理的机制,比如 select、poll、 epoll、kqueue 等,并注册对应 poller 的 poll 方法。epoll 的相关函数接口在 ev_epoll.c 中。

这里就是执行已经注册的 poller 的 poll 方法,主要功能就是获取所有活动的 fd,并 调用对应的 handler,完成接受新建连接、数据收发等功能。

poller的poll方法执行时,程序会将某些符合条件以便再次执行 IO 处理的的fd放到 fd_cache中,之后fd_process_cached_events () 函数会再次执行这些fd的io handler。

可以大家有个疑问,这个poll方法到底是执行的哪个?下面我们一一道来

我们注意到工程中有这样的问题ev_xx.c,这里以ev_epoll.c为例,有这样一段代码

代码语言:javascript复制
__attribute__((constructor))
static void _do_register(void)
{
	struct poller *p;
	int i;

	if (nbpollers >= MAX_POLLERS)
		return;

	for (i = 0; i < MAX_THREADS; i  )
		epoll_fd[i] = -1;

	p = &pollers[nbpollers  ];

	p->name = "epoll";
	p->pref = 300;
	p->flags = HAP_POLL_F_ERRHUP; // note: RDHUP might be dynamically added
	p->private = NULL;

	p->clo  = __fd_clo;
	p->test = _do_test;
	p->init = _do_init;
	p->term = _do_term;
	p->poll = _do_poll;
	p->fork = _do_fork;
}

该方法会在main方法之前被自动执行,这里我们可以看到p->poll = _do_poll; 那么上面的poll对应的就是这个_do_poll。

下面进入该方法,看下具体是怎么执行的:

代码语言:javascript复制
/*
 * Linux epoll() poller
 */
REGPRM3 static void _do_poll(struct poller *p, int exp, int wake)
{
	int status;
	int fd;
	int count;
	int updt_idx;
	int wait_time;
	int old_fd;

	/* first, scan the update list to find polling changes */
	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx  ) {
		fd = fd_updt[updt_idx];

		_HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
		if (!fdtab[fd].owner) {
			activity[tid].poll_drop  ;
			continue;
		}

		_update_fd(fd);
	}
	fd_nbupdt = 0;
	/* Scan the global update list */
	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
		if (fd == -2) {
			fd = old_fd;
			continue;
		}
		else if (fd <= -3)
			fd = -fd -4;
		if (fd == -1)
			break;
		if (fdtab[fd].update_mask & tid_bit)
			done_update_polling(fd);
		else
			continue;
		if (!fdtab[fd].owner)
			continue;
		_update_fd(fd);
	}

	thread_harmless_now();

	/* now let's wait for polled events */
	wait_time = wake ? 0 : compute_poll_timeout(exp);
	tv_entering_poll();
	activity_count_runtime();
	do {
		int timeout = (global.tune.options & GTUNE_BUSY_POLLING) ? 0 : wait_time;

		status = epoll_wait(epoll_fd[tid], epoll_events, global.tune.maxpollevents, timeout);
		tv_update_date(timeout, status);

		if (status)
			break;
		if (timeout || !wait_time)
			break;
		if (signal_queue_len || wake)
			break;
		if (tick_isset(exp) && tick_is_expired(exp, now_ms))
			break;
	} while (1);

	tv_leaving_poll(wait_time, status);

	thread_harmless_end();
	if (sleeping_thread_mask & tid_bit)
		_HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);

	/* process polled events */

	for (count = 0; count < status; count  ) {
		unsigned int n;
		unsigned int e = epoll_events[count].events;
		fd = epoll_events[count].data.fd;

		if (!fdtab[fd].owner) {
			activity[tid].poll_dead  ;
			continue;
		}

		if (!(fdtab[fd].thread_mask & tid_bit)) {
			/* FD has been migrated */
			activity[tid].poll_skip  ;
			epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
			_HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
			_HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
			continue;
		}

		n = ((e & EPOLLIN)    ? FD_EV_READY_R : 0) |
		    ((e & EPOLLOUT)   ? FD_EV_READY_W : 0) |
		    ((e & EPOLLRDHUP) ? FD_EV_SHUT_R  : 0) |
		    ((e & EPOLLHUP)   ? FD_EV_SHUT_RW : 0) |
		    ((e & EPOLLERR)   ? FD_EV_ERR_RW  : 0);

		if ((e & EPOLLRDHUP) && !(cur_poller.flags & HAP_POLL_F_RDHUP))
			_HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);

		fd_update_events(fd, n);
	}
	/* the caller will take care of cached events */
}

该函数可以粗略分为三部分:

  • 检查 fd 更新列表,获取各个 fd event 的变化情况,并作 epoll 的设置
  • 计算 epoll_wait 的 delay 时间,并调用 epoll_wait,获取活动的 fd
  • 逐一处理所有有 IO 事件的 fd

0 人点赞