Redis 事件机制是如何实现的?

2023-10-18 14:25:52 浏览数 (2)

前言

我们都知道,Redis 是单线程(非严谨),你是否想过,一个线程要如何处理来自各个客户端的各种请求呢?它忙的过来吗?没错,它还真的能忙过来,并且还井井有条。其中多亏了 IO 多路复用,而不仅仅是它,事件机制在其中也是一个不错的设计。

之前我提到过有关于 IO 多路复用对于 Redis 的影响,IO多路复用和多线程会影响Redis分布式锁吗? 其中有部分内容其实已经提到了,所以本文会更加关注于事件机制本身。

PS:Redis 高版本已经支持多线程处理某些事情,为了简化,这里不做讨论,故下文出现的单线程仅是描述那些必须单线程执行的场景。

前置知识

  • IO 多路复用

尝试思考

首先,让我们来思考一下,如果是我们自己来实现,会尝试如何去做。

对于请求连接处理的思考

最笨的方法,那么就是来一个客户端 accept 一次,然后给什么请求做什么事情,先来先做,做完走人,对吧。那显然这样太慢了,要知道作为一个缓存,这样设计要把人给急死。

当然,我们也可以说,来一个我开一个线程单独处理你,相当于你一来我就单独找人为你服务,而服务的人最终会将请求给到一个处理中心,让处理中心统一去处理,然后将结果返回。但显然 Redis 没有那么多资源让你浪费。

于是要找人帮忙,那就是 IO 多路复用,至少它能帮我解决前面服务的问题,fd 我就不管了,直接告诉我哪些人来了,并且告诉我有事的是那些人。

反观机制的思考

既然 epoll_wait 能 告诉我们有那些 socket 已经就绪,那么我们就处理就绪的这些就可以了。但我们需要一个合理的机制来帮我们来优雅的处理他们,毕竟 Redis 后面只有个单线程在处理。由于处理没这么快,肯定需要一个地方来存放未处理的这些事件,那很合理就能想到需要一个类似 buffer 的东西。

所以,对于这个事件机制,我第一个想法就是弄个队列,或者 ringbuffer 来搞,那不就是一个生产消费者模型吗?

事件机制

那么下面我们就来看看 Redis 它是如何设计。

分类

首先 Redis 分了两类事件

  • fileEvents 文件事件,就是我们之前提到的请求的处理,我们也主要讨论这个
  • timedEvents 定时事件,没错肯定有一些定时任务触发的事件在里面

文件事件处理

OK,看完图我们就有了一个大致的印象,为了灵活的处理不同的事件,需要将事件分配给处理器去处理,这里也是我们之前思考的时候没有想到的一个设计。通常来说对于任何的处理往往都有这样一个分配器去分配所有的任务,这样可以让扩展更加灵活,如果后续有新的类型,只需要扩展出一个新的处理器就可以了。

源码分析

https://github.com/redis/redis/blob/9b1d4f003de1b141ea850f01e7104e7e5c670620/src/ae.c#L493

首先入口在 aeMain 这个简单,就是循环,也正是这个循环处理着所有的事件,我们可以看到,只要不停(stop),就会一直循环处理

代码语言:javascript复制
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

然后就是我们重点的 aeProcessEvents 方法,其中重点就是调用 aeApiPoll 获取当前就绪的事件,然后你就能看到我们的 aeFileEvent 也就是文件事件了,最后还有 processTimeEvents 处理定时事件。那么事件本身,是如何处理的呢?就是 rfileProc 和 wfileProc 一个处理读一个处理写。那么问题来了,这两个方法具体是什么呢?卖个关子,我们先瞅一眼 aeApiPoll

代码语言:javascript复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
        int64_t usUntilTimer;

        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
            eventLoop->beforesleep(eventLoop);

        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else if (flags & AE_TIME_EVENTS) {
            usUntilTimer = usUntilEarliestTimer(eventLoop);
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            }
        }
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. 注意这里!!!!!!!!!!!!!! */
        numevents = aeApiPoll(eventLoop, tvp);

        /* Don't process file events if not requested. */
        if (!(flags & AE_FILE_EVENTS)) {
            numevents = 0;
        }

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j  ) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                /* rfileProc 在处理什么事件呢? */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired  ;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* wfileProc 在处理什么事件呢? */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired  ;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired  ;
                }
            }

            processed  ;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed  = processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

这里其他都不重要,重点就在我们熟悉的 epoll_wait ,获取所有就绪的 fd 也就能知道所有需要处理的事件了。

代码语言:javascript复制
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000   (tvp->tv_usec   999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j  ) {
            int mask = 0;
            struct epoll_event *e = state->events j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}

好了,我们来解密究竟 rfileProcwfileProc 是什么,aeCreateFileEvent 方法是用于创建 FileEvent 的方法,其中的入参里面有 aeFileProc 没错就是它了。根据不同的类型用不同的 handler 创建不同的 event。也就是说,最终的处理方式是通过参数传递进去的。

代码语言:javascript复制
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

0 人点赞