redis 的事务相关的几个命令分别为 watch multi exec.
watch 可以监控一个变量在事务开始执行之前是否有被修改.使用方式为: WATCH key [key ...]
在redis内部的实现是每个db有一个名为watched_keys的dict,这个dict的key为监控的key,value为所有监控该key的client组成的链表.
所有的涉及到修改数据库的命令执行成功之后都会执行signalModifiedKey->touchWatchedKey,将修改的db中的watched_keys里面记录的所有监控了本次修改的key的client设置c->flags |= CLIENT_DIRTY_CAS;
multi 表示开始一次事务,使用方式为: MULTI
实现为只是对client标记上 c->flags |= CLIENT_MULTI;它会先检查一下是否已经标记了 CLIENT_MULTI,如果标记了本次multi命令会失败,因为redis不允许事务嵌套.
在multi开始之后所有的命令(除了multi,exec,discard,watch)都会进入multicommand queue.
我们知道一条命令进入server的流程为,
第一步,底层的io多路复用处理器会上报client可读事件,调用注册的处理可读事件的函数readQueryFromClient将命令读入client的querybuf,
第二步,进入到processInputBuffer处理读入的buffer,分解出argv,argc;
第三步,再进入processCommand,先根据argv[1]在命令表的dict中找到对应的command,找到command后会检查本次的command是否合法(名字是否合法,参数个数是否合法),如果command不合法在返回错误的同时会检查是否处于事务状态中,如果命令
不合法CLIENT_DIRTY_EXEC标志将导致事务执行失败.
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(client *c) { if (c->flags & CLIENT_MULTI) c->flags |= CLIENT_DIRTY_EXEC; }
然后(略过一些与事务无关的步骤),检查c->flags & CLIENT_MULTI,来决定是否直接执行命令还是将命令进入queueMultiCommand
第四步,在queueMultiCommand中,将本次命令command,argc,argv加入到c->mstate.commands队列中.
/* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd; } multiCmd; typedef struct multiState { multiCmd *commands; /* Array of MULTI commands */ int count; /* Total number of MULTI commands */ int minreplicas; /* MINREPLICAS for synchronous replication */ time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ } multiState; typedef struct client { ... multiState mstate; /* MULTI/EXEC state */ ... } client;
exec 命令开始执行事务队列中的全部命令,使用方式为:EXEC
exec会检查当前是否处于事务状态
if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; }
然后检查该条命令来自的client监控的key是否有被touch
/* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); discardTransaction(c); goto handle_monitor; }
最后是遍历c->mstate.commands执行所有事务命令队列中的命令
/* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we‘ll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; /* Propagate a MULTI request once we encounter the first command which * is not readonly nor an administrative one. * This way we‘ll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { execCommandPropagateMulti(c); must_propagate = 1; } call(c,CMD_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c);