【Redis源码剖析】 - Redis之事务的实现原理

原创作品,转载请标明:http://blog.csdn.net/Xiejingfa/article/details/51262268

今天为大家带来Redis中事务部分的源码分析。Redis的事务机制允许将多个命令当做一个独立的单元运行,主要包括multi、exec、watch、unwatch、discard五个相关命令。如果你还不熟悉这几个命令,可以先看看我的另一篇文章【Redis学习笔记(七)】 Redis中的事务

本文所讲述的内容主要涉及redis.h和multi.c两个源文件,依据惯例,文后会提供注释版的源码。


1、总流程

事务从开始到执行需要经历以下三个阶段:

  1. 声明事务 (multi命令)
  2. 命令入队
  3. 执行事务(exec命令)

对于一个拥有不同状态的对象,我们通常会使用状态机的手段加以管理。Redis也使用了类似的方法来实现对事务中不同状态的管理。

我们先来看看与事务有关的几个状态,这在Redis中又称作flag,定义在redis.h中。

#define REDIS_MULTI (1<<3)
#define REDIS_DIRTY_EXEC (1<<12)
#define REDIS_DIRTY_CAS (1<<5) 

下面具体介绍一下各个flag的含义:

  1. REDIS_MULTI表示客户端处于事务状态。当客户端执行multi命令后便由非事务状态转变为事务状态。在非事务状态下命令是一个接一个按序执行的;而当客户端处于事务状态时,命令则以事务为单位执行,一次性执行事务队列中的所有命令。
  2. REDIS_DIRTY_EXEC表示EXEC无效状态。当客户端进入事务状态后,Redis等待接收一个或多个命令,并把它们放入命令队列中等待执行。如果某条命令在入队过程中发生错误则进入该状态,此时Redis将客户端的flags标识字段置为REDIS_DIRTY_EXEC,随后的EXEC命令将会失败返回。
  3. REDIS_DIRTY_CAS表示非安全状态,该状态是针对watch命令设置的,客户端可以在声明事务前使用watch命令对一个或多个key进行监视,如果在事务执行之前这些被监视的key被其他命令修改,则进入REDIS_DIRTY_CAS状态。因为此时将要执行事务所相关的key被修改,无法保证事务的原子性。REDIS_DIRTY_CAS状态下如果执行exec命令也会失败返回,即相当于该事务被取消。

这里说明一下,上面的各个状态是我根据自己的理解定义的,便于理解事务执行流程,但可能有不规范之处。

所以Redis的事务整个流程大致是这样的:

  1. 客户端redisClient中有一个名叫flags的成员,标识当前客户端的状态。
  2. 在声明事务之前,我们可以通过watch命令对一个或多个key进行监视。如果在事务执行之前这些被监视的key被其他命令修改,Redis将redisClient->flags设置为REDIS_DIRTY_CAS标识。
  3. 使用multi命令可以标识着一个事务的开始,此时redisClient进入事务状态,其flags字段被设置为REDIS_MULTI标识。
  4. 当客户端进入事务状态后,Redis服务器等待接收一个或多个命令,并把它们放入命令队列中等待执行。如果某条命令在入队过程中发生错误,Redis会将redisClient的flags字段置为REDIS_DIRTY_EXEC标识。
  5. 最后我们通过exec命令执行事务,该命令将会检查redisClient的flags标识,如果该标识为REDIS_DIRTY_CAS或REDIS_DIRTY_EXEC,则事务执行失败,否则Redis一次性执行事务中的多个命令,并将所有命令的结果集合到回复队列,再作为 exec 命令的结果返回给客户端。

2、watch命令实现

与watch命令有关的关键数据结构主要有两个:

首先,每个redisDb数据库使用一个哈希表来维护key和所有监控该key的客户端列表的映射关系。这样当一个key被修改后,我们就可以对所有监控该key的客户端设置dirty标识。

redisDb结构定义在redis.h头文件中,这里省略其它无关代码。

/* Redis数据库结构体 */
typedef struct redisDb {
    ...
    // 被watch命令监控的键和相应client
    dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
    int id;
    ...
} redisDb;

