redis过期删除机制(redis过期策略和删除策略)

在Redis中,内存的大小是有限的,所以为了防止内存饱和,需要实现某种键淘汰策略。主要有两种方法,一种是当Redis内存不足时所采用的内存释放策略。另一种是对过期键进行删除的策略,也可以在某种程度上释放内存。1、内存释放的策略Redis中有专门释放内存的函数:freeMmoryIfNeeded。每当执行一个命令的时候,就会调用该函数来检测内存是否够用。如果已用内存大于最大内存限制,它就会进行内存释…

大家好,又见面了,我是你们的朋友全栈君。

在Redis中,内存的大小是有限的,所以为了防止内存饱和,需要实现某种键淘汰策略。主要有两种方法,一种是当Redis内存不足时所采用的内存释放策略。另一种是对过期键进行删除的策略,也可以在某种程度上释放内存。

1、内存释放的策略

Redis中有专门释放内存的函数:freeMmoryIfNeeded。每当执行一个命令的时候,就会调用该函数来检测内存是否够用。如果已用内存大于最大内存限制,它就会进行内存释放。

/* Check if we are over the memory usage limit. If we are not, no need
 * to subtract the slaves output buffers. We can just return ASAP. */
mem_reported = zmalloc_used_memory();
if (mem_reported <= server.maxmemory) return C_OK;

/* Remove the size of slaves output buffers and AOF buffer from the
 * count of used memory. */
mem_used = mem_reported;
size_t overhead = freeMemoryGetNotCountedMemory();
mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

/* Check if we are still over the memory limit. */
if (mem_used <= server.maxmemory) return C_OK;

/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory;
mem_freed = 0;

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
    goto cant_free; /* We need to free memory, but policy forbids. */

latencyStartMonitor(latency);
 
 
cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
 


当需要进行内存释放的时候,需要用某种策略对保存的的对象进行删除。Redis有八种策略:

configEnum maxmemory_policy_enum[] = {
    {"volatile-lru", MAXMEMORY_VOLATILE_LRU},
    {"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
    {"volatile-random",MAXMEMORY_VOLATILE_RANDOM},
    {"volatile-ttl",MAXMEMORY_VOLATILE_TTL},
    {"allkeys-lru",MAXMEMORY_ALLKEYS_LRU},
    {"allkeys-lfu",MAXMEMORY_ALLKEYS_LFU},
    {"allkeys-random",MAXMEMORY_ALLKEYS_RANDOM},
    {"noeviction",MAXMEMORY_NO_EVICTION},
    {NULL, 0}
};

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
#
# volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key among the ones with an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, Redis will return an error on write
#       operations, when there are no suitable keys for eviction.
#
#       At the date of writing these commands are: set setnx setex append
#       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
#       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
#       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
#       getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction

  (1)volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰

  (2)allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰

  (3)volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
  (4)volatile-lfu:从已设置过期时间的数据集(server.db[i].expires)中挑选最近使用次数最少的数据淘汰

  (5) allkeys-lfu:从数据集(server.db[i].dict)中挑选最近使用次数最少的数据淘汰

/*如果是使用LRU算法则是采取局部的LRU算法,随机找到若干个键值删除其中的LRU算法选择的键*/
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
    server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
    struct evictionPoolEntry *pool = EvictionPoolLRU;

    while(bestkey == NULL) {
        unsigned long total_keys = 0, keys;

        /* We don't want to make local-db choices when expiring keys,
         * so to start populate the eviction pool sampling keys from
         * every DB. */
        for (i = 0; i < server.dbnum; i++) {
            db = server.db+i;
			/*其中要先判断是从过期及expires中删除键还是从所有数据集dict中删除键*/
            dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                    db->dict : db->expires;
            if ((keys = dictSize(dict)) != 0) {
                evictionPoolPopulate(i, dict, db->dict, pool);
                total_keys += keys;
            }
        }
        if (!total_keys) break; /* No keys to evict. */

        /* Go backward from best to worst element to evict. */
        for (k = EVPOOL_SIZE-1; k >= 0; k--) {
            if (pool[k].key == NULL) continue;
            bestdbid = pool[k].dbid;
			/*其中要先判断是从过期及expires中删除键还是从所有数据集dict中删除键*/
            if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                de = dictFind(server.db[pool[k].dbid].dict,
                    pool[k].key);
            } else {
                de = dictFind(server.db[pool[k].dbid].expires,
                    pool[k].key);
            }

            /* Remove the entry from the pool. */
            if (pool[k].key != pool[k].cached)
                sdsfree(pool[k].key);
            pool[k].key = NULL;
            pool[k].idle = 0;

            /* If the key exists, is our pick. Otherwise it is
             * a ghost and we need to try the next element. */
            if (de) {
                bestkey = dictGetKey(de);
                break;
            } else {
                /* Ghost... Iterate again. */
            }
        }
    }
}

  (6)volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰

  (7)allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰

 

