Redis源码阅读(六)Redis 6.0的多线程

2022-01-28 17:15:01 浏览数 (1)

Redis单线程是指处理的事件循环的线程是单一的,命令执行主要是依靠单一线程执行的。Redis使用单线程是因为基于内存速度快,而且多路复用也能确保redis能同时处理多个请求,在Redis 6.0引入多线程是因为在某些操作要优化,比如删除操作。

一、Redis 5.0 单线程实现

在客户端与Redis服务器建立连接之后,所有的请求都会执行到readQueryFromClient()方法中,readQueryFromClient()方法会从socket中读取数据放到输入缓冲区querybuf中,接着会调用processInputBufferAndReplicate 中的 processInputBuffer()方法按照RESP协议来解析参数。解析完参数之后会调用processCommand()方法执行具体的命令。在processCommand()中根据命令名称找到对应的命令并调用命令的call()完成具体的操作,命令在执行完成之后都会调用addReply()方法返回执行结果。

但是这里需要注意的是addReply()方法只是把返回的数据写入到输出缓冲区client->buf或者client->reply中,并不执行实际的网络发送操作。

Redis在每次进入事件循环之前,都会先调用beforeSleep()方法,实际的网络发送数据操作时在beforeSleep()方法中完成的。在beforeSleep()中,会调用handleClientsWithPendingWrites()返回数据给客户端:handleClientsWithPendingWrites()中会调用writeToClient()方法把输出缓冲区client->bufclient->reply中的数据通过socket发送给客户端。

二、Redis 6.0 多线程实现

引入多线程说明Redis在有些方面,单线程已经不具有优势了。

因为读写网络的read/write系统调用在Redis执行期间占用了大部分CPU时间,如果把网络读写做成多线程的方式对性能会有很大提升。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。

Redis 引入多线程操作也是出于性能上的考虑,对于一些大键值对的删除操作,通过多线程非阻塞地释放内存空间也能减少对 Redis 主线程阻塞的时间,提高执行的效率。

网上有同学对Redis多线程和单线程版本进行了性能测试,对比显示,Redis的多线程版本性能至少比单线程版本提高了一倍。

接下来,我们Redis 6.0的多线程做个流程介绍:

详细流程:

  1. Redis 启动的时候会调用InitServerLast() 初始化 IO 线程(用户设置了线程数量,且允许多线程读),但是 IO 线程一开始处于阻塞状态。
  2. 每次有新的客户端请求时,主线程会执行到 readQueryFromClient(),在 readQueryFromClient() 中主线程会把 client 对象添加到 server.clients_pending_read 列表中。
  3. 在每次事件之后,也就是在 afterSleep() 中,Redis 主线程会调用 handleClientsWithPendingReadsUsingThreads() 方法,在方法中主线程会把server.clients_pending_read 列表中的 client 对象按照 RoundRobin 算法依次分配到 io_threads_list 队列数组中,并空循环等待所有的IO线程完成读数据操作。
  4. 在主线程等待的过程中,IO线程会从对应的 io_threads_list 队列中获取client对象,依次调用readQueryFromClient()方法读取数据并按照RESP协议解析参数。
  5. 等所有IO线程执行完毕后,主线程会调用 processCommandAndResetClient() 方法,该方法会调用processCommand() 执行具体的命令,并把执行结果写入到client对象的输出缓冲区中。
  6. 每次事件循环之前,也就是在 beforeSleep() 中,Redis主线程会调用handleClientsWithPendingWritesUsingThreads()方法,在该方法中,主线程会把所有需要返回数据的client 对象按照 RoundRobin 算法分配到 io_threads_list 队列数组中,并空循环等待所有的IO线程完成写数据的操作。
  7. IO线程会从对应的 io_threads_list 队列中获取client对象,依次调用 writeToClient() 方法把client对象输出缓冲区中的数据通过socket返回给客户端。

(1) 初始化:initThreadedIO

首先,在main()方法中会调用InitServerLast()方法,InitServerLast()方法中会调用initThreadedIO()方法,这个方法的主要作用是初始化IO线程。