watched_keys为字典(哈希表)结构,键为被监视的key,值为所有监控该key的客户端列表(即list数据结构)。正如下所示:

另外,每个客户端redisClient也维护着一个保存所有被监控的key的列表,这样就可以方便地对key取消监视。这个列表就是redisClient中的watched_keys成员,该成员是一个双向链表list结构。

在redisClient->watched_keys中使用watchedKey结构来标识一个Redis中的key,在watchedKey中不仅需要保存被监视的key,还需要记录该key所在的数据库。其定义如下:

typedef struct watchedKey {
    // 被监控的key
    robj *key;
    // key所在的数据库
    redisDb *db;
} watchedKey;

redisDb-> watched_keys和 redisClient->watched_keys两者的结合就实现了watch/unwatch命令所需要的功能:通过redisDb-> watched_keys 哈希表, 如果某个程序需要检查某个键是否被监视, 那么它只要检查字典中是否存在这个键即可; 通过redisClient->watched_keys,如果某个程序要获得该客户端监视的所有key,那么它只要获得该链表即可。

接下来,我们看看watch命令的具体实现,该功能由watchForKey函数完成。

void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 检查该key是否已经保存在client->watched_keys列表中

    // listRewind获取list的迭代器
    listRewind(c->watched_keys,&li);
    // 遍历查找,如果发现给定key已经存在直接返回
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }

    /* This key is not already watched in this DB. Let‘s add it */
    // 检查redisDB->watched_keys是否保存了该key和客户端的映射关系,如果没有则添加之
    // 获取监控给定key的客户端列表
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果该列表为空,则创建一个
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 并加入当前客户端
    listAddNodeTail(clients,c);

    /* Add the new key to the list of keys watched by this client */
    // 将一个新的watchedKey结构添加到client->watched_keys列表中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

unwatch命令执行相反的操作,由unwatchAllKeys函数实现。

void unwatchAllKeys(redisClient *c) {
    listIter li;
    listNode *ln;

    // 如果没有key被监控,直接返回
    if (listLength(c->watched_keys) == 0) return;
    // 获得c->watched_keys列表的迭代器
    listRewind(c->watched_keys,&li);
    // 遍历c->watched_keys列表,逐一删除被该客户端监视的key
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        // 将当前客户端从db->watched_keys中删除
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        redisAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));

        /* Kill the entry at all if this was the only client */
        // 如果没有任何客户端监控该key,则将该key从db->watched_keys中删除
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);

        /* Remove this watched key from the client->watched list */
        // 将c->watched_keys删除该key
        listDelNode(c->watched_keys,ln);

        // 释放资源
        decrRefCount(wk->key);
        zfree(wk);
    }
}

对于上面这两段代码,我已经详细注释过了,这里就不展开讲解。接下来我们看看multi/exec命令实现原理。

3、multi/exec命令实现

我们从 “声明事务 ” => “命令入队” => “执行事务”这三个阶段来分别介绍其原理。

3.1、声明事务

声明事务通过multi命令实现,从下面源码中我们可以看到:

  1. Redis不支持嵌套事务。
  2. 声明事务其实就是简单地将flags设置为REDIS_MULTI标识。随后redisClient进入事务状态,等待命令入队。
/* 执行MULTI命令 */
void multiCommand(redisClient *c) {
    // 不支持嵌套事务,否则直接报错
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    // 设置事务标识
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
}

3.2、命令入队

这里我们先来介绍一下与命令队列相关的数据结构。

在redisClient中存在multiState mstate字段用来保存一个事务中的所有命令和其它相关信息,multiState结构定义在redis.h头文件中。