/* volatile-random and allkeys-random policy 如果使用随机算法,则从每一个db中挑选一个随机键进行删除*/
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
         server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
    /* When evicting a random key, we try to evict a key for
     * each DB, so we use the static 'next_db' variable to
     * incrementally visit all DBs. */
    for (i = 0; i < server.dbnum; i++) {
        j = (++next_db) % server.dbnum;
        db = server.db+j;
       /*其中要先判断是从过期及expires中删除键还是从所有数据集dict中删除键*/
        dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                db->dict : db->expires;
        if (dictSize(dict) != 0) {
            de = dictGetRandomKey(dict);
            bestkey = dictGetKey(de);
            bestdbid = j;
            break;
        }
    }
}

  

(8)no-enviction(驱逐):禁止驱逐数据[默认策略]

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
    goto cant_free; /* We need to free memory, but policy forbids. */
 
cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
 

2、过期键删除的策略

(1)惰性删除[被动删除]

             惰性删除由db.c/expireIfNeeded()函数实现,所有读写数据库的命令在执行之前都会调用expireIfNeeded()函数对要操作的key进行检查。如果key已经过期,那么将会将key从数据库中删除

redis过期删除机制(redis过期策略和删除策略)

/* This function is called when we are going to perform some operation
 * in a given key, but such key may be already logically expired even if
 * it still exists in the database. The main way this function is called
 * is via lookupKey*() family of functions.
 *
 * The behavior of the function depends on the replication role of the
 * instance, because slave instances do not expire keys, they wait
 * for DELs from the master for consistency matters. However even
 * slaves will try to have a coherent return value for the function,
 * so that read commands executed in the slave side will be able to
 * behave like if the key is expired even if still present (because the
 * master has yet to propagate the DEL).
 *
 * In masters as a side effect of finding a key which is expired, such
 * key will be evicted from the database. Also this may trigger the
 * propagation of a DEL/UNLINK command in AOF / replication stream.
 *
 * The return value of the function is 0 if the key is still valid,
 * otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we pretend that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    if (server.masterhost != NULL) return now > when;

    /* Return when this key has not expired */
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}
 

/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing. */
void existsCommand(client *c) {
    long long count = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        if (dbExists(c->db,c->argv[j])) count++;
    }
    addReplyLongLong(c,count);
}

(2)定期删除[主动删除]

          定期删除由函数redis.c/activeExpireCycle()函数实现,每当server在调用beforeSleep()和serverCron()时,都会被调用 

       redis过期删除机制(redis过期策略和删除策略) 

  1. Redis配置项hz定义了serverCron任务的执行周期,默认为10,即CPU空闲时每秒执行10次;
  2. 每次过期key清理的时间不超过CPU时间的25%,即若hz=1,则一次清理时间最大为250ms,若hz=10,则一次清理时间最大为25ms;
  3. 清理时依次遍历所有的db;
  4. 从db中随机取20个key,判断是否过期,若过期,则逐出;
  5. 若有5个以上key过期,则重复步骤4,否则遍历下一个db;
  6. 在清理过程中,若达到了25%CPU时间,退出清理过程;


int main(int argc, char **argv) {
.....
aeSetBeforeSleepProc(server.el,beforeSleep);
......
}
 
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}
 
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
 
