一、事务基础
1 redis事务介绍
事务能够将多个操作作为一个整体来执行,具备ACID四大特性。
- 原子性:redis主线程对字典空间进行操作,天生是原子的,不需要同步机制。
- 持久性:redis作为缓存是允许丢失数据的,我觉得不应该对持久性有过多的要求。另外redis也有rdb、aof来持久化数据。
- 一致性:redis并没有undo log,理论上事务执行一半就下线后是无法回滚的,需要通过redis-check-aof工具来检测,移除掉失败的事务命令。
- 隔离性:redis单线程处理client命令,算是可串行化读这个级别,事务不并发就不会破坏隔离性。
2 innodb事务介绍
事务并发破坏了事务的隔离性,根据破坏的程度分为四大隔离级别,每种级别对于写都是需要加写锁并在事务提交后释放,区别在于读的可见性不一样。
- 读未提交:对读没有约束,可以读到未提交的数据。
- 读已提交:每次读生成一个最新的read view,能够读到本事务执行期间提交的事务,与最开始读到的不一样,会有不可重复读现象。
- 可重复读:第一次读生成一个read view,后面的读以这个为主,并通过间隙锁来锁区间,阻止插入新的记录,解决幻读现象。 所有的写都是当前读,而读的可见性未必是当前最新的,也有可能是旧版本,需要考虑更新丢失/覆盖的现象。
二、监视器
redis事务是根据multi、exec、discard三个命令实现的,另外还需要关注watch、unwatch这个监视器,基于乐观锁的思想。redis事务期间是不能使用监视器的,待会儿源码中就能够看到。
1 redisDb
代码语言:c复制typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
struct dict {
dictType *type; //操作集
dictEntry **ht_table[2]; //数组 链表 渐进式哈希
unsigned long ht_used[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};
redisDb里面有多个字典空间,这里我们主要讲解watched key的dict。字典空间是一个哈希表,数组 链表的形式,扩容时采用渐进式哈希将代价分摊到每个请求上,对用户请求延迟没有太大影响,但却能够分割停顿时间,是一个比较好的思想,Golang的map也是基于这种思想实现的。
watched_keys dict保存的是key-clients的映射关系,clients就是监视这个key的所有客户端。
2 watch
代码语言:c复制/* In the client->watched_keys list we need to use watchedKey structures
* as in order to identify a key in Redis we need both the key name and the
* DB */
typedef struct watchedKey { robj *key;
redisDb *db;
} watchedKey;
/**
* watch命令
*/
void watchCommand(redisClient *c) {
int j;
//multi和watch只能选择一个
if(c->flags & REDIS_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
for(j =1; j < c->argc; j )
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
/**
* 监控指定的键
*/
void watchForKey(redisClient *c, robj *key) {
list *clients =NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* Check if we are already watching for this key */
// 检查指定的键是否已经被监控
listRewind(c->watched_keys,&li);
while((ln =listNext(&li))) {
wk =listNodeValue(ln);
if(wk->db == c->db &&equalStringObjects(key,wk->key))
return; /* Key already watched */
}
/* This key is not already watched in this DB. Let's add it */
// 指定的键没有被监控
clients =dictFetchValue(c->db->watched_keys,key);
if(!clients) {
clients =listCreate();
// 将key加入指定dict
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
/* Add the new key to the list of keys watched by this client */
wk =zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
// 将key加入指定的list
listAddNodeTail(c->watched_keys,wk);
}
当客户端监视key时就会将这个客户端加入到watched_keys中key的value链表中。
3 修改watched_key
代码语言:c复制void signalModifiedKey(redisDb *db, robj *key){
touchWatchedKey(db,key);
}
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
// 判断该DB是否有监视的Key
if(dictSize(db->watched_keys) ==0) return;
clients =dictFetchValue(db->watched_keys, key);
if(!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
// 迭代所有的有该key的client,并为它们的flags标志位增加REDIS_DIRTY_CAS
while((ln =listNext(&li))) {
redisClient *c =listNodeValue(ln);
c->flags |= REDIS_DIRTY_CAS;
}
}
修改watched_key后会从watched_keys这个字典空间找到所有监视这个key的client,修改client的flag标志为脏位,client真正执行时都会判断flag是否为脏,如果是就直接return了。
4 unwatch
代码语言:c复制/**
* unwatch命令
*/
void unwatchCommand(redisClient *c) {
unwatchAllKeys(c);
//复位,重置脏位
c->flags &=(~REDIS_DIRTY_CAS);
addReply(c,shared.ok);
}
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
* flag is up to the caller. */
void unwatchAllKeys(redisClient *c) {
listIter li;
listNode *ln;
if(listLength(c->watched_keys) ==0) return;
listRewind(c->watched_keys,&li);
while((ln =listNext(&li))) {
list *clients;
watchedKey *wk;
/* Lookup the watched key -> clients list and remove the client
* from the list */
wk =listNodeValue(ln);
clients =dictFetchValue(wk->db->watched_keys, wk->key);
redisAssertWithInfo(c,NULL,clients !=NULL);
listDelNode(clients,listSearchKey(clients,c));
/* Kill the entry at all if this was the only client */
if(listLength(clients) ==0)
// 函数dict中的key
dictDelete(wk->db->watched_keys, wk->key);
/* Remove this watched key from the client->watched list */
// 删除list中的key
listDelNode(c->watched_keys,ln);
decrRefCount(wk->key);
zfree(wk);
}
}
touchWatchedKey修改key后会对watched_key的所有client设置脏位。所有set、delete等函数都会调用signalModifiedKey。
三、事务实现
主要讲解multi、exec、discard三个命令的实现。
1 multi
代码语言:c复制/**
* multi命令对应的源码
*/void multiCommand(redisClient *c) { // 判断是否嵌套执行multi
if(c->flags & REDIS_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 开启multi标志位
c->flags |= REDIS_MULTI;
addReply(c,shared.ok);
}
2 入队
redis事务的命令是先入队,等exec到了后一起执行的。
代码语言:c复制//将命令入队。
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c, uint64_t cmd_flags) {
multiCmd *mc;
/* No sense to waste memory if the transaction is already aborted.
* this is useful in case client sends these in a pipeline, or doesn't
* bother to read previous responses and didn't notice the multi was already
* aborted. */
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
return;
if (c->mstate.count == 0) {
/* If a client is using multi/exec, assuming it is used to execute at least
* two commands. Hence, creating by default size of 2. */
c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
c->mstate.alloc_count = 2;
}
if (c->mstate.count == c->mstate.alloc_count) {
c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
}
mc = c->mstate.commands c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = c->argv;
mc->argv_len = c->argv_len;
c->mstate.count ;
c->mstate.cmd_flags |= cmd_flags;
c->mstate.cmd_inv_flags |= ~cmd_flags;
c->mstate.argv_len_sums = c->argv_len_sum sizeof(robj*)*c->argc;
/* Reset the client's args since we copied them into the mstate and shouldn't
* reference them from c anymore. */
c->argv = NULL;
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;
}
3 exec
代码语言:c复制/**
* exec命令对应的源码
*/
void execCommand(redisClient *c) {
……
// 判断是否开启事务
if(!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
……
// 判断是否有对应的watch,且watch的keys是否被修改
if(c->flags &(REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
// 取消事务
discardTransaction(c);
goto handle_monitor;
}
// 执行事务前,将所有的wach的keys都unwatch掉
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count);
for(j =0; j < c->mstate.count; j ) {
……
// 执行事务
call(c,REDIS_CALL_FULL);
……
}
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
……
}
事务期间是不能够再watch的,一般是multi之前watch key来监视key的变化,然后exec时再判断这些watched keys来决定是否执行。在事务exec/discard后,这个client所有的watched key都会失效。
4 discard
代码语言:c复制//重置multi队列,释放所有监视的键
void discardTransaction(client *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
unwatchAllKeys(c);
}
四、小结
本节介绍redis事务,并简单讨论了下innodb的事务,对于事务的实现都是采用乐观锁/或者悲观锁来实现,乐观锁实现居多,大多是基于mvcc实现。