redis事务源码分析

2022-11-07 00:56:02 浏览数 (2)

一、事务基础

1 redis事务介绍

事务能够将多个操作作为一个整体来执行,具备ACID四大特性。

  1. 原子性:redis主线程对字典空间进行操作,天生是原子的,不需要同步机制。
  2. 持久性:redis作为缓存是允许丢失数据的,我觉得不应该对持久性有过多的要求。另外redis也有rdb、aof来持久化数据。
  3. 一致性:redis并没有undo log,理论上事务执行一半就下线后是无法回滚的,需要通过redis-check-aof工具来检测,移除掉失败的事务命令。
  4. 隔离性:redis单线程处理client命令,算是可串行化读这个级别,事务不并发就不会破坏隔离性。

2 innodb事务介绍

事务并发破坏了事务的隔离性,根据破坏的程度分为四大隔离级别,每种级别对于写都是需要加写锁并在事务提交后释放,区别在于读的可见性不一样。

  1. 读未提交:对读没有约束,可以读到未提交的数据。
  2. 读已提交:每次读生成一个最新的read view,能够读到本事务执行期间提交的事务,与最开始读到的不一样,会有不可重复读现象。
  3. 可重复读:第一次读生成一个read view,后面的读以这个为主,并通过间隙锁来锁区间,阻止插入新的记录,解决幻读现象。 所有的写都是当前读,而读的可见性未必是当前最新的,也有可能是旧版本,需要考虑更新丢失/覆盖的现象。

二、监视器

redis事务是根据multi、exec、discard三个命令实现的,另外还需要关注watch、unwatch这个监视器,基于乐观锁的思想。redis事务期间是不能使用监视器的,待会儿源码中就能够看到。

1 redisDb

代码语言:c复制
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;


struct dict {
    dictType *type; //操作集

    dictEntry **ht_table[2]; //数组 链表 渐进式哈希
    unsigned long ht_used[2];

    long rehashidx; /* rehashing not in progress if rehashidx == -1 */

    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};

redisDb里面有多个字典空间,这里我们主要讲解watched key的dict。字典空间是一个哈希表,数组 链表的形式,扩容时采用渐进式哈希将代价分摊到每个请求上,对用户请求延迟没有太大影响,但却能够分割停顿时间,是一个比较好的思想,Golang的map也是基于这种思想实现的。

watched_keys dict保存的是key-clients的映射关系,clients就是监视这个key的所有客户端。

2 watch

代码语言:c复制
/* In the client->watched_keys list we need to use watchedKey structures
 * as in order to identify a key in Redis we need both the key name and the
 * DB */
typedef struct watchedKey {    robj *key;
    redisDb *db;
} watchedKey;
/**
 * watch命令
 */
