System|多路复用IO|Kqueue事件通知

2021-11-22 10:26:13 浏览数 (1)

Kqueue和其他的多路复用IO的核心是,单消费者同时监听不同种类的生产者,从而提供高性能的单线程IO,减少调度开销。而Kqueue通过在内核态维持状态提供了更高的性能。

生产者消费者模型

单Producer和单Consumer

生产者/消费者模型是常见的通信模型,通过共享内核缓冲区环形队列,实现异步的事件通知。双方只关注缓冲区内的数据,而不关注彼此,因此常常被用于网络通信。

信号量

为了避免消费者在缓存区未满时无意义的轮询,消费者block直到生产者通知。wait时线程设置信号量并且block,notify时内核通知所有等待信号的线程状态改为RUNNABLE。

事实上就是Linux的pthread_cond_wait和phread_cond_signal原语。consumer之所以要带锁wait,是因为在内部进行调度yield_wait前要放掉锁,否则其他线程无法进入临界区;唤醒之后重新获得锁。(这里指的锁是外部事务的锁)

wait和notify需要增加锁,防止notify先于wait进行。(这里的锁指的是内部事务的锁)

wait调用的yield_wait在调度时需要临时释放并随后获取内部事务锁,否则会阻塞其他的notify造成全员block。

代码语言:javascript复制
 send(bb, msg):
    acquire(bb.lock)
    while True:
      if bb.in - bb.out < N:
        bb.buf[bb.in mod N] <- msg
        bb.in <- bb.in   1
        release(bb.lock)
        notify(bb.not_empty)
        return
    wait(bb.not_full, bb.block)

 receive(bb):
    acquire(bb.lock)
    while True:
      if bb.in > bb.out:
        msg <- bb.buf[bb.out mod N]
        bb.out <- bb.out   1
        release(bb.lock)
        wait(bb.not_full)
        return
    wait(bb.not_empty, bb.block)

Eventcount & Sequencer

这是1979年提出的算法,作为信号量的可替换实现。Sequencer的目的是处理多producer。

semaphores

代码语言:javascript复制
send(Buffer& buffer,Message msg) {
  t=TICKET(T);
  AWAIT(buffer.in, t);
  AWAIT(buffer.out, READ(buffer.in)-N);
  buffer[READ(buffer.in)%N]=msg;
  ADVANCE(in);
}

receive(Buffer& buffer) {
 AWAIT(buffer.in, READ(buffer.out));
 msg = buffer[READ(buffer.out)%N];
 ADVANCE(buffer.out);
 return msg;
}
  • AWAIT(event*,val) - 比较event.count和val,如果大于则返回,否则存入线程TCB并yield
  • ADVANCE(event*) - 自增event.count并将所有同event且event.count>val的线程唤醒
  • TICKET(sequencer*) - 原子性自增序号,目的是处理并发的sender
  • READ(event*) - 原子性读event.count,因为可能读操作涉及多memory cell

send等待in超过ticket,相当于拿排队锁轮到自己。然后等待缓存区未满时写入数据。

receive等待缓冲区存在数据时读取数据。


Kqueue

https://people.freebsd.org/~jlemon/papers/kqueue.pdf

问题在于,上面提到的做法本质上都是监听着一个事件,如果我们想要处理多个监听事件,操作系统必须提供新的原语,例如每个socket都对应着一个file descriptor,需要同时监听所有socket的事件。BSD的Kqueue和Linux的enpoll都是解决这种问题的方式,本质上它们就是IPC,但是单纯从IO的角度看叫做多路复用IO。目前epoll用于netty的底层,是单线程实现高性能网络的关键。

传统的select和poll仅仅适用于file descriptor,但是无法关注其他IPC机制,例如信号、文件系统变化、异步IO完成、进程存在;并且也不具备scalability。

第一个问题在于参数传递,每次都必须传递整个事件组,并且动态在内核中创建和销毁内存。第二个问题在于内核必须遍历整个fd列表去找活跃的fd。初始遍历一次确定没有active的fd才能沉睡,如果没有active还要再遍历一次设定回调来唤醒,最后唤醒时还要再遍历一次来看是哪个fd唤醒了。

问题出在这个syscall无状态上,无法利用之前的信息,每次都得重新计算。因此Kqueue的机制就在于内核中维持一个队列储存状态。