/* This function gets called every time Redis is entering the  每次事件循环执行的时候,逐出部分过期Key;
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
}

/* This is our timer interrupt, called server.hz times per second. 一秒钟执行10次定期操作  CPU空闲时在定期serverCron任务中,逐出部分过期Key
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
*
* - Active expired keys collection (it is also performed in a lazy way on
*   lookup).
* - Software watchdog.
* - Update some statistic.
* - Incremental rehashing of the DBs hash tables.
* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
* - Clients timeout of different kinds.
* - Replication reconnection.
* - Many more...
*
* Everything directly called here will be called server.hz times per second,
* so in order to throttle execution of things we want to do less frequently
* a macro is used: run_with_period(milliseconds) { .... }
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
updateCachedTime();
run_with_period(100) {
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
/* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
/* Show some info about non-empty databases */
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
/* Show information about connected clients */
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
/* We need to do a few operations on clients asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* an higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress.
*
* Note: this code must be after the replicationCron() call above so
* make sure when refactoring this file to keep this order. This is useful
* because we want to give priority to RDB savings for replication. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
server.cronloops++;
return 1000/server.hz;
}
 
 
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled && server.masterhost == NULL) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else if (server.masterhost != NULL) {
expireSlaveKeys();
}
/* Defrag keys gradually. */
if (server.active_defrag_enabled)
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}

/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* No more than CRON_DBS_PER_CALL databases are tested at every
* iteration.
*
* This kind of call is used when Redis detects that timelimit_exit is
* true, so there is more work to do, and we do it more incrementally from
* the beforeSleep() function of the event loop.
*
* Expire cycle type:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
* microseconds, and is not repeated again before the same amount of time.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
void activeExpireCycle(int type) {
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */
static int timelimit_exit = 0;      /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limt. Also don't repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit) return;
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
* per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* We don't repeat the cycle if there are less than 25% of keys
* found expired in the current DB. */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
elapsed = ustime()-start;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}



参考文献

https://yq.aliyun.com/articles/257459

https://blog.csdn.net/u012658346/article/details/51374858

https://blog.csdn.net/caishenfans/article/details/44902651

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/126685.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 舵机的工作原理和控制[通俗易懂]

    舵机的工作原理和控制[通俗易懂]控制信号由接收机的通道进入信号调制芯片,获得直流偏置电压。它内部有一个基准电路,产生周期为20ms,宽度为1.5ms的基准信号,将获得的直流偏置电压与电位器的电压比较,获得电压差输出。最后,电压差的正负输出到电机驱动芯片决定电机的正反转。当电机转速一定时,通过级联减速齿轮带动电位器旋转,使得电压差为0,电机停止转动。舵机的控制一般需要一个20

  • java面试葵花宝典[通俗易懂]

    java面试葵花宝典[通俗易懂]15年毕业到现在也近四年了,最近面试了阿里集团(菜鸟网络,蚂蚁金服),网易,滴滴,点我达,最终收到点我达,网易offer,蚂蚁金服二面挂掉,菜鸟网络一个月了还在流程中,最终有幸去了网易。但是要特别感谢点我达的领导及HR,真的非常非常好,很感谢他们一直的关照和指导。回家后,我对这次面试经历,做了总结,希望对想要跳槽的朋友们有个借鉴。面试整体事项1.简历要准备好,联系方式一定要正确清晰醒目,…

  • c++ 函数指针

    c++ 函数指针函数指针基础:1.获取函数的地址2.声明一个函数指针3.使用函数指针来调用函数获取函数指针:函数的地址就是函数名,要将函数作为参数进行传递,必须传递函数名。声明函数指针声明指针时,必须指定指针指向的数据类型,同样,声明指向函数的指针时,必须指定指针指向的函数类型,这意味着声明应当指定函数的返回类型以及函数的参数列表。例如:doublecal(int);…

  • 怎么发现RAC环境中&#39;library cache pin&#39;等待事件的堵塞者(Blocker)?「建议收藏」

    怎么发现RAC环境中&#39;library cache pin&#39;等待事件的堵塞者(Blocker)?

  • 海量数据处理之Bloom Filter详解

    海量数据处理之Bloom Filter详解海量数据处理之BloomFilter详解 前言   本博客内曾已经整理过十道海量数据处理面试题与十个方法大总结。接下来,本博客内会重点分析那些海量数据处理的方法,并重写十道海量数据处理的面试题。如果有任何问题,欢迎不吝指正。谢谢。一、什么是BloomFilter   BloomFilter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集

  • 阿里核心系统团队博客在哪_阿里云是谁创始人

    阿里核心系统团队博客在哪_阿里云是谁创始人http://csrd.aliapp.com/阿里核心系统团队博客基础极致分享Home招聘信息阿里核心系统团队介绍TFS运维平台改造1604天bylinqinginTFSTFS负责运维的同学在工作过程中,积累了各种运维脚本

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号