void watchCommand(redisClient *c) {
    int j;
	//multi和watch只能选择一个
    if(c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for(j =1; j < c->argc; j  )
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
 }

/**
 * 监控指定的键
 */
void watchForKey(redisClient *c, robj *key) {    
    list *clients =NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 检查指定的键是否已经被监控
    listRewind(c->watched_keys,&li);
    while((ln =listNext(&li))) {
        wk =listNodeValue(ln);
        if(wk->db == c->db &&equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    // 指定的键没有被监控
    clients =dictFetchValue(c->db->watched_keys,key);
    if(!clients) { 
        clients =listCreate();
        // 将key加入指定dict
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk =zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    // 将key加入指定的list
    listAddNodeTail(c->watched_keys,wk);
}

当客户端监视key时就会将这个客户端加入到watched_keys中key的value链表中。

3 修改watched_key

代码语言:c复制
void signalModifiedKey(redisDb *db, robj *key){
	touchWatchedKey(db,key);
}
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {    
    list *clients;
    listIter li;
    listNode *ln;

    // 判断该DB是否有监视的Key
    if(dictSize(db->watched_keys) ==0) return;
    clients =dictFetchValue(db->watched_keys, key);
    if(!clients) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);

    // 迭代所有的有该key的client,并为它们的flags标志位增加REDIS_DIRTY_CAS
    while((ln =listNext(&li))) {
        redisClient *c =listNodeValue(ln);

        c->flags |= REDIS_DIRTY_CAS;
    }
}

修改watched_key后会从watched_keys这个字典空间找到所有监视这个key的client,修改client的flag标志为脏位,client真正执行时都会判断flag是否为脏,如果是就直接return了。

4 unwatch

代码语言:c复制
/**
 * unwatch命令
 */
void unwatchCommand(redisClient *c) {    
    unwatchAllKeys(c);
    //复位,重置脏位
    c->flags &=(~REDIS_DIRTY_CAS);
    addReply(c,shared.ok);
}
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
void unwatchAllKeys(redisClient *c) {    
    listIter li;
    listNode *ln;

    if(listLength(c->watched_keys) ==0) return;
    listRewind(c->watched_keys,&li);
    while((ln =listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk =listNodeValue(ln);
        clients =dictFetchValue(wk->db->watched_keys, wk->key);
        redisAssertWithInfo(c,NULL,clients !=NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if(listLength(clients) ==0)
            // 函数dict中的key
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        // 删除list中的key
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

touchWatchedKey修改key后会对watched_key的所有client设置脏位。所有set、delete等函数都会调用signalModifiedKey。

三、事务实现

主要讲解multi、exec、discard三个命令的实现。

image.pngimage.png

1 multi

代码语言:c复制
/**
 * multi命令对应的源码
 */void multiCommand(redisClient *c) {    // 判断是否嵌套执行multi
    if(c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }

    // 开启multi标志位
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
 }

2 入队

redis事务的命令是先入队,等exec到了后一起执行的。

代码语言:c复制
//将命令入队。
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c, uint64_t cmd_flags) {
    multiCmd *mc;

    /* No sense to waste memory if the transaction is already aborted.
     * this is useful in case client sends these in a pipeline, or doesn't
     * bother to read previous responses and didn't notice the multi was already
     * aborted. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
    if (c->mstate.count == 0) {
        /* If a client is using multi/exec, assuming it is used to execute at least
         * two commands. Hence, creating by default size of 2. */
        c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
        c->mstate.alloc_count = 2;
    }
    if (c->mstate.count == c->mstate.alloc_count) {
        c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
        c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
    }
    mc = c->mstate.commands c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;

    c->mstate.count  ;
    c->mstate.cmd_flags |= cmd_flags;
    c->mstate.cmd_inv_flags |= ~cmd_flags;
    c->mstate.argv_len_sums  = c->argv_len_sum   sizeof(robj*)*c->argc;

    /* Reset the client's args since we copied them into the mstate and shouldn't
     * reference them from c anymore. */
    c->argv = NULL;
    c->argc = 0;
    c->argv_len_sum = 0;
    c->argv_len = 0;
}

3 exec

代码语言:c复制
/**
 * exec命令对应的源码
 */ 
void execCommand(redisClient *c) {
    ……
    
    // 判断是否开启事务
    if(!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    
    ……
    
    // 判断是否有对应的watch,且watch的keys是否被修改
    if(c->flags &(REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        
        // 取消事务
        discardTransaction(c);
        goto handle_monitor;
    }

    // 执行事务前,将所有的wach的keys都unwatch掉
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for(j =0; j < c->mstate.count; j  ) {

        ……
        
        // 执行事务
        call(c,REDIS_CALL_FULL);

        ……
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    
    ……
}

事务期间是不能够再watch的,一般是multi之前watch key来监视key的变化,然后exec时再判断这些watched keys来决定是否执行。在事务exec/discard后,这个client所有的watched key都会失效。

4 discard

代码语言:c复制
//重置multi队列,释放所有监视的键
void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

四、小结

本节介绍redis事务,并简单讨论了下innodb的事务,对于事务的实现都是采用乐观锁/或者悲观锁来实现,乐观锁实现居多,大多是基于mvcc实现。

0 人点赞