Redis源码阅读(三)初始化与事件循环

2022-01-28 16:48:25 浏览数 (3)

本篇从源码的角度介绍下Redis的代码初始化流程和事件循环的结构。

【server.c】main函数入口:

main主函数main主函数

Redis实现了一个简单的事件驱动程序库,即 ae.c 的代码,它屏蔽了系统底层在事件处理上的差异,并实现了事件循环机制。

Redis将事件处理分为两大类:文件事件与时间事件。文件事件即客户端和服务器在交互过程中socket的可读可写事件,时间事件即需要周期性执行的一些定时任务(如定时清除超时客户端连接,定时删除过期键等)。Redis采用比较成熟的I/O多路复用模型(select/epoll等)处理文件事件,并对这些I/O多路复用模型进行简单封装。

系统底层的I/O多路复用机制:能够同时等待I/O和timer这两种事件的发生。在不同的系统上,存在多种不同的I/O多路复用机制。

事件处理框架非常简单,从初始化、服务到结束,分别对应的函数:aeCreateEventLoop、aeMain、aeDeleteEventLoop。 其中,aeMain是事件循环的主体函数,它又会调用 aeProcessEvents函数,三个主体函数会调用aeApiCreate、aeApiPool、aeApiFree三个接口函数进行处理。

事件机制处理流程:

一、阶段一:初始化阶段

(1) 配置加载和初始化

Redis 服务器基本数据结构和各种参数的初始化。

initServerConfig 函数初始化 redisServer ==> 保证Redis的内部数据结构及参数都有缺省值

代码语言:javascript复制
struct redisServer {
    //...
    char *configfile;    // 配置文件绝对路径
    redisDb *db;         // 数据库数组
    dict *commands;      // 命令字典,Redis支持的所有命令都存储在这个字典中
    aeEventLoop *el;     // 事件循环结构
​
    int port;            // 服务器监听的端口号
    char *bindaddr[CONFIG_BINDADDR_MAX];      // 绑定的ip地址
    int ipfd[CONFIG_BINDADDR_MAX];            // 针对ip地址创建的文件描述符
    
    list *clients;       // 当前连接到Redis服务器的所有客户端
    int maxidletime;     // 最大空闲时间
    
  // 持久化/主从/集群等相关的参数
}

然后,从 redis.conf 中加载并解析配置文件 ==> 自定义配置,对某些参数进行覆盖

代码语言:javascript复制
void loadServerConfig(char *filename, char *options)

(2) 创建事件循环

Redis服务器是典型的事件驱动程序,它将事件处理分为两大类:文件事件与时间事件,它们都封装在结构体aeEventLoop中:

代码语言:javascript复制
typedef struct aeEventLoop {
    int stop;        // 标识事件循环是否结束
    
    aeFileEvent *events;           // 文件事件数组,存储已经注册的文件事件
    aeFiredEvent *fired;           // 储被触发的文件事件
    aeTimeEvent *timeEventHead;    // 多个时间事件形成链表,为时间事件链表头节点
    
    void *apidata;                 // 对4种I/O多路复用模型的进一步封装
    aeBeforeSleepProc *beforesleep;    // 阻塞等待文件事件发生之前会调用beforesleep函数
    aeBeforeSleepProc *aftersleep;     // 进程被唤醒之后会调用aftersleep函数
} aeEventLoop;

因此,我们需要创建 aeEventLoop,分配结构体所需内存,并初始化结构体各字段 ==> 依赖系统底层的I/O多路复用机制。

代码语言:javascript复制
aeEventLoop *aeCreateEventLoop(int setsize) {
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
​
    if (aeApiCreate(eventLoop) == -1) goto err;
}

(3) 开始socket监听

三种监听:TCP连接、Unix domain socket连接、TLS连接

Unix domain socket:一种高效的进程间通信机制,省去了协议栈的开销,比使用TCP协议性能更好。 从 Redis 6 开始支持 SSL / TLS,这是一项可选功能,需要在编译时启用;但 TLS当前不支持I / O多线程。

监听socket主要是为了获取文件描述符,后面需要根据文件描述符去注册I/O事件回调。

代码语言:javascript复制
int listenToPort(int port, int *fds, int *count) {
    for (j = 0; j < server.bindaddr_count || j == 0; j  ) {
        //创建socket并启动监听,文件描述符存储在fds数组作为返回参数
        fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
            server.tcp_backlog);
        //设置socket非阻塞
        anetNonBlock(NULL,fds[*count]);
        (*count)  ;
    }
}

