本篇从源码的角度介绍下Redis的代码初始化流程和事件循环的结构。
【server.c】main函数入口:
Redis实现了一个简单的事件驱动程序库,即 ae.c 的代码,它屏蔽了系统底层在事件处理上的差异,并实现了事件循环机制。
Redis将事件处理分为两大类:文件事件与时间事件。文件事件即客户端和服务器在交互过程中socket的可读可写事件,时间事件即需要周期性执行的一些定时任务(如定时清除超时客户端连接,定时删除过期键等)。Redis采用比较成熟的I/O多路复用模型(select/epoll等)处理文件事件,并对这些I/O多路复用模型进行简单封装。
系统底层的I/O多路复用机制:能够同时等待I/O和timer这两种事件的发生。在不同的系统上,存在多种不同的I/O多路复用机制。
事件处理框架非常简单,从初始化、服务到结束,分别对应的函数:aeCreateEventLoop、aeMain、aeDeleteEventLoop。 其中,aeMain是事件循环的主体函数,它又会调用 aeProcessEvents函数,三个主体函数会调用aeApiCreate、aeApiPool、aeApiFree三个接口函数进行处理。
事件机制处理流程:
一、阶段一:初始化阶段
(1) 配置加载和初始化
Redis 服务器基本数据结构和各种参数的初始化。
initServerConfig
函数初始化 redisServer ==> 保证Redis的内部数据结构及参数都有缺省值
struct redisServer {
//...
char *configfile; // 配置文件绝对路径
redisDb *db; // 数据库数组
dict *commands; // 命令字典,Redis支持的所有命令都存储在这个字典中
aeEventLoop *el; // 事件循环结构
int port; // 服务器监听的端口号
char *bindaddr[CONFIG_BINDADDR_MAX]; // 绑定的ip地址
int ipfd[CONFIG_BINDADDR_MAX]; // 针对ip地址创建的文件描述符
list *clients; // 当前连接到Redis服务器的所有客户端
int maxidletime; // 最大空闲时间
// 持久化/主从/集群等相关的参数
}
然后,从 redis.conf 中加载并解析配置文件 ==> 自定义配置,对某些参数进行覆盖
代码语言:javascript复制void loadServerConfig(char *filename, char *options)
(2) 创建事件循环
Redis服务器是典型的事件驱动程序,它将事件处理分为两大类:文件事件与时间事件,它们都封装在结构体aeEventLoop中:
代码语言:javascript复制typedef struct aeEventLoop {
int stop; // 标识事件循环是否结束
aeFileEvent *events; // 文件事件数组,存储已经注册的文件事件
aeFiredEvent *fired; // 储被触发的文件事件
aeTimeEvent *timeEventHead; // 多个时间事件形成链表,为时间事件链表头节点
void *apidata; // 对4种I/O多路复用模型的进一步封装
aeBeforeSleepProc *beforesleep; // 阻塞等待文件事件发生之前会调用beforesleep函数
aeBeforeSleepProc *aftersleep; // 进程被唤醒之后会调用aftersleep函数
} aeEventLoop;
因此,我们需要创建 aeEventLoop,分配结构体所需内存,并初始化结构体各字段 ==> 依赖系统底层的I/O多路复用机制。
代码语言:javascript复制aeEventLoop *aeCreateEventLoop(int setsize) {
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (aeApiCreate(eventLoop) == -1) goto err;
}
(3) 开始socket监听
三种监听:TCP连接、Unix domain socket连接、TLS连接
Unix domain socket:一种高效的进程间通信机制,省去了协议栈的开销,比使用TCP协议性能更好。 从 Redis 6 开始支持 SSL / TLS,这是一项可选功能,需要在编译时启用;但 TLS当前不支持I / O多线程。
监听socket主要是为了获取文件描述符,后面需要根据文件描述符去注册I/O事件回调。
代码语言:javascript复制int listenToPort(int port, int *fds, int *count) {
for (j = 0; j < server.bindaddr_count || j == 0; j ) {
//创建socket并启动监听,文件描述符存储在fds数组作为返回参数
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
//设置socket非阻塞
anetNonBlock(NULL,fds[*count]);
(*count) ;
}
}
注意:所有创建的socket都会设置为非阻塞模式,原因在于Redis使用了IO多路复用模式,其要求socket读写必须是非阻塞的,函数anetNonBlock通过系统调用fcntl设置socket非阻塞模式。
(4) 注册timer事件(时间事件) 回调
Redis服务器只维护了一个时间事件,该时间事件处理函数为serverCron,执行了所有需要周期性执行的一些定时任务,初次创建时1毫秒后就会被触发
代码语言:javascript复制if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
exit(1);
}
serverCron函数:周期性地执行过期key的回收,主从重连、Cluster节点间的重连、BGSAVE、AOF rewrite的触发执行等 ==> 通过事件循环调度一些异步执行的任务
代码语言:javascript复制int serverCron(struct aeEventLoop *eventLoop, long long id, void
*clientData) {
run_with_period(100) {
//100毫秒周期执行
}
run_with_period(5000) {
//5000毫秒周期执行
}
//清除超时客户端连接
clientsCron();
//处理数据库
databasesCron();
server.cronloops ;
return 1000/server.hz;
}
serverCron由事件来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其他操作(主要是对命令请求的执行)按时间进行分片
疑问:服务器内部不是有很多定时任务吗,为什么只有一个时间事件呢?
原因:Redis创建时间事件节点的函数为aeCreateTimeEvent,会创建时间事件并添加到时间事件链表。
(5) 注册I/O事件(文件事件) 回调
文件事件:
Redis客户端通过TCP socket与服务端交互,文件事件指的就是socket的可读可写事件。
socket的读写事件被抽象为文件事件,因此,对于监听的socket还需要创建对应的文件事件
代码语言:javascript复制for (j = 0; j < server.ipfd_count; j ) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR){
}
}
TCP 连接的I/O事件回调:acceptTcpHandler
TLS 连接的I/O事件回调:acceptTLSHandler
Unix domain socket连接的I/O事件回调:acceptUnixHandler
其他:通过pipe机制与module进行双向通信
(6) 初始化后台线程
通过bioInit
函数,在后台执行的一些额外的线程,用于处理一些比较耗时且可以被延迟执行的任务,如可以延迟执行的文件关闭操作(unlink)、AOF的持久化写库操作(fsync)、大key的清除操作
(7) 启动事件循环
Redis的事件驱动也是通过while循环等待事件发生并处理:
代码语言:javascript复制void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 开始事件循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 事件处理主函数
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
二、阶段二:事件循环阶段
(1) 为什么要循环?
Redis作为一个服务端程序,需要对客户端不停发送的请求做响应的处理,因此需要进入一个无线循环中。在每一次的循环中,如果有I/O事件发生,就会去处理这些事件。如果没有事件发生,则等待,把整个循环阻塞住一段时间,阻塞时间根据时间事件间隔所决定。
(2) 什么时候恢复执行呢?
等待的事件发生的时候,程序会被重新唤醒,循环继续。
等待和唤醒的操作需要依赖底层系统实现。
(3) 如何统一调度timer事件和I/O事件?
需要能够同时等待timer和I/O两种事件的发生。要做到这一点,我们依赖系统底层的I/O多路复用机制(I/O multiplexing)。这种机制一般是这样设计的:它允许我们针对多个文件描述符来等待对应的I/O事件发生,并同时可以指定一个最长的阻塞超时时间。如果在这段阻塞时间内,有I/O事件发生,那么程序会被唤醒继续执行;如果一直没有I/O事件发生,而是指定的时间先超时了,那么程序也会被唤醒。对于timer事件的等待,就是依靠这里的超时机制。
(4) Redis中的IO多路复用是怎样的?
IO多路复用:多个网络 I/O 复用一个或少量的线程来处理 Socket
socket读写操作有阻塞与非阻塞之分。采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的I/O多路复用模型,如select/epoll/kqueue/event ports,视不同操作系统而定。
Redis同时支持4种I/O多路复用模型,并将这些模型的API进一步统一封装,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c实现。
而Redis在编译阶段,会检查操作系统支持的I/O多路复用模型,并按照一定规则决定使用哪种模型。
【例如,在macOS上编译Redis,那么它底层会选用kqueue
;而如果在Linux上编译则会选择epoll】
以epoll为例,在Redis中对应的源文件为:ae_epoll.c
epoll是linux中IO多路复用的一种机制,通过一个进程监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),通过 callbak 回调通知机制,能够通知程序进行相应的读写操作。Redis并没有直接使用epoll提供的API,而是将其API进一步统一封装
主要有三个函数:
Redis封装函数 | linux函数 | 备注 |
---|---|---|
aeApiCreate | int epoll_create(int size) | 创建事件 |
aeApiAddEvent | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 添加事件 |
aeApiDelEvent | int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) | 删除事件 |
aeApiPoll | int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); | 等待事件 |
a.int epoll_create(int size);
创建一个epoll的句柄,当创建好epoll句柄后,它就是会占用一个fd值;size用来告诉内核这个监听的数目一共有多大。
b.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
- 第一个参数是epoll_create()的返回值;
- 第二个参数表示动作,用三个宏来表示: EPOLL_CTL_ADD:注册新的fd到epfd中; EPOLL_CTL_MOD:修改已经注册的fd的监听事件; EPOLL_CTL_DEL:从epfd中删除一个fd;
- 第三个参数是需要监听的fd;
- 第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
c.int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。返回需要处理的事件数目,如返回0表示已超时
epoll相比select/poll的优势:
- select/poll每次调用都要传递所要监控的所有fd给select/poll系统调用(这意味着每次调用都要将fd列表从用户态拷贝到内核态,当fd数目很多时,这会造成低效)。而每次调用epoll_wait时(作用相当于调用select/poll),不需要再传递fd列表给内核,因为已经在epoll_ctl中将需要监控的fd告诉了内核(epoll_ctl不需要每次都拷贝所有的fd,只需要进行增量式操作)。所以,在调用epoll_create之后,内核已经在内核态开始准备数据结构存放要监控的fd了。每次epoll_ctl只是对这个数据结构进行简单的维护。
- select/poll一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。
- 当我们调用epoll_ctl往里塞入百万个fd时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的fd给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把fd放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个fd的中断到了,就把它放到准备就绪list链表里。所以,当一个fd(例如socket)上有数据到了,内核在把设备(例如网卡)上的数据copy到内核中后就来把fd(socket)插入到准备就绪list链表里了。
整个 I/O 多路复用模块在事件循环看来就是一个输入事件、输出 aeFiredEvent
数组的一个黑箱
在这个黑箱中,使用 aeCreateFileEvent
、 aeDeleteFileEvent
来添加删除需要监听的文件描述符以及事件。
在对应事件发生时,当前单元格会“变色”表示发生了可读(黄色)或可写(绿色)事件,调用 aeApiPoll
时会把对应的文件描述符和事件放入 aeFiredEvent
数组,并在 processEvents
方法中执行事件对应的回调。
(5) 如何进行事件循环?
无论是文件事件还是时间事件都封装在结构体aeEventLoop中:
代码语言:javascript复制typedef struct aeEventLoop {
int stop; // 标识事件循环是否结束
aeFileEvent *events; // 文件事件数组,存储已经注册的文件事件
aeFiredEvent *fired; // 存储被触发的文件事件
aeTimeEvent *timeEventHead; // 多个时间事件形成链表,timeEventHead 为时间事件链表头节点
void *apidata; // 对4种I/O多路复用模型(kqueue、epoll等)的进一步封装
aeBeforeSleepProc *beforesleep; // 阻塞等待文件事件的生之前会调用beforesleep函数
aeBeforeSleepProc *aftersleep; // 阻塞进程被唤醒之后调用aftersleep函数
} aeEventLoop;
事件驱动程序通常存在while/for循环,循环等待事件发生并处理,Redis也不例外,其事件循环如下:
代码语言:javascript复制while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
函数aeProcessEvents
为事件处理主函数,其第2个参数是一个标志位,AE_ALL_EVENTS表示函数需要处理文件事件与时间事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件之后需要执行aftersleep函数。
事件循环执行函数aeProcessEvents的主要逻辑:①查找最早会发生的时间事件,计算超时时间;②阻塞等待文件事件的产生;③处理文件事件;④处理时间事件。
补充说明:
Redis对于timer事件回调的处理设计了一个小机制:timer事件的回调函数可以返回一个需要下次执行的毫秒数。如果返回值是正常的正值,那么Redis就不会把这个timer事件从事件循环的队列中删除,这样它后面还有机会再次执行。例如,按照默认的设置,serverCron
返回值是100,因此它每隔100毫秒会执行一次(当然这个执行频率可以在redis.conf中通过hz
变量来调整)。
(6) 底层是如何支持了Redis的事件循环?(事件循环的底层实现)
a. 注册回调函数
首先,向事件循环中注册I/O事件回调的时候,需要指定哪个回调函数注册到哪个事件上(事件用文件描述符来表示)。事件和回调函数的对应关系,由Redis上层封装的事件驱动程序库来维护。
代码语言:javascript复制// 客户端连接的事件处理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
// 命令请求的事件处理器
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c);
// 命令回复的事件处理器
aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);
b. 阻塞等待事件发生
类似地,向事件循环中注册timer事件回调的时候,需要指定多长时间之后执行哪个回调函数。这里需要记录哪个回调函数预期在哪个时刻被调用,这也是由Redis上层封装的事件驱动程序库来维护的。
Redis创建时间事件节点的函数为aeCreateTimeEvent,内部实现只是创建时间事件并添加到时间事件链表。
代码语言:javascript复制long long aeCreateTimeEvent(aeEventLoop *eventLoop, // 输入参数指向事件循环结构体
long long milliseconds, // 此时间事件触发时间,单位毫秒
aeTimeProc *proc, // 时间事件的处理函数
void *clientData, // 指向对应的结构体对象
aeEventFinalizerProc *finalizerProc); // 函数指针
c. 执行回调
底层的各种事件机制都会提供一个等待事件的操作,比如epoll提供的epoll_wait API。这个等待操作一般可以指定预期等待的事件列表(事件用文件描述符来表示),并同时可以指定一个超时时间(即最大等待多长时间)。在事件循环中需要等待事件发生的时候,就调用这个等待操作,传入之前注册过的所有I/O事件,并把最近的timer事件所对应的时刻转换成这里需要的超时时间。主要在aeProcessEvents函数进行处理。
aeProcessEvents函数在调用aeApiPoll之前会遍历Redis的时间事件链表,查找最早会发生的时间事件,以此作为aeApiPoll需要传入的超时时间:
代码语言:javascript复制int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 最早发生的时间事件
shortest = aeSearchNearestTimer(eventLoop);
long long ms =
shortest->when_sec - now_sec)*1000
shortest->when_ms - now_ms;
…………
// 阻塞等待文件事件发生
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j ) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 处理文件事件,即根据类型执行rfileProc或wfileProc
}
// 处理时间事件
processed = processTimeEvents(eventLoop);
}
aeProcessEvents
都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll
方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发
d. 从上一步的等待操作中唤醒,有两种情况:如果是I/O事件发生了,那么就根据触发的事件查到I/O回调函数,进行调用;如果是超时了,那么检查所有注册过的timer事件,对于预期调用时刻超过当前时间的回调函数都进行调用。
三、总结
- Redis主要的处理流程包括接收请求、执行命令,以及周期性地执行后台任务(serverCron),这些都是由这个事件循环驱动的。
- 当请求到来时,I/O事件被触发,事件循环被唤醒,根据请求执行命令并返回响应结果;
- 同时,后台异步任务(如回收过期的key)被拆分成若干小段,由timer事件所触发,夹杂在I/O事件处理的间隙来周期性地运行。
- 这种执行方式允许仅仅使用一个线程来处理大量的请求,并能提供快速的响应时间。当然,这种实现方式之所以能够高效运转,除了事件循环的结构之外,还得益于系统提供的异步的I/O多路复用机制(I/O multiplexing)。
- 事件循环利用I/O多路复用机制,对 CPU 进行时分复用 (多个事件流将 CPU 切割成多个时间片,不同事件流的时间片交替进行),使得多个事件流就可以并发进行。
- 而且,使用单线程事件机制可以避免代码的并发执行,在访问各种数据结构的时候都无需考虑线程安全问题,从而大大降低了实现的复杂度。