代码语言:javascript复制
int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);

struct kevent{  
uintpt t ident; // 事件关注对象的ID,kq,ident,filter确定唯一的event
short filter; // 事件类型,ident,fflags,data应该如何被解释?
u short flags; // 输入: 增加/减少,使能/禁止, 执行后重置/删除;输出: 发生EOF或者ERROR
u int fflags; // 活跃时应该怎么做,是否返回event?
intptr t data; // filter和fflags规定的数据传输方式
void *udata; // 自定义的数据传输方式
__uint64_t	ext[4]; //在末尾增加的额外信息Hint
}

EV_SET(&kev, ident, filter, flags, fflags, data, udata);

kevent()用于创建kqueue并且返回对应的capability(权限控制的抽象)。

kevent()用于注册event,并设定超时,changelist是指kqueue注册的event如何变化,eventlist则是返回的event。当event触发时,会调用内核的回调函数,通知进程。

filter

  • EVFILT READ :poll近似的实现,当socket_buffer大于SO_LOWAT时触发将size写入data或者断连时触发EOF,帮助应用处理数据。
  • EVFILT WRITE: 类似READ
  • EVFILT AIO: aio_read/write请求后通过事件进行aio_error轮询,事件返回后aio_return
  • EVFILT SIGNAL: id为信号值,返回data为信号计数,通知后clear
  • EVFILT VNODE: 监听文件系统vnode,id为fd, fflags监听下列事件并返回所有发生事件
代码语言:javascript复制
NOTE DELETE
NOTE WRITE
NOTE EXTEND
NOTE ATTRIB
NOTE LINK
NOTE RENAME
  • EVFILT PROC:监听进程状态,id为PID,fflags监听下列事件
代码语言:javascript复制
NOTE EXIT/FORK/EXEC 监听exit,fork,execve等原语
NOTE TRACK 若父进程设定为Track则fork后子进程为CHILD
输出:
NOTE CHILD 子进程fork后设定child,并且父进程id存入data
NOTE TRACKERR 无法添加子进程事件,通常因为资源限制

sample

代码语言:javascript复制
handle_events()
{
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
n = kevent(kq, ch, nchanges,
evi, nevents, &timeout);
if (n <= 0)
   goto error_or_timeout;
for (i = 0; i < n; i  ) {
if (evi.flags & EV_ERROR)
/* error */
if (evi.filter == EVFILT_READ)
readable_fd(evi.ident);
if (evi.filter == EVFILT_WRITE)
writeable_fd(evi.ident);
}
...
}
update_fd(int fd, int action,int filter)
{
EV_SET(&chnchanges, fd, filter,action == ADD ? 
EV_ADD : EV_DELETE,
0, 0, 0);
nchanges  ;
}

Kqueue实现

Knote

  • 计算当前节点的活跃度
  • 链接其他knote
  • 存储自己所在的Kqueue的指针
代码语言:javascript复制
struct knote {
	SLIST_ENTRY(knote)	kn_link;	/* for kq */
	SLIST_ENTRY(knote)	kn_selnext;	/* for struct selinfo */
	struct			knlist *kn_knlist;	/* f_attach populated */
	TAILQ_ENTRY(knote)	kn_tqe;
	struct			kqueue *kn_kq;	/* which queue we are on */
	struct 			kevent kn_kevent;
	void			*kn_hook;
	int			kn_hookid;
	int			kn_status;	/* protected by kq lock */
#define KN_ACTIVE	0x01			/* event has been triggered */
#define KN_QUEUED	0x02			/* event is on queue */
#define KN_DISABLED	0x04			/* event is disabled */
#define KN_DETACHED	0x08			/* knote is detached */
#define KN_MARKER	0x20			/* ignore this knote */
#define KN_KQUEUE	0x40			/* this knote belongs to a kq */
#define	KN_SCAN		0x100			/* flux set in kqueue_scan() */
	int			kn_influx;
	int			kn_sfflags;	/* saved filter flags */
	int64_t			kn_sdata;	/* saved data field */
	union {
		struct		file *p_fp;	/* file data pointer */
		struct		proc *p_proc;	/* proc pointer */
		struct		kaiocb *p_aio;	/* AIO job pointer */
		struct		aioliojob *p_lio;	/* LIO job pointer */
		void		*p_v;		/* generic other pointer */
	} kn_ptr;
	struct			filterops *kn_fop;

#define kn_id		kn_kevent.ident
#define kn_filter	kn_kevent.filter
#define kn_flags	kn_kevent.flags
#define kn_fflags	kn_kevent.fflags
#define kn_data		kn_kevent.data
#define kn_fp		kn_ptr.p_fp
};