注意:所有创建的socket都会设置为非阻塞模式,原因在于Redis使用了IO多路复用模式,其要求socket读写必须是非阻塞的,函数anetNonBlock通过系统调用fcntl设置socket非阻塞模式。

(4) 注册timer事件(时间事件) 回调

Redis服务器只维护了一个时间事件,该时间事件处理函数为serverCron,执行了所有需要周期性执行的一些定时任务,初次创建时1毫秒后就会被触发

代码语言:javascript复制
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    exit(1);
}

serverCron函数:周期性地执行过期key的回收,主从重连、Cluster节点间的重连、BGSAVE、AOF rewrite的触发执行等 ==> 通过事件循环调度一些异步执行的任务

代码语言:javascript复制
int serverCron(struct aeEventLoop *eventLoop, long long id, void 
               *clientData) {
    run_with_period(100) {
        //100毫秒周期执行
    }
    run_with_period(5000) {
        //5000毫秒周期执行
    }
    //清除超时客户端连接
    clientsCron();
    //处理数据库
    databasesCron();
​
    server.cronloops  ;
    return 1000/server.hz;
}

serverCron由事件来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其他操作(主要是对命令请求的执行)按时间进行分片

疑问:服务器内部不是有很多定时任务吗,为什么只有一个时间事件呢?

原因:Redis创建时间事件节点的函数为aeCreateTimeEvent,会创建时间事件并添加到时间事件链表。

(5) 注册I/O事件(文件事件) 回调

文件事件:

Redis客户端通过TCP socket与服务端交互,文件事件指的就是socket的可读可写事件。

socket的读写事件被抽象为文件事件,因此,对于监听的socket还需要创建对应的文件事件

代码语言:javascript复制
for (j = 0; j < server.ipfd_count; j  ) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR){
        }
}

TCP 连接的I/O事件回调:acceptTcpHandler

TLS 连接的I/O事件回调:acceptTLSHandler

Unix domain socket连接的I/O事件回调:acceptUnixHandler

其他:通过pipe机制与module进行双向通信

(6) 初始化后台线程

通过bioInit函数,在后台执行的一些额外的线程,用于处理一些比较耗时且可以被延迟执行的任务,如可以延迟执行的文件关闭操作(unlink)、AOF的持久化写库操作(fsync)、大key的清除操作

(7) 启动事件循环

Redis的事件驱动也是通过while循环等待事件发生并处理:

代码语言:javascript复制
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 开始事件循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 事件处理主函数
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

二、阶段二:事件循环阶段

(1) 为什么要循环?

Redis作为一个服务端程序,需要对客户端不停发送的请求做响应的处理,因此需要进入一个无线循环中。在每一次的循环中,如果有I/O事件发生,就会去处理这些事件。如果没有事件发生,则等待,把整个循环阻塞住一段时间,阻塞时间根据时间事件间隔所决定。

(2) 什么时候恢复执行呢?

等待的事件发生的时候,程序会被重新唤醒,循环继续。

等待和唤醒的操作需要依赖底层系统实现。

(3) 如何统一调度timer事件和I/O事件?

需要能够同时等待timer和I/O两种事件的发生。要做到这一点,我们依赖系统底层的I/O多路复用机制(I/O multiplexing)。这种机制一般是这样设计的:它允许我们针对多个文件描述符来等待对应的I/O事件发生,并同时可以指定一个最长的阻塞超时时间。如果在这段阻塞时间内,有I/O事件发生,那么程序会被唤醒继续执行;如果一直没有I/O事件发生,而是指定的时间先超时了,那么程序也会被唤醒。对于timer事件的等待,就是依靠这里的超时机制。

(4) Redis中的IO多路复用是怎样的?

IO多路复用:多个网络 I/O 复用一个或少量的线程来处理 Socket

socket读写操作有阻塞与非阻塞之分。采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的I/O多路复用模型,如select/epoll/kqueue/event ports,视不同操作系统而定。

Redis同时支持4种I/O多路复用模型,并将这些模型的API进一步统一封装,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c实现。

而Redis在编译阶段,会检查操作系统支持的I/O多路复用模型,并按照一定规则决定使用哪种模型。