代码语言:javascript复制
/* 初始化IO线程 */
void initThreadedIO(void) {
    // 设置标志位,0表示IO线程还没有被激活,1:已激活
    io_threads_active = 0;
 
    // 如果设置的IO线程数量为1,则不启动多余的线程,只使用主线程
    if (server.io_threads_num == 1) return;
 
    // 超过最大线程数128,报错
    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
 
    // 依次初始化各个IO线程
    for (int i = 0; i < server.io_threads_num; i  ) {
 
        io_threads_list[i] = listCreate();
         
        // 如果io_threads_num=0,表示用户不需要开启多余的IO线程,直接使用主线程进行IO
        if (i == 0) continue;
 
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        // 当前线程(主线程)会先锁定所有的互斥锁
        pthread_mutex_lock(&io_threads_mutex[i]);
        // 生成新的IO线程,每个IO线程都是执行IOThreadMain()方法,方法参数是当前索引
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

这里有一点需要注意的是:主线程和IO线程通过共享变量数组io_threads_pending来进行通信

主线程修改io_threads_pending,IO线程读取io_threads_pending,那么就有可能存在线程安全问题。

那么Redis是怎么避免线程安全问题的呢?答案是通过_Atomic限定符。

io_threads_pending变量在声明的时候加上了_Atomic限定符:

代码语言:javascript复制
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

_Atomic是C11标准中引入的原子操作。被_Atomic修饰的变量被认为是原子变量,对原子变量的操作是不可分割的(Atomicity),且操作结果对其他线程可见,执行的顺序也不能被重排。

所以,io_threads_pending是属于线程安全的变量。

initThreadedIO()方法执行完成之后,io_threads_num个的IO线程已经启动了,且执行的是IOThreadMain()方法:

代码语言:javascript复制
void *IOThreadMain(void *myid) {
    // 首先获取当前线程在io_threads数组中的下标,在io_threads_pending和io_threads_list中的下标是一致的
    long id = (unsigned long)myid;

    while(1) {
        // 先自旋一会,如果自旋期间当前线程被分配了任务的话就可以不用抢夺互斥锁
        // 可以提高性能
        for (int j = 0; j < 1000000; j  ) {
            if (io_threads_pending[id] != 0) break;
        }

        // 如果自旋之后还没有任务分配,IO线程则会调用pthread_mutex_lock()方法来抢夺对应的互斥锁
        // 但主线程在生成具体的IO线程前已经把所有的互斥锁给锁上,所以IO线程此时会因为抢锁失败处于阻塞状态
        // 主线程可以借此停止该线程,因为任务的分配由主线程配置
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handlen", id, (int)listLength(io_threads_list[id]));

        // 在pending计数降至0之前,主线程将永远不会触及io_threads_list
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 从io_threads_list列表中获取任务
            // 如果是写任务,则进行写操作
            // 如果是读任务,则进行读操作
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // 清空列表
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Donen", id);
    }
}

IOThreadMain()在一个死循环中完成下面几件事:

  1. 判断当前线程有没有被分配新的任务。 io_threads_pending数组保存的是每个线程被分配的任务client对象的个数(由主线程来进行分配)。 如果io_threads_pending[id]>0,则表示有新的任务需要处理。 在判断io_threads_pending[id](id是当前线程在数组中的索引)是否大于0的时候,IO线程会先自旋一会。如果在自旋期间主线程就给当前IO线程分配了任务的话,IO线程就不会去抢夺互斥锁(可以节省了抢夺互斥锁的开销)。 如果自旋之后还没有任务分配,IO线程则会调用pthread_mutex_lock()方法来抢夺对应的互斥锁。 之前提到过在initThreadedIO()方法中主线程在生成具体的IO线程之前会先调用pthread_mutex_lock()把所有的互斥锁给锁上。所以IO线程此时会因为抢锁失败处于阻塞状态。
  2. 执行具体的读操作或者写操作。 如果IO线程被分配到读写任务,就会进行具体的读写操作。 每个IO线程都会遍历自己的io_threads_list[id]任务队列,对队列中的每一个client对象执行具体的读写操作。 变量io_threads_op标识当前线程需要进行的操作: 如果是IO_THREADS_OP_READ,表示读操作,则所有的IO线程都会调用readQueryFromClient()方法读取客户端的请求; 如果是IO_THREADS_OP_WRITE,表示写操作,则所有的IO线程都会调用writeToClient()方法把各个client对象的输出缓冲区数据通过socket返回给客户端。

注意:所有的IO线程,只会同时进行读操作或者进行写操作。

(2) 读请求:readQueryFromClient

每次有新客户端请求的时候,主线程会执行到readQueryFromClient(),用以读取客户端发送的请求。

代码语言:javascript复制
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    // 判断是否需要把读数据请求放到IO线程中去执行
    if (postponeClientRead(c)) return;

    //... 省略代码

    // 处理输入buffer的主流程
    // client输入缓冲区中还有更多数据,需要继续对其进行解析,以检查是否有完整的命令要执行
    processInputBuffer(c);
}

readQueryFromClient()中,会调用postponeClientRead()方法来判断是否需要把读数据请求放到IO线程中去执行

代码语言:javascript复制
int postponeClientRead(client *c) {
    if (io_threads_active &&
        server.io_threads_do_reads &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 给client对象的标志位增加CLIENT_PENDING_READ,这很重要
        c->flags |= CLIENT_PENDING_READ;
        // 把client对象添加到server.clients_pending_read列表中
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

如果IO线程已激活,并且当前client的标志位不包含 CLIENT_MASTER、CLIENT_SLAVE、CLIENT_PENDING_READ,则先给当前client对象增加 CLIENT_PENDING_READ 标志位,然后把当前client对象添加到server.clients_pending_read列表末尾并返回1。

postponeClientRead()返回1之后,readQueryFromClient()方法随即返回,结束执行。

(3) 事件循环阻塞后回调:handleClientsWithPendingReadsUsingThreads

Redis在每次事件之后都会调用afterSleep()方法,在afterSleep()方法中会调用handleClientsWithPendingReadsUsingThreads()方法。

代码语言:javascript复制
int handleClientsWithPendingReadsUsingThreads(void) {
    // 判断是否使用多线程进行读
    if (!io_threads_active || !server.io_threads_do_reads) return 0;
    // 需要读取数据的client对象保存在server.clients_pending_read中
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clientsn", processed);

    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // 1. 按照RoundRobin算法分配读任务
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id  ;
    }

    // 2. 设置读操作标志位并统计各个IO线程任务数
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 0; j < server.io_threads_num; j  ) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 3. 等待所有的线程处理完了所有的client的读数据操作
    while(1) {
        unsigned long pending = 0; 	// pending表示所有的线程加起来需要处理的client的数量
        for (int j = 0; j < server.io_threads_num; j  )
            pending  = io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshedn");

    // 再次运行客户端列表以处理新缓冲区
    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~ CLIENT_PENDING_COMMAND;
            // 4. 执行命令
            processCommandAndResetClient(c);
        }
        // 5. 如果还有数据需要读取的话读取数据
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);
    return processed;
}