/* 事务状态结构体 */
typedef struct multiState {
    // 命令数组,保存着该事务中的所有命令并按输入顺序排列
    multiCmd *commands;     /* Array of MULTI commands */
    // 命令数组长度,即命令的数量
    int count;              /* Total number of MULTI commands */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

multiState结构中的multiCmd *commands正是真正存放命令的命令队列,其实质是一个数组。multiCmd表示一条完整的输入命令,包含“要执行的命令”、“命令参数”、“参数个数”三个属性。定义如下:

/* 事务命令结构体 */
typedef struct multiCmd {
    // 命令参数
    robj **argv;
    // 参数个数
    int argc;
    // 要执行的命令
    struct redisCommand *cmd;
} multiCmd;

命令入队由queueMultiCommand函数实现。

/* 将一个新命令添加到multi命令队列中 */
void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;

    // 在原commands后面配置空间以存放新命令
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 执行新配置的空间
    mc = c->mstate.commands+c->mstate.count;
    // 设置各个属性(命令、命令参数个数以及具体的命令参数)
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    // 分配空间以存放命令参数
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    // 命令队列中保存的命令个数加1
    c->mstate.count++;
}

3.3、执行事务

Redis通过exec命令执行事务,该命令将会检查redisClient的flags标识,如果该标识为REDIS_DIRTY_CAS或REDIS_DIRTY_EXEC,则事务执行失败返回。如果客户端仍然处于事务状态, 那么当 exec 命令执行时,Redis会根据客户端所保存的事务队列, 以“先近先出”的策略执行事务队列中的命令,即最先入队的命令最先执行, 而最后入队的命令最后执行。

exec命令由execCommand执行。

/* 执行exec命令 */
void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    // 是否需要将MULTI/EXEC命令传播到slave节点/AOF
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */

    // 如果客户端当前不处于事务状态,直接返回
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    // 检查是否需要中断事务执行,因为:
    // (1)、有被监控的key被修改
    // (2)、命令入队的时候发生错误
    //  对于第一种情况,Redis返回多个nil空对象(准确地说这种情况并不是错误,应视为一种特殊的行为)
    //  对于第二种情况则返回一个EXECABORT错误
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        // 取消事务
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Exec all the queued commands */
    // 现在可以执行该事务的所有命令了

    // 取消对所有key的监控,否则会浪费CPU资源
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we‘ll waste CPU cycles */
    // 先备份一次命令队列中的命令
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    // 逐一将事务中的命令交给客户端redisClient执行
    for (j = 0; j < c->mstate.count; j++) {
        // 将事务命令队列中的命令设置给客户端
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first write op.
         * This way we‘ll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        //  当我们第一次遇到写命令时,传播MULTI命令。如果是读命令则无需传播
        //  这里我们MULTI/..../EXEC当做一个整体传输,保证服务器和AOF以及附属节点的一致性
        if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
            execCommandPropagateMulti(c);
            // 只需要传播一次MULTI命令即可
            must_propagate = 1;
        }

        // 真正执行命令
        call(c,REDIS_CALL_FULL);

        /* Commands may alter argc/argv, restore mstate. */
        // 命令执行后可能会被修改,需要更新操作
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    // 恢复原命令
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 清除事务状态
    discardTransaction(c);
    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) server.dirty++;

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}


Redis事务的实现原理就介绍这么多。很多人一听到“事务”这个词就会潜意识的认为这是一个很复杂的东西。而实际上Redis中使用很轻巧的办法提供事务操作,代码只有300来行,并不是很复杂。

注释版源码请移步:https://github.com/xiejingfa/the-annotated-redis-2.8.24

时间: 2024-11-08 23:43:20

【Redis源码剖析】 - Redis之事务的实现原理的相关文章

【Redis源码剖析】 - Redis之数据库redisDb

原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/51321282 今天,我们来讨论两点内容:一是Redis是如何存储类型对象的,二是Redis如何实现键的过期操作. 本文介绍的内容主要涉及db.c和redis.h两个文件. 1.redisDb介绍 Redis中存在"数据库"的概念,该结构由redis.h中的redisDb定义.我们知道Redis提供string.list.set.zset.hash五种数据类型的存储,在

【Redis源码剖析】 - Redis内置数据结构值压缩字典zipmap

原创作品,转载请标明:http://blog.csdn.net/Xiejingfa/article/details/51111230 今天为大家带来Redis中zipmap数据结构的分析,该结构定义在zipmap.h和zipmap.c文件中.我把zipmap称作"压缩字典"(不知道这样称呼正不正确)是因为zipmap利用字符串实现了一个简单的hash_table结构,又通过固定的字节表示节省空间.zipmap和前面介绍的ziplist结构十分类似,我们可以对比地进行学习: Redis中