【例如,在macOS上编译Redis,那么它底层会选用kqueue;而如果在Linux上编译则会选择epoll

以epoll为例,在Redis中对应的源文件为:ae_epoll.c

epoll是linux中IO多路复用的一种机制,通过一个进程监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),通过 callbak 回调通知机制,能够通知程序进行相应的读写操作。Redis并没有直接使用epoll提供的API,而是将其API进一步统一封装

主要有三个函数:

Redis封装函数

linux函数

备注

aeApiCreate

int epoll_create(int size)

创建事件

aeApiAddEvent

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

添加事件

aeApiDelEvent

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

删除事件

aeApiPoll

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件

a.int epoll_create(int size);

创建一个epoll的句柄,当创建好epoll句柄后,它就是会占用一个fd值;size用来告诉内核这个监听的数目一共有多大。

b.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

  • 第一个参数是epoll_create()的返回值;
  • 第二个参数表示动作,用三个宏来表示: ​ EPOLL_CTL_ADD:注册新的fd到epfd中; ​ EPOLL_CTL_MOD:修改已经注册的fd的监听事件; ​ EPOLL_CTL_DEL:从epfd中删除一个fd;
  • 第三个参数是需要监听的fd;
  • 第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
代码语言:javascript复制
struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

c.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。返回需要处理的事件数目,如返回0表示已超时

epoll相比select/poll的优势

  • select/poll每次调用都要传递所要监控的所有fd给select/poll系统调用(这意味着每次调用都要将fd列表从用户态拷贝到内核态,当fd数目很多时,这会造成低效)。而每次调用epoll_wait时(作用相当于调用select/poll),不需要再传递fd列表给内核,因为已经在epoll_ctl中将需要监控的fd告诉了内核(epoll_ctl不需要每次都拷贝所有的fd,只需要进行增量式操作)。所以,在调用epoll_create之后,内核已经在内核态开始准备数据结构存放要监控的fd了。每次epoll_ctl只是对这个数据结构进行简单的维护。
  • select/poll一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。
  • 当我们调用epoll_ctl往里塞入百万个fd时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的fd给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把fd放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个fd的中断到了,就把它放到准备就绪list链表里。所以,当一个fd(例如socket)上有数据到了,内核在把设备(例如网卡)上的数据copy到内核中后就来把fd(socket)插入到准备就绪list链表里了。

整个 I/O 多路复用模块在事件循环看来就是一个输入事件、输出 aeFiredEvent 数组的一个黑箱

在这个黑箱中,使用 aeCreateFileEventaeDeleteFileEvent 来添加删除需要监听的文件描述符以及事件。

在对应事件发生时,当前单元格会“变色”表示发生了可读(黄色)或可写(绿色)事件,调用 aeApiPoll 时会把对应的文件描述符和事件放入 aeFiredEvent 数组,并在 processEvents 方法中执行事件对应的回调。

(5) 如何进行事件循环?

无论是文件事件还是时间事件都封装在结构体aeEventLoop中:

代码语言:javascript复制
typedef struct aeEventLoop {
    int stop;         // 标识事件循环是否结束
​
    aeFileEvent *events;    // 文件事件数组,存储已经注册的文件事件
    aeFiredEvent *fired;    // 存储被触发的文件事件
    aeTimeEvent *timeEventHead; // 多个时间事件形成链表,timeEventHead 为时间事件链表头节点
​
    void *apidata;    // 对4种I/O多路复用模型(kqueue、epoll等)的进一步封装
    aeBeforeSleepProc *beforesleep;   // 阻塞等待文件事件的生之前会调用beforesleep函数
    aeBeforeSleepProc *aftersleep;    // 阻塞进程被唤醒之后调用aftersleep函数
} aeEventLoop;

事件驱动程序通常存在while/for循环,循环等待事件发生并处理,Redis也不例外,其事件循环如下:

代码语言:javascript复制
while (!eventLoop->stop) {
  if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);
  aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}

函数aeProcessEvents为事件处理主函数,其第2个参数是一个标志位,AE_ALL_EVENTS表示函数需要处理文件事件与时间事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件之后需要执行aftersleep函数。

事件循环执行函数aeProcessEvents的主要逻辑:①查找最早会发生的时间事件,计算超时时间;②阻塞等待文件事件的产生;③处理文件事件;④处理时间事件。

补充说明:

