Redis expired-key deletion mechanisms (Redis expiration policy and eviction policy)

2022-08-01 11:04:57


In Redis, memory is a finite resource, so some form of key-removal strategy is needed to keep it from filling up. There are two main mechanisms: the memory eviction policy that kicks in when Redis runs short of memory, and the expired-key deletion policy, which also frees memory to some degree.
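Before diving into the internals, here is a quick client-side illustration of key expiry (a hypothetical redis-cli session; key name and output values are illustrative):

$ redis-cli SET session:42 "payload" EX 5   # key with a 5-second TTL
OK
$ redis-cli TTL session:42                  # seconds remaining
(integer) 5
$ sleep 6
$ redis-cli EXISTS session:42               # gone once the TTL elapses
(integer) 0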

1. Memory eviction policies

Redis has a dedicated function for freeing memory: freeMemoryIfNeeded(). It is called each time a command is executed, to check whether memory is sufficient; if used memory exceeds the maxmemory limit, it frees memory:

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory();
    if (mem_reported <= server.maxmemory) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;
    mem_freed = 0;

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    latencyStartMonitor(latency);

    /* ... (eviction loop omitted) ... */

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
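You can observe this limit from the outside by setting maxmemory at runtime and watching the relevant INFO fields (a sketch; the numeric values shown are made up):

$ redis-cli CONFIG SET maxmemory 100mb
OK
$ redis-cli INFO memory | grep -E "^(used_memory|maxmemory|maxmemory_policy):"
used_memory:1032448
maxmemory:104857600
maxmemory_policy:noeviction
$ redis-cli INFO stats | grep evicted_keys   # incremented on each eviction
evicted_keys:0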

When memory has to be freed, Redis deletes stored objects according to one of eight configurable policies:

configEnum maxmemory_policy_enum[] = {
    {"volatile-lru", MAXMEMORY_VOLATILE_LRU},
    {"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
    {"volatile-random",MAXMEMORY_VOLATILE_RANDOM},
    {"volatile-ttl",MAXMEMORY_VOLATILE_TTL},
    {"allkeys-lru",MAXMEMORY_ALLKEYS_LRU},
    {"allkeys-lfu",MAXMEMORY_ALLKEYS_LFU},
    {"allkeys-random",MAXMEMORY_ALLKEYS_RANDOM},
    {"noeviction",MAXMEMORY_NO_EVICTION},
    {NULL, 0}
};

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
#
# volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key among the ones with an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, Redis will return an error on write
# operations, when there are no suitable keys for eviction.
#
# At the date of writing these commands are: set setnx setex append
# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
# getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction

(1) volatile-lru: evict the least recently used key from the set of keys with an expire set (server.db[i].expires)

(2) allkeys-lru: evict the least recently used key from the whole keyspace (server.db[i].dict)

(3) volatile-ttl: evict the key closest to its expire time from the set of keys with an expire set (server.db[i].expires)

(4) volatile-lfu: evict the least frequently used key from the set of keys with an expire set (server.db[i].expires)

(5) allkeys-lfu: evict the least frequently used key from the whole keyspace (server.db[i].dict)

    /* For the LRU/LFU/TTL policies Redis uses an approximated algorithm:
     * it samples a number of random keys and evicts the best candidate
     * among them. */
    if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
        server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
    {
        struct evictionPoolEntry *pool = EvictionPoolLRU;

        while(bestkey == NULL) {
            unsigned long total_keys = 0, keys;

            /* We don't want to make local-db choices when expiring keys,
             * so to start populate the eviction pool sampling keys from
             * every DB. */
            for (i = 0; i < server.dbnum; i++) {
                db = server.db+i;
                /* First decide whether to sample from the expires dict
                 * (volatile-*) or from the whole keyspace dict (allkeys-*). */
                dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                        db->dict : db->expires;
                if ((keys = dictSize(dict)) != 0) {
                    evictionPoolPopulate(i, dict, db->dict, pool);
                    total_keys += keys;
                }
            }
            if (!total_keys) break; /* No keys to evict. */

            /* Go backward from best to worst element to evict. */
            for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                if (pool[k].key == NULL) continue;
                bestdbid = pool[k].dbid;

                /* Again: expires dict for volatile-*, whole keyspace
                 * dict for allkeys-*. */
                if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                    de = dictFind(server.db[pool[k].dbid].dict, pool[k].key);
                } else {
                    de = dictFind(server.db[pool[k].dbid].expires, pool[k].key);
                }

                /* Remove the entry from the pool. */
                if (pool[k].key != pool[k].cached)
                    sdsfree(pool[k].key);
                pool[k].key = NULL;
                pool[k].idle = 0;

                /* If the key exists, is our pick. Otherwise it is
                 * a ghost and we need to try the next element. */
                if (de) {
                    bestkey = dictGetKey(de);
                    break;
                } else {
                    /* Ghost... Iterate again. */
                }
            }
        }
    }
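The size of that random sample is configurable. A redis.conf sketch (5 is the shipped default, as far as I know; larger values approximate true LRU/LFU more closely at extra CPU cost):

# LRU, LFU and volatile-ttl are approximated algorithms: Redis inspects
# this many randomly sampled keys and evicts the best candidate found.
maxmemory-samples 5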

(6) volatile-random: evict a random key from the set of keys with an expire set (server.db[i].expires)

(7) allkeys-random: evict a random key from the whole keyspace (server.db[i].dict)

    /* volatile-random and allkeys-random policy: pick one random key
     * per DB and evict it. */
    else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
             server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
    {
        /* When evicting a random key, we try to evict a key for
         * each DB, so we use the static 'next_db' variable to
         * incrementally visit all DBs. */
        for (i = 0; i < server.dbnum; i++) {
            j = (++next_db) % server.dbnum;
            db = server.db+j;
            /* Again: expires dict for volatile-random, whole keyspace
             * dict for allkeys-random. */
            dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                    db->dict : db->expires;
            if (dictSize(dict) != 0) {
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
                bestdbid = j;
                break;
            }
        }
    }

(8) noeviction: never evict data; when memory cannot be freed, write commands fail with an error [the default policy]

    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    /* ... (eviction loop omitted) ... */

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
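Putting the policies to use: a minimal redis.conf sketch for a cache-style deployment (both settings can also be changed at runtime with CONFIG SET):

# Cap the dataset at 256 MB and, once the cap is reached, evict
# approximated-LRU keys from the whole keyspace instead of erroring.
maxmemory 256mb
maxmemory-policy allkeys-lru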

2. Expired-key deletion policies

(1) Lazy deletion [passive deletion]

Lazy deletion is implemented by db.c/expireIfNeeded(). Every command that reads or writes the database calls expireIfNeeded() on the keys it is about to touch; if a key turns out to be expired, it is removed from the database first:

/* This function is called when we are going to perform some operation
 * in a given key, but such key may be already logically expired even if
 * it still exists in the database. The main way this function is called
 * is via lookupKey*() family of functions.
 *
 * The behavior of the function depends on the replication role of the
 * instance, because slave instances do not expire keys, they wait
 * for DELs from the master for consistency matters. However even
 * slaves will try to have a coherent return value for the function,
 * so that read commands executed in the slave side will be able to
 * behave like if the key is expired even if still present (because the
 * master has yet to propagate the DEL).
 *
 * In masters as a side effect of finding a key which is expired, such
 * key will be evicted from the database. Also this may trigger the
 * propagation of a DEL/UNLINK command in AOF / replication stream.
 *
 * The return value of the function is 0 if the key is still valid,
 * otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we pretend that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    if (server.masterhost != NULL) return now > when;

    /* Return when this key has not expired */
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing. */
void existsCommand(client *c) {
    long long count = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        if (dbExists(c->db,c->argv[j])) count++;
    }
    addReplyLongLong(c,count);
}
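From a client's point of view lazy deletion is invisible: any lookup of an expired key behaves as if the key were absent and bumps the expired_keys counter (a hypothetical session; note the active cycle may also have expired the key before the lookup):

$ redis-cli SET temp:1 v PX 100     # 100 ms TTL
OK
$ sleep 1
$ redis-cli EXISTS temp:1           # the lookup calls expireIfNeeded()
(integer) 0
$ redis-cli INFO stats | grep expired_keys
expired_keys:1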

(2) Periodic deletion [active deletion]

Periodic deletion is implemented by redis.c/activeExpireCycle(), which is called whenever the server runs beforeSleep() or serverCron():

  1. The config option hz sets how often the serverCron task runs; the default is 10, i.e. it runs 10 times per second while the CPU is idle.
  2. Each expired-key cleanup pass may use at most 25% of CPU time: with hz=1 a single pass is capped at 250 ms, with hz=10 at 25 ms (a worked example of this budget follows the list).
  3. The cleanup walks through the databases one by one.
  4. It samples 20 random keys from a db's expires set, checks each one, and evicts those that have expired.
  5. If more than 5 of the sampled keys were expired, step 4 is repeated; otherwise it moves on to the next db.
  6. If at any point during cleanup the 25% CPU-time budget is reached, the pass exits.
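To make the budget concrete, here is the arithmetic the slow cycle performs, written as a redis.conf sketch (hz 10 is the shipped default; the 25% constant is ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC in the source shown further below):

# hz controls how often serverCron() runs (10 times/second by default).
hz 10
# Per-call CPU budget of the slow expire cycle, from activeExpireCycle():
#   timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / hz / 100  (microseconds)
#   hz = 10  ->  1000000 * 25 / 10 / 100 = 25000 us = 25 ms
#   hz = 1   ->  1000000 * 25 / 1  / 100 = 250000 us = 250 ms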

int main(int argc, char **argv) {
    .....
    aeSetBeforeSleepProc(server.el,beforeSleep);
    ......
}

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors.
 * (A portion of expired keys is evicted on every event-loop iteration.) */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}

/* This is our timer interrupt, called server.hz times per second.
 * (Runs 10 times per second by default; while the CPU is idle, the periodic
 * serverCron task evicts a portion of expired keys.)
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... } */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    updateCachedTime();

    run_with_period(100) {
        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }

    /* We have just LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * LRU_CLOCK_RESOLUTION define. */
    unsigned long lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    /* Record the max memory used since the server was started. */
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* Sample the RSS here since this is a relatively slow call. */
    server.resident_set_size = zmalloc_get_rss();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }

        /* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Clear the paused clients flag if needed. */
    clientsArePaused(); /* Don't check return value, just use the side effect.*/

    /* Replication cron function -- used to reconnect to master,
     * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Start a scheduled BGSAVE if the corresponding flag is set. This is
     * useful when we are forced to postpone a BGSAVE because an AOF
     * rewrite is in progress.
     *
     * Note: this code must be after the replicationCron() call above so
     * make sure when refactoring this file to keep this order. This is useful
     * because we want to give priority to RDB savings for replication. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    server.cronloops++;
    return 1000/server.hz;
}

/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled && server.masterhost == NULL) {
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
    } else if (server.masterhost != NULL) {
        expireSlaveKeys();
    }

    /* Defrag keys gradually. */
    if (server.active_defrag_enabled)
        activeDefragCycle();

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * No more than CRON_DBS_PER_CALL databases are tested at every
 * iteration.
 *
 * This kind of call is used when Redis detects that timelimit_exit is
 * true, so there is more work to do, and we do it more incrementally from
 * the beforeSleep() function of the event loop.
 *
 * Expire cycle type:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */

void activeExpireCycle(int type) {
    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit. Also don't repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        if (!timelimit_exit) return;
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
     * per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. */
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */
    long total_sampled = 0;
    long total_expired = 0;

    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots getting random
             * keys is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                ttl = dictGetSignedIntegerVal(de)-now;
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl > 0) {
                    /* We want the average TTL of keys yet not expired. */
                    ttl_sum += ttl;
                    ttl_samples++;
                }
                total_sampled++;
            }
            total_expired += expired;

            /* Update the average TTL stats for this database. */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;

                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of milliseconds return to the
             * caller waiting for the other active expire cycle. */
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            /* We don't repeat the cycle if there are less than 25% of keys
             * found expired in the current DB. */
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }

    elapsed = ustime()-start;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}
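For experiments it can help to isolate the two deletion paths. Recent Redis versions expose a debug switch that turns the periodic cycle off, leaving only lazy deletion (DEBUG is intended for development use; availability may vary by version):

$ redis-cli DEBUG SET-ACTIVE-EXPIRE 0   # disable activeExpireCycle()
OK
$ redis-cli DEBUG SET-ACTIVE-EXPIRE 1   # restore the default behavior
OK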


