继续上一篇文章继续讲,上次我们讲了del涉及到的同步删除的整个逻辑,del删除会通过参数走到dbSyncDelete方法,然而unlink则会走dbAsyncDelete方法。这里我们直接从dbAsyncDelete这个方法开始讲。
dbAsyncDelete方法的执行是在redis/src/lazyfree.c里面。
代码语言:javascript复制/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* 从db->expires中删除key,只是删除其指针而已,并没有删除实际值 */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
/*
* 在字典中摘除这个key(没有真正删除,只是查不到而已),如果被摘除的dictEntry不为
* 空就去执行下面的释放逻辑
*/
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
/* lazy_free并不是完全异步的,而是先评估释放操作所需工作量,如果影响较小就直接在主线程中删除了 */
size_t free_effort = lazyfreeGetFreeEffort(key,val);
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
* possible. This rarely happens, however sometimes the implementation
* of parts of the Redis core may call incrRefCount() to protect
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount().
* 如果释放这个对象需要做大量的工作,就把他放到异步线程里做
* 但如果这个对象是共享对象(refcount > 1)就不能直接释放了,当然这很少发送,但有可能redis
* 核心会调用incrRefCount来保护对象,然后调用dbDelete。这我只需要直接调用dictFreeUnlinkedEntry,
* 等价于调用decrRefCount */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
// 异步释放 1,原子操作 atomicIncr(lazyfree_objects,1);
// 将 value 的释放添加到异步线程队列中去,后台处理, 任务类型为 异步释放内存 bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
设置val为NULL, 以便在外部进行删除时忽略释放value相关内存
dictSetVal(db->dict,de,NULL);
}
}
/* 释放键值对所占用的内存,如果是lazyFree,val已经是null了,只需要释放key的内存即可 */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}
上面的函数执行之后,就是添加异步任务到线程中,执行bioCreateLazyFreeJob函数,再执行bioSubmitJob,方法在redis/src/bio.c文件中
代码语言:javascript复制void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
va_list valist;
/* Allocate memory for the job structure and all required
* arguments */
struct bio_job *job = zmalloc(sizeof(*job) sizeof(void *) * (arg_count));
job->free_fn = free_fn;
va_start(valist, arg_count);
for (int i = 0; i < arg_count; i ) {
job->free_args[i] = va_arg(valist, void *);
}
va_end(valist);
bioSubmitJob(BIO_LAZY_FREE, job);
}
void bioSubmitJob(int type, struct bio_job *job) {
job->time = time(NULL);
// 多线程需要加锁,把待处理的job添加到队列末尾
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type] ;
// 唤醒任务线程
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
后台线程任务框架调用的方法是bioProcessBackgroundJobs,线程根据不同的类型的后台线程执行相关操作。方法位于redis/src/bio.c。
代码语言:javascript复制/* 根据参数创建不同的后台线程 */
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
switch (type) {
case BIO_CLOSE_FILE:
redis_set_thread_title("bio_close_file");
break;
case BIO_AOF_FSYNC:
redis_set_thread_title("bio_aof_fsync");
break;
case BIO_LAZY_FREE:
redis_set_thread_title("bio_lazy_free");
break;
}
redisSetCpuAffinity(server.bio_cpulist);
makeThreadKillable();
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
// 注意此处将会释放锁哟,以便外部可以添加任务进来 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* 根据任务类型执行不同的逻辑 */
if (type == BIO_CLOSE_FILE) {
close(job->fd);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync(job->fd);
} else if (type == BIO_LAZY_FREE) {
job->free_fn(job->free_args);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
/* Unblock threads blocked on bioWaitStepOfType() if any. */
pthread_cond_broadcast(&bio_step_cond[type]);
}
}
关于unlink的异步操作主要涉及到这几个函数的执行,这里unlink操作删除的时候会先评估释放数据的工作量,如果较小就会直接主线程做删除操作。
这里分享的源码希望对大家有帮助,如果有兴趣进一步了解,可以私我要一下整个redis翻译后的代码。