Redis对于timer事件回调的处理设计了一个小机制:timer事件的回调函数可以返回一个需要下次执行的毫秒数。如果返回值是正常的正值,那么Redis就不会把这个timer事件从事件循环的队列中删除,这样它后面还有机会再次执行。例如,按照默认的设置,serverCron返回值是100,因此它每隔100毫秒会执行一次(当然这个执行频率可以在redis.conf中通过hz变量来调整)。

(6) 底层是如何支持了Redis的事件循环?(事件循环的底层实现)

a. 注册回调函数

首先,向事件循环中注册I/O事件回调的时候,需要指定哪个回调函数注册到哪个事件上(事件用文件描述符来表示)。事件和回调函数的对应关系,由Redis上层封装的事件驱动程序库来维护。

代码语言:javascript复制
// 客户端连接的事件处理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
// 命令请求的事件处理器
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c);
// 命令回复的事件处理器
aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);

b. 阻塞等待事件发生

类似地,向事件循环中注册timer事件回调的时候,需要指定多长时间之后执行哪个回调函数。这里需要记录哪个回调函数预期在哪个时刻被调用,这也是由Redis上层封装的事件驱动程序库来维护的。

Redis创建时间事件节点的函数为aeCreateTimeEvent,内部实现只是创建时间事件并添加到时间事件链表。

代码语言:javascript复制
long long aeCreateTimeEvent(aeEventLoop *eventLoop,      // 输入参数指向事件循环结构体
                  long long milliseconds,                // 此时间事件触发时间,单位毫秒
                  aeTimeProc *proc,                      // 时间事件的处理函数
                  void *clientData,                      // 指向对应的结构体对象
                  aeEventFinalizerProc *finalizerProc);  // 函数指针

c. 执行回调

底层的各种事件机制都会提供一个等待事件的操作,比如epoll提供的epoll_wait API。这个等待操作一般可以指定预期等待的事件列表(事件用文件描述符来表示),并同时可以指定一个超时时间(即最大等待多长时间)。在事件循环中需要等待事件发生的时候,就调用这个等待操作,传入之前注册过的所有I/O事件,并把最近的timer事件所对应的时刻转换成这里需要的超时时间。主要在aeProcessEvents函数进行处理。

aeProcessEvents函数在调用aeApiPoll之前会遍历Redis的时间事件链表,查找最早会发生的时间事件,以此作为aeApiPoll需要传入的超时时间:

代码语言:javascript复制
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  // 最早发生的时间事件
  shortest = aeSearchNearestTimer(eventLoop);
  long long ms =
    shortest->when_sec - now_sec)*1000  
    shortest->when_ms - now_ms;
  …………
  // 阻塞等待文件事件发生
  numevents = aeApiPoll(eventLoop, tvp);
  
  for (j = 0; j < numevents; j  ) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    // 处理文件事件,即根据类型执行rfileProc或wfileProc
  }
  // 处理时间事件
  processed  = processTimeEvents(eventLoop);
}

aeProcessEvents 都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll 方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发

d. 从上一步的等待操作中唤醒,有两种情况:如果是I/O事件发生了,那么就根据触发的事件查到I/O回调函数,进行调用;如果是超时了,那么检查所有注册过的timer事件,对于预期调用时刻超过当前时间的回调函数都进行调用。

三、总结

  • Redis主要的处理流程包括接收请求、执行命令,以及周期性地执行后台任务(serverCron),这些都是由这个事件循环驱动的。
  • 当请求到来时,I/O事件被触发,事件循环被唤醒,根据请求执行命令并返回响应结果;
  • 同时,后台异步任务(如回收过期的key)被拆分成若干小段,由timer事件所触发,夹杂在I/O事件处理的间隙来周期性地运行。
  • 这种执行方式允许仅仅使用一个线程来处理大量的请求,并能提供快速的响应时间。当然,这种实现方式之所以能够高效运转,除了事件循环的结构之外,还得益于系统提供的异步的I/O多路复用机制(I/O multiplexing)。
  • 事件循环利用I/O多路复用机制,对 CPU 进行时分复用 (多个事件流将 CPU 切割成多个时间片,不同事件流的时间片交替进行),使得多个事件流就可以并发进行。
  • 而且,使用单线程事件机制可以避免代码的并发执行,在访问各种数据结构的时候都无需考虑线程安全问题,从而大大降低了实现的复杂度。

0 人点赞