handleClientsWithPendingReadsUsingThreads()方法主要完成下面几个任务:

1. 主线程按照RoundRobin算法给IO线程分配任务。

2. 主线程设置读操作标志位并统计各个IO线程任务数。

3. 主线程空循环等待所有的IO线程处理完了所有的client的读数据操作。

此时io_threads_op = IO_THREADS_OP_READ,IO线程会执行readQueryFromClient()方法进行读数据操作。

【之前看到这里时,产生了一个疑问在前面的readQueryFromClient()方法中会把client对象添加到server.clients_pending_read列表中。现在IO线程再次调用readQueryFromClient()方法,会不会又把当前client添加到server.clients_pending_read列表中然后形成死循环呢?】答案是不会的。

重新来看一下postponeClientRead()方法:

代码语言:javascript复制
int postponeClientRead(client *c) {
    if (io_threads_active &&
        server.io_threads_do_reads &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        // 给client对象的标志位增加CLIENT_PENDING_READ,这很重要
        c->flags |= CLIENT_PENDING_READ;
        // 把client对象添加到server.clients_pending_read列表中
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

在主线程把client对象添加到server.clients_pending_read列表之前,会先设置对应的client的CLIENT_PENDING_READ标志位,所以在IO线程调用readQueryFromClient()方法的时候不会重复进行添加,会继续往下执行。

在Redis5的版本中,主线程调用readQueryFromClient()读取数据,readQueryFromClient()又会调用processInputBuffer()方法解析参数,解析完参数之后processInputBuffer()会立即调用processCommand()方法执行命令,并把执行结果写入到输出缓冲区中。也就是说,在Redis6之前的版本中只要调用了readQueryFromClient()方法就会执行具体的命令。但是,在Redis6中不能这样做,如果这样做的话,那IO线程就不只是读数据了,还会执行命令,这样的话多个IO线程同时执行命令的话,可能会出现线程安全问题。在Redis6中,readQueryFromClient()最终是调用processInputBuffer()来解析请求参数:

代码语言:javascript复制
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {

        //...其他省略解析参数的代码

        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 判断当前client是否处于多线程环境
            // 如果是的话,只是给client新增CLIENT_PENDING_COMMAND标志位,不会继续执行命令
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            // 执行命令
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    // 省略代码
}

从代码里面可以看到,processInputBuffer()方法在调用processCommandAndResetClient()执行命令之前,会先判断当前的clien是否包含CLIENT_PENDING_READ标志位,如果是的话,则只是给当前的client添加CLIENT_PENDING_COMMAND标志位然后直接返回,并不会继续执行命令。【因此,在IO线程调用readQueryFromClient()方法读取数据之后,会继续调用processInputBuffer()完成参数的解析,但是不会继续执行命令。所以,IO线程只做读数据的操作。】

4. 等所有IO线程读取数据之后由主线程执行具体的命令。

主线程遍历server.clients_pending_read列表,对列表中的每一个client,会判断当前的client是否有CLIENT_PENDING_COMMAND标志位,如果有的话,则会继续调用processCommandAndResetClient(),而processCommandAndResetClient()会调用processCommand()执行具体的命令。

在上一步中分析过,IO线程在调用processInputBuffer()时如果发现client对象包含CLIENT_PENDING_READ标志位后会继续给当前client对象增加CLIENT_PENDING_COMMAND标志位。所以在这一步中,主线程会对server.clients_pending_read列中的所有的client调用processCommandAndResetClient()方法执行具体的命令

5. 如果还有数据没有读取完的话主线程则继续读取数据。

(4) 事件循环阻塞前回调:handleClientsWithPendingWritesUsingThreads

Redis在每次事件开始前都会先调用beforeSleep()方法,在beforeSleep()方法中会调用handleClientsWithPendingWritesUsingThreads()方法:

代码语言:javascript复制
int handleClientsWithPendingWritesUsingThreads(void) {
    // 1. 判断是否还有client对象需要写数据给客户端
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; 

    // 2. 判断是否的确需要使用多IO线程进行数据读写 (如果只有少量client,则不需要使用多线程)
    if (stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    // 3. 如果IO线程没有激活的话则开启IO线程
    if (!io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clientsn", processed);

    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    // 4. 按照RoundRobin算法把需要返回数据的client对象分配给IO线程
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id  ;
    }

    // 5. 设置标志位为写操作,统计各个io线程需要处理的client的个数
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 0; j < server.io_threads_num; j  ) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    // 6. 空循环,监听、等待所有的IO线程完成IO读写
    while(1) {
        unsigned long pending = 0;
        for (int j = 0; j < server.io_threads_num; j  )
            pending  = io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshedn");

    // 7. 如果还有数据没有写完的话则继续处理
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    
    // 8. 清空需要写数据的client对象列表
    listEmpty(server.clients_pending_write);
    return processed;
}

handleClientsWithPendingWritesUsingThreads()主要完成下面几个操作:

1. 判断当前需要返回数据给客户端的client对象的个数。

Redis把需要返回数据的client对象保存在server.clients_pending_write列表中;如果没有需要处理的client对象则直接返回。

2. 判断是否有必要使用多IO线程进行数据处理。

Redis会调用stopThreadedIOIfNeeded()方法来判断是否的确需要使用多IO线程(依据:当前需要处理的Client对象的数量 > 两倍的IO线程数量)

代码语言:javascript复制
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    if (server.io_threads_num == 1) return 1;

    // 只要当前需要处理的client对象的数量超过两倍的IO线程的数量的情况下才会使用多线程
    if (pending < (server.io_threads_num*2)) {
        if (io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}

如果不需要使用多IO线程,则把对应的互斥锁给锁上了,以及设置激活标志位io_threads_active=0,然后依然是由主线程调用handleClientsWithPendingWrites()方法完成数据的返回操作。

3. 如果需要使用多IO线程且IO线程还没激活的情况下则调用startThreadedIO()激活IO线程。

代码语言:javascript复制
void startThreadedIO(void) {
    if (tio_debug) { printf("S"); fflush(stdout); }
    if (tio_debug) printf("--- STARTING THREADED IO ---n");
    serverAssert(io_threads_active == 0);
    for (int j = 0; j < server.io_threads_num; j  )
        // 把所有的互斥锁给释放掉
        pthread_mutex_unlock(&io_threads_mutex[j]);
    // 设置激活标志位为1
    io_threads_active = 1;
}

在主线程释放锁之后,被阻塞的IO线程会抢到锁从而继续判断有没有被分配任务。

4. 主线程按照Round Robin算法把需要返回数据给客户端的client分配到io_threads_list数组中。 5. 设置io_threads_op为写操作,同时统计各个IO线程需要处理的client对象的个数,并写入对应的io_threads_pending数组中。

6. 主线程空循环等待所有的IO线程执行完成。

【从这里可以看到,当IO线程在执行具体的读写操作的时候,主线程是属于空循环等待状态的。】

7. 如果还有数据没有写完的话则由主线程继续处理。 8. 主线程清空clients_pending_write

从这整个过程可以看下来,当主线程执行的时候,IO线程基本上处于阻塞或者自旋空循环的状态,而IO线程执行读写操作的时候,主线程处于自旋空循环状态。两个之间通过_Atomic类型的变量来通信,所以从根本上保证了线程安全问题。

总体流程:

流程简述:

1、主线程负责接收建立连接请求,获取 socket 放入全局等待读处理队列

2、主线程处理完读事件之后,通过 RR(Round Robin) 将这些连接分配给这些 IO 线程

3、主线程阻塞等待 IO 线程读取 socket 完毕

4、主线程通过单线程的方式执行请求命令,请求数据读取并解析完成,但并不执行

5、主线程阻塞等待 IO 线程将数据回写 socket 完毕

6、解除绑定,清空等待队列

0 人点赞