Kqueue

  • kp_knlist存所有knode用于GC
  • kp_head存存储所有标记为active的knode
  • kq_knhash存储iden->descriptor的映射
  • kq_fdp fd索引的数组(同open file table)用于关闭fd时删除对应的knode
代码语言:javascript复制
struct kqueue {
	struct		mtx kq_lock;
	int		kq_refcnt;
	TAILQ_ENTRY(kqueue)	kq_list;
	TAILQ_HEAD(, knote)	kq_head;	/* list of pending event */
	int		kq_count;		/* number of pending events */
	struct		selinfo kq_sel;
	struct		sigio *kq_sigio;
	struct		filedesc *kq_fdp;
	int		kq_state;
#define KQ_SEL		0x01
#define KQ_SLEEP	0x02
#define KQ_FLUXWAIT	0x04			/* waiting for a in flux kn */
#define KQ_ASYNC	0x08
#define KQ_CLOSING	0x10
#define	KQ_TASKSCHED	0x20			/* task scheduled */
#define	KQ_TASKDRAIN	0x40			/* waiting for task to drain */
	int		kq_knlistsize;		/* size of knlist */
	struct		klist *kq_knlist;	/* list of knotes */
	u_long		kq_knhashmask;		/* size of knhash */
	struct		klist *kq_knhash;	/* hash table for knotes */
	struct		task kq_task;
	struct		ucred *kq_cred;
};

Registration

kqueue

kqueue本身作为文件抽象看待,在OFT里注册entry创建内核对象并赋予descriptor索引。hash和内部的array并不分配。

kevent

代码语言:javascript复制
int
kevent(int kq, const struct kevent *changelist, int nchanges,
    struct kevent *eventlist, int nevents, const struct timespec *timeout)
{

	return (((int (*)(int, const struct kevent *, int,
	    struct kevent *, int, const struct timespec *))
	    __libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
	   eventlist, nevents, timeout));
}

这里调用了kqueue_register来对changeList进行注册。首先根据线程和fd获取文件的FCB,kq对于fp引用计数 ,然后调用实际的注册函数。注册的代码太长了,大体就是先根据<Iden,filter>寻找knote节点,找不到如果是EV_ADD则增加knote,找到了就把事件增加到knote上去。

代码语言:javascript复制
int 
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
	struct kqueue *kq;
	struct file *fp;
	cap_rights_t rights;
	int error;

	error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
	if (error != 0)
		return (error);
	if ((error = kqueue_acquire(fp, &kq)) != 0)
		goto noacquire;

	error = kqueue_register(kq, kev, td, mflag);
	kqueue_release(kq, 0);

noacquire:
	fdrop(fp, td);
	return (error);
}

Filter

filter的作用就是对于事件源进行过滤,事件源所有的活动都会调用filter,但是只有符合filter规则的事件才会报告给应用,也就是返回布尔值,同时他也会修改fflags和data产生副作用(上面提到的输出参数)。filter封装了事件,kqueue只能询问他是否活跃,而对事件的细节一无所知。因此只需要增加filter,就能拓展事件的内容。

Activity

在所有触发这些活动的地方插入hook函数,调用knote()函数遍历自己维护的klist(注册的时候维护的),调用filter。

如果事件触发则激活,通过knote找到其所属的kqueue,并且将knote加入kqueue的active链末尾。如果已经在了,那么不用增加knote,但是filter还是会记录activity(e.g.上文提到的副作用)。

这里有些special case,例如fork需要看是不是TRACK,来判断是否报告子节点的PID

Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.

首先,激活父进程的knote,然后创建新的knote分配给子节点,并且设置CHILD flag和对应的父进程PID。同时这里还提到了可能存在事件可能改变data,因此为EXIT额外分配一个节点。

