Understanding the Execution of Redis's del and unlink Commands in Depth, Part 2

2022-04-25 09:26:11

Continuing from the previous article: last time we covered the whole synchronous deletion path used by del, which reaches dbSyncDelete via a flag passed down from the command layer, while unlink takes the dbAsyncDelete path instead. This time we start directly from dbAsyncDelete.
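
As a quick recap, the choice between the two paths is made one level up, in the command layer. Below is a simplified sketch of delGenericCommand in redis/src/db.c (based on the 6.x source; keyspace notifications, dirty counters and replication details are omitted, and exact signatures vary by version):

/* Sketch: the "lazy" flag decides between synchronous and asynchronous deletion. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0;
    for (int j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                             dbSyncDelete(c->db,c->argv[j]);
        if (deleted) numdel++;
    }
    addReplyLongLong(c,numdel);
}

/* DEL normally takes the synchronous path (unless lazyfree-lazy-user-del is
 * enabled); UNLINK always takes the asynchronous one. */
void delCommand(client *c)    { delGenericCommand(c,server.lazyfree_lazy_user_del); }
void unlinkCommand(client *c) { delGenericCommand(c,1); }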

dbAsyncDelete lives in redis/src/lazyfree.c.

/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Remove the key from db->expires. This only drops the pointer in the
     * expires dict; the actual value is not freed here. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    /*
     * Unlink the entry from the dict: the key is only detached (lookups can no
     * longer find it), nothing is actually freed yet. If the removed dictEntry
     * is not NULL, run the release logic below.
     */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val);

        /* lazy_free is not fully asynchronous: Redis first estimates how much
         * work freeing the value would take, and if the cost is small the value
         * is simply freed on the main thread (a sketch of lazyfreeGetFreeEffort
         * follows after this function). */
        size_t free_effort = lazyfreeGetFreeEffort(key,val);


        /* If releasing the object is too much work, do it in the background
         * by adding the object to the lazy free list.
         * Note that if the object is shared, to reclaim it now it is not
         * possible. This rarely happens, however sometimes the implementation
         * of parts of the Redis core may call incrRefCount() to protect
         * objects, and then call dbDelete(). In this case we'll fall
         * through and reach the dictFreeUnlinkedEntry() call, that will be
         * equivalent to just calling decrRefCount().
         *
         * In short: heavy objects are handed to the background thread, but a
         * shared object (refcount > 1) cannot be released here, so in that rare
         * case we fall through and dictFreeUnlinkedEntry() below ends up doing
         * a plain decrRefCount(). */
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            /* Asynchronous free: atomically bump the counter of objects
             * pending lazy free. */
            atomicIncr(lazyfree_objects,1);
            /* Queue the release of the value as a background job of type
             * BIO_LAZY_FREE, to be handled by a bio.c thread. */
            bioCreateLazyFreeJob(lazyfreeFreeObject,1,val);
            /* Set val to NULL so that freeing the entry below skips the
             * value and only releases the key. */
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Free the memory used by the key/value pair. In the lazy-free case val is
     * already NULL, so only the key's memory is released here. */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}
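
The threshold check above only makes sense together with what lazyfreeGetFreeEffort returns: roughly, the number of allocations that freeing the value would touch. Here is a simplified sketch of that function (also in redis/src/lazyfree.c; the real one additionally handles streams and module types):

size_t lazyfreeGetFreeEffort(robj *key, robj *obj) {
    if (obj->type == OBJ_LIST) {
        quicklist *ql = obj->ptr;
        return ql->len;                        /* one quicklist node per unit of work */
    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
        return dictSize((dict*)obj->ptr);      /* one allocation per member */
    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
        return dictSize((dict*)obj->ptr);      /* one allocation per field */
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = obj->ptr;
        return zs->zsl->length;                /* one skiplist node per element */
    } else {
        return 1;  /* strings, ziplists, intsets, ...: a single allocation */
    }
}

So a plain string, however large, counts as effort 1 and is freed on the main thread, while a hash or set only goes to the background thread once it holds more than LAZYFREE_THRESHOLD (64) entries.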

Once the function above has decided to release the value asynchronously, the work goes through bioCreateLazyFreeJob, which builds a job and hands it to bioSubmitJob. Both functions are in redis/src/bio.c (the bio_job structure they fill in is shown after the listing).

void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_fn = free_fn;

    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
}

void bioSubmitJob(int type, struct bio_job *job) {
    job->time = time(NULL);
    // The job queue is shared with the worker thread, so take the per-type
    // lock before appending the pending job to the tail of the queue.
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    // Wake up the worker thread waiting for a new job.
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
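
For reference, the job structure being filled in here looks roughly like this in the 6.x source (redis/src/bio.c). free_args is a flexible array member, which is why bioCreateLazyFreeJob allocates sizeof(*job) plus sizeof(void *) * arg_count in a single block:

struct bio_job {
    time_t time;            /* Time at which the job was created. */
    int fd;                 /* File descriptor, used by the file-based jobs
                             * (BIO_CLOSE_FILE, BIO_AOF_FSYNC). */
    lazy_free_fn *free_fn;  /* Function that frees the arguments below,
                             * used by BIO_LAZY_FREE jobs. */
    void *free_args[];      /* Flexible array of arguments for free_fn. */
};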

The worker function run by the background-thread framework is bioProcessBackgroundJobs; each thread serves a single job type and executes the corresponding work. The function is in redis/src/bio.c. (What a BIO_LAZY_FREE job actually runs, lazyfreeFreeObject, is shown after the listing.)

/* Background worker entry point: the argument selects which job type this thread processes */
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    redisSetCpuAffinity(server.bio_cpulist);
    makeThreadKillable();
    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            /* Note: pthread_cond_wait releases the mutex while waiting, so
             * other threads can still enqueue new jobs; the lock is
             * re-acquired before it returns. */
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Execute different logic depending on the job type. */
        if (type == BIO_CLOSE_FILE) {
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync(job->fd);
        } else if (type == BIO_LAZY_FREE) {
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}
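
For a BIO_LAZY_FREE job created by dbAsyncDelete, the free_fn invoked above is lazyfreeFreeObject, so the actual release on the background thread boils down to a decrRefCount plus some bookkeeping. A sketch of it (from redis/src/lazyfree.c in the 6.x source; counter names may differ slightly between versions):

/* Release an object from the lazy-free background thread. */
void lazyfreeFreeObject(void *args[]) {
    robj *o = (robj *) args[0];
    decrRefCount(o);                     /* last reference drops: the value is freed */
    atomicDecr(lazyfree_objects,1);      /* one fewer object pending lazy free */
    atomicIncr(lazyfreed_objects,1);     /* total objects freed in the background */
}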

These are the main functions involved in unlink's asynchronous deletion. Note that even unlink first estimates how much work freeing the value will take; if the cost is below the threshold, the value is freed directly on the main thread.

I hope this walk through the source is helpful. If you would like to dig deeper, feel free to message me for the fully annotated Redis source.
