【redis源码学习】事件机制

2021-12-29 08:11:20 浏览数 (1)

文章目录

    • redis事件机制概述
    • redis的事件循环器:aeEventLoop
    • redis启动
    • 事件循环

redis事件机制概述

1、redis使用 IO 复用 实现网络通信。 2、在Linux环境下选用epoll模式。

redis的事件循环器:aeEventLoop

acEventLoop 是 redis 的事件循环器,负责管理事件。

代码语言:javascript复制
/* State of an event based program */
typedef struct aeEventLoop {
	int maxfd; 		//当前已注册的最大文件描述符 
    int setsize; 	//该事件循环允许监听的最大文件描述符
    long long timeEventNextId;	//下一个时间事件ID
    time_t lastTime;     //用于校验系统时钟偏移
    
    aeFileEvent *events; //已注册的文件事件表
    aeFiredEvent *fired; //已就绪的事件表
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; //用于存放IO复用层的附加数据
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    //进程阻塞前后调用的钩子函数
    int flags;
} aeEventLoop;
代码语言:javascript复制
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;	//附加数据
} aeFileEvent;

acFileEvent 中没有 fd 文件描述符的消息,这是一个骚操作我们可以看一下。 首先: 1、POSIX规定了 0/1/2 三个文件描述符的去向 2、POSIX同时还规定了文件描述符的分配方式为递增

依次递增的还有什么?数组的下标嘛。 于是在 aeEventLoop.events 中,以下标为 fd,数组内存储事件。

如果事件已就绪,会被放到 aeEventLoop.fired 中,结构如下:

代码语言:javascript复制
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

redis启动

redis启动时,在initServer里面会调用 aeCreateEventLoop 函数创建一个事件循环器,存储于server.el。事件循环器会监听 TCP Socket,并使用指定函数处理读写事件。

redis 启动时也调用 acCreateTimeEvent 函数创建了一个处理函数为 serverCron 的时间事件,负责处理 Redis 中的定时任务。

serverCron 时间事件负责完成大部分内部任务,包括定时持久化、清除过期数据等。另一部分任务在那俩钩子函数中触发。

事件循环

代码语言:javascript复制
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000  
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j  ) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired  ;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired  ;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired  ;
                }
            }

            processed  ;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed  = processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

今天没啥灵感呀。。。

0 人点赞