代码语言:javascript复制
		/*
		 * Activate existing knote and register tracking knotes with
		 * new process.
		 *
		 * First register a knote to get just the child notice. This
		 * must be a separate note from a potential NOTE_EXIT
		 * notification since both NOTE_CHILD and NOTE_EXIT are defined
		 * to use the data field (in conflicting ways).
		 */
		kev.ident = pid;
		kev.filter = kn->kn_filter;
		kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
		    EV_FLAG2;
		kev.fflags = kn->kn_sfflags;
		kev.data = kn->kn_id;		/* parent */
		kev.udata = kn->kn_kevent.udata;/* preserve udata */
		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
		if (error)
			kn->kn_fflags |= NOTE_TRACKERR;

		/*
		 * Then register another knote to track other potential events
		 * from the new process.
		 */
		kev.ident = pid;
		kev.filter = kn->kn_filter;
		kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
		kev.fflags = kn->kn_sfflags;
		kev.data = kn->kn_id;		/* parent */
		kev.udata = kn->kn_kevent.udata;/* preserve udata */
		error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
		if (error)
			kn->kn_fflags |= NOTE_TRACKERR;
		if (kn->kn_fop->f_event(kn, NOTE_FORK))
			KNOTE_ACTIVATE(kn, 0);
		list->kl_lock(list->kl_lockarg);
		KQ_LOCK(kq);
		kn_leave_flux(kn);
		KQ_UNLOCK_FLUX(kq);

Delivery

kqueue_scan在active链末尾加入哨兵,如果scan时扔出了哨兵,那么遍历结束。

每次都从active移除一个节点(如果激活的节点被DISABLE,延迟到此时移除),如果不是ONESHOP,那么filter带着query hint重新检查,以确保获得的是最新的数据。

The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.

确认激活的knote的信息将会拷贝到kevnet通过eventlist返回给应用进行通知。如果ONESHOP则直接从kqueue中移除,否则如果filter看它仍然active,就把它重新放到active链末尾(上次扫描的哨兵之后)。直到哨兵被出列,scan完成。

Miscellaneous Notes

1.论文的版本fork的时候不复制kqueue的df除非vfork。如果复制的话需要在fork时进行整个kqueue复制或者标记为COW。(现在不知道是不是这么做的)

2.kqueue是通过维护klist来对整条链涉及的所有进程进行通知的,而不是像poll或者select那样在sellInfo持有pid。下面这段话看不懂了,没看过poll不知道啥叫collision。

While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up

3.考虑同一个klist有不同类型的filter,调用knode时应该给予额外信息通知他到底是什么事件触发的(例如PROC和SIGNAL容易混淆),因此利用hint确定activity和哪个相关

4. kevent要经历两次拷贝,增加了overhead。因此如果采用AIO更好,kernel直接修改user状态下的control block。那么为什么不这么做呢?根本原因在于如果内核不允许直接写用户态数据的话,bug会更好定位,同时应用也不需要考虑状态。

总结

精妙之处在于kqueue维持在内核中,因此socket如果满了可以直接将knote加入进程kqueue的活跃链,而不需要等到下次syscall的时候再检查。例如,即使我长期不kevent,knote()依然会将他们的activity存储在knote上并且插入active list,下次只需要遍历active list而不需要重头遍历整个queue。

同时因为kqueue有状态,进行修改也开销很小,只需要改变变化的那部分就行了。

看的时候还是有些地方比较难理解,加上源代码也很复杂,如果有纠错请指正。

附录

filechange

代码语言:javascript复制
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, fd, EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_RENAME | NOTE_WRITE |
NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0) {
printf("The file was");
if (ev.fflags & NOTE_RENAME)
printf(" renamed");
if (ev.fflags & NOTE_WRITE)
printf(" written");
if (ev.fflags & NOTE_DELETE)
printf(" deleted");
if (ev.fflags & NOTE_ATTRIB)
printf(" chmod/chowned");
printf("n");
}

signal

代码语言:javascript复制
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
signal(SIGHUP, SIG_IGN);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0)
printf("signal %d delivered"
" %d timesn",
ev.ident, ev.data);
}

udata

代码语言:javascript复制
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
void (* fcn)(struct kevent *);
n = kevent(kq, ch, nchanges,
ev, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i  ) {
if (evi.flags & EV_ERROR)
/* error */
fcn = evi.udata;
fcn(&evi);
}

0 人点赞