redis源码分析之事务Transaction(上)

2022-10-22,,,,

这周学习了一下redis事务功能的实现原理,本来是想用一篇文章进行总结的,写完以后发现这块内容比较多,而且多个命令之间又互相依赖,放在一篇文章里一方面篇幅会比较大,另一方面文章组织结构会比较乱,不容易阅读。因此把事务这个模块整理成上下两篇文章进行总结。

原文地址:http://www.jianshu.com/p/acb97d620ad7

这篇文章我们重点分析一下redis事务命令中的两个辅助命令:watch跟unwatch。

一、redis事务辅助命令简介

依然从server.c文件的命令表中找到相应的命令以及它们对应的处理函数。

//watch,unwatch两个命令我们把它们叫做redis事务辅助命令
{"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
    watch,用于客户端关注某个key,当这个key的值被修改时,整个事务就会执行失败(注:该命令需要在事务开启前使用)。
    unwatch,用于客户端取消已经watch的key。

用法举例如下:

clientA

127.0.0.1:6379> watch a
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set b b
QUEUED
//在执行前插入clientB的操作如下,事务就会执行失败
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379>

clientB

127.0.0.1:6379> set a aa
OK
127.0.0.1:6379>

二、redis事务辅助命令源码分析

在看具体执行函数之前首先了解几个数据结构:

//每个客户端对象中有一个watched_keys链表来保存已经watch的key
typedef struct client {
list *watched_keys;
}
//上述链表中每个节点的数据结构
typedef struct watchedKey {
//watch的key
robj *key;
//指向的DB,后面细说
redisDb *db;
} watchedKey;

关于事务的几个命令所对应的函数都放在了multi.c文件中。

一起看下watch命令对应处理函数的源码:

void watchCommand(client *c) {
int j;
//如果客户端处于事务状态,则返回错误信息
//由此可以看出,watch必须在事务开启前使用
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
//依次watch客户端的各个参数(这里说明watch命令可以一次watch多个key)
//注:0表示命令本身,所以参数从1开始
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
//返回结果
addReply(c,shared.ok);
} //具体的watch操作,代码较长,慢慢分析
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
//上面已经提到了数据结构
watchedKey *wk; //首先判断key是否已经被客户端watch
//listRewind这个函数在发布订阅那篇文章里也有,就是把客户端的watched_keys赋值给li
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
//这里一个wk节点中有db,key两个字段
if (wk->db == c->db && equalStringObjects(key,wk->key))
return;
}
//开始watch指定key
//整个watch操作保存了两套数据结构,一套是在db->watched_keys中的字典结构,如下:
clients = dictFetchValue(c->db->watched_keys,key);
//如果是key第一次出现,则进行初始化
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
//把当前客户端加到该key的watch链表中
listAddNodeTail(clients,c);
//另一套是在c->watched_keys中的链表结构:如下
wk = zmalloc(sizeof(*wk));
//初始化各个字段
wk->key = key;
wk->db = c->db;
incrRefCount(key);
//加入到链表最后
listAddNodeTail(c->watched_keys,wk);
}

整个watch的数据结构比较复杂,我这里画了一张图方便理解:

简单解释一下上面的图,首先redis把每个客户端连接包装成了一个client对象,上图中db,watch_keys就是其中的两个字段(client对象里面还有很多其他字段,包括上篇文章中提到的pub/sub)。

    db字段指向给该client对象分配的储存空间,db对象中也含有一个watched_keys字段,是字典类型(也就是哈希表),以想要watch的key做key,存储的链表则是所有watch该key的客户端。
    watch_keys字段则是一个链表类型,每个节点类型为watch_key,其中包含两个字段,key表示watch的key,db则指向了当前client对象的db字段,如上图。

看完watch命令的源码以后,再来看一下unwatch命令,如果搞明白了上面提到的两套数据结构,那么看unwatch的源码应该会比较容易,毕竟就是删除数据结构中对应的内容。

void unwatchCommand(client *c) {
//取消watch所有key
unwatchAllKeys(c);
//修改客户端状态
c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok);
} //取消watch的key
void unwatchAllKeys(client *c) {
listIter li;
listNode *ln;
//如果客户端没有watch任何key,则直接返回
if (listLength(c->watched_keys) == 0) return;
//注意这里操作的是链表字段
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
list *clients;
watchedKey *wk;
//遍历取出该客户端watch的key
wk = listNodeValue(ln);
//取出所有watch了该key的客户端,这里则是字典(即哈希表)
clients = dictFetchValue(wk->db->watched_keys, wk->key);
//空指针判断
serverAssertWithInfo(c,NULL,clients != NULL);
//从watch列表中删除该客户端
listDelNode(clients,listSearchKey(clients,c));
//如果key只有一个当前客户端watch,则删除
if (listLength(clients) == 0)
dictDelete(wk->db->watched_keys, wk->key);
//从当前client的watch列表中删除该key
listDelNode(c->watched_keys,ln);
//减少引用数
decrRefCount(wk->key);
//释放内存
zfree(wk);
}
}

最后我们考虑一下watch机制的触发时机,现在我们已经把想要watch的key加入到了watch的数据结构中,可以想到触发watch的时机应该是修改key的内容时,通知到所有watch了该key的客户端。

感兴趣的用户可以任意选一个修改命令跟踪一下源码,例如set命令,我们发现所有对key进行修改的命令最后都会调用touchWatchedKey()函数,而该函数源码就位于multi.c文件中,该函数就是触发watch机制的关键函数,源码如下:

//这里入参db就是客户端对象中的db,上文已经提到,不赘述
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
//保存watchkey的字典为空,则返回
if (dictSize(db->watched_keys) == 0) return;
//注意这里操作的是字典(即哈希表)数据结构
clients = dictFetchValue(db->watched_keys, key);
//如果没有客户端watch该key,则返回
if (!clients) return;
//把client赋值给li
listRewind(clients,&li);
//遍历watch了该key的客户端,修改他们的状态
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}

跟我们猜测的一样,就是每当key的内容被修改时,则遍历所有watch了该key的客户端,设置相应的状态为CLIENT_DIRTY_CAS。

三、redis事务辅助命令总结

上面就是redis事务命令中watch,unwatch的实现原理,其中最复杂的应该就是watch对应的那两套数据结构了,跟之前的pub/sub类似,都是使用链表+哈希表的结构存储,另外也是通过修改客户端的状态位FLAG来通知客户端。

代码比较多,而且C++代码看上去会比较费劲,需要慢慢读,反复读。

redis源码分析之事务Transaction(上)的相关教程结束。

《redis源码分析之事务Transaction(上).doc》

下载本文的Word格式文档,以方便收藏与打印。