Redis源码剖析(八)--对象系统

对象的类型与编码 在 Redis 中新创建一个键值对时, 我们至少会创建两个对象, 一个对象用作键值对的键(键对象), 另一个对象用作键值对的值(值对象).Redis 中的每个对象都由一个 redisObject 结构表示: typedef struct redisObject { // 类型 unsigned type:4; // 编码 unsigned encoding:4; // 对象最后一次被访问的时间 unsigned lru:REDIS_LRU_BITS; /* lru time (

Redis源码剖析和注释(十八)--- Redis AOF持久化机制

Redis AOF持久化机制 1. AOF持久化介绍 Redis中支持RDB和AOF这两种持久化机制,目的都是避免因进程退出,造成的数据丢失问题. RDB持久化:把当前进程数据生成时间点快照(point-in-time snapshot)保存到硬盘的过程,避免数据意外丢失. AOF持久化:以独立日志的方式记录每次写命令,重启时在重新执行AOF文件中的命令达到恢复数据的目的. Redis RDB持久化机制源码剖析和注释 AOF的使用:在redis.conf配置文件中,将appendonly设置为y

Redis源码剖析和注释(八)--- 对象系统(redisObject)

Redis 对象系统 1. 介绍 redis中基于双端链表.简单动态字符串(sds).字典.跳跃表.整数集合.压缩列表.快速列表等等数据结构实现了一个对象系统,并且实现了5种不同的对象,每种对象都使用了至少一种前面的数据结构,优化对象在不同场合下的使用效率. 双端链表源码剖析和注释 简单动态字符串(SDS)源码剖析和注释 字典结构源码剖析和注释 跳跃表源码剖析和注释 整数集合源码剖析和注释 压缩列表源码剖析和注释 快速列表源码剖析和注释 2. 对象的系统的实现 redis 3.2版本.所有注释在

spring源码剖析(六)AOP实现原理剖析

Spring的AOP实现原理,酝酿了一些日子,写博客之前信心不是很足,所以重新阅读了一边AOP的实现核心代码,而且又从网上找了一些Spring Aop剖析的例子,但是发现挂羊头买狗肉的太多,标题高大上,内容却大部分都是比较浅显的一些介绍,可能也是由于比较少人阅读这部分的核心代码逻辑把,然后写这部分介绍的人估计也是少之又少,不过说实话,Spring Aop的核心原理实现介绍确实不太好写,里面涉及的类之间的调用还是蛮多的,关系图画的太细的画也很难画,而且最重要的一点就是,如果对AOP的概念以及spr

【Redis源码剖析】 - Redis数据类型之列表List

原创作品,转载请标明:http://blog.csdn.net/Xiejingfa/article/details/51166709 今天为大家带来Redis五大数据类型之一 – List的源码分析. Redis中的List类型是一种双向链表结构,主要支持以下几种命令: lpush.rpush.lpushx.rpushx lpop.rpop.lrange.ltrim.lrem.rpoplpush linsert.llen.lindex.lset blpop.brpop.brpoplpush Li

【Redis源码剖析】 - Redis数据类型之有序集合zset

原创作品,转载请标明:http://blog.csdn.net/Xiejingfa/article/details/51231967 这周事情比较多,原本计划每周写两篇文章的任务看来是完不成了.今天为大家带来有序集合zset的源码分析. Redis中的zset主要支持以下命令: zadd.zincrby zrem.zremrangebyrank.zremrangebyscore.zremrangebyrank zrange.zrevrange.zrangebyscore.zrevrangebys

【Redis源码剖析】 - Redis持久化之RDB

原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/51553370 Redis是一个高效的内存数据库,所有的数据都存放在内存中.我们知道,内存中的信息会随着进程的退出或机器的宕机而消失.为此,Redis提供了两种持久化机制:RDB和AOF.这两种持久化方式的原理实际上就是把内存中所有数据的快照保存到磁盘文件上,以避免数据丢失. 今天我们主要来介绍一下RDB持久化机制RDB的实现原理,涉及的文件为rdb.h和rdb.c. RDB的主