redis源码分析(5)——aof

前面几篇基本介绍了redis的主要功能、流程,接下来是一些相对独立的部分,首先看一下持久化。redis持久化支持两种方式:RDB和AOF,我们首先看一下AOF的实现。

AOF(Append only file)实际上就是redis的redo log,在重启后,对redo log进行replay以便恢复数据。正常情况下,为了保证一致性,对于每条命令都要保证其对应的log落地到磁盘。即每条命令对应的日志,要写到文件cache,然后再fsync落地磁盘,这样才能保证强一致性,只要写日志失败,此条命令便执行失败。但是,redis本身是基于内存的,同时为了速度,在一致性上进行了折衷。AOF的sync策略分为:

(1)always:对于每条命令都执行fsync,速度慢,但是安全,不会丢数据。

(2)every second:每秒钟执行一次sync,足够快,只会丢失1秒的数据。

(3)never:不进行fsync,完全由os实现数据刷到磁盘,最快,但不能保证数据安全。

随着时间的流逝,redis不断的服务请求,AOF会不断膨胀。一般的db的思路是:snapshot加redo log。定时进行快照,redo log记录当前时间点距上次快照的变化。在恢复时,先加载snapshot,然后再对redo log进行replay。而redis采取的方式略微不同,它会对AOF进行rewrite,就是根据当前状态,生成一份新的AOF,保证每个key只会有一份数据,减少不必要的日志。

写AOF的整体流程是:redis会持有一个aof buffer,这个buffer会记录还没有写到文件的aof日志,在每一轮事件循环,执行更新命令时,都会将命令序列化然后追加到aof buffer。然后,在下一轮事件循环前,调用beforeSleep函数时,会将aof buffer写入到文件。根据sync配置的策略,调用fsync或者调度一个后台job执行fsync。之所以在beforeSleep中执行,是因为写AOF要在响应内容发送到客户端之前,在下一次事件循环会执行写事件处理函数发送响应内容。

AOF rewrite的整体流程是:rewrite流程的启动是在serverCron中,会创建一个子进程,遍历所有db,写到AOF中。同时,开启一个aof rewrite buffer,在命令写入aof buffer时,会判断是否开启rewrite,如果开启,则会同时追加到aof rewrite buffer。在子进程完成rewrite后,会将aof rewrite buffer追加到AOF中,完成aof rewrite。

1. AOF加载

AOF加载的流程要简单些,在启动后,读取AOF,然后将每条命令进行replay即可。下面看一下具体代码。

在redis.c的main函数中,完成初始化后,会调用loadDataFromDisk()完成数据的加载。

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

根据当前的持久化方式,分别执行aof或者rdb的数据加载。

下面看一下aof加载数据的函数loadAppendOnlyFile,主要是构建一个fake的client,然后从aof文件中解析并执行一条条命令。

    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;
    off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */

    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can‘t open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we‘re about to read. */
    server.aof_state = REDIS_AOF_OFF;

打开aof文件并检查其大小。

    // <MM>
    // fakeClient对应的文件描述符为-1
    // 响应时,会据此判断是否需要发送响应内容
    // </MM>
    fakeClient = createFakeClient();
    startLoading(fp);

创建fake的client,对应的fd赋值为-1,在响应时,会判断如果fd不为-1,才会添加写事件处理函数。这里设为-1,避免产生响应内容。startLoading会设置状态信息,具体操作包括:

(1)将redisServer.loading置为1,表示当前正处于数据加载阶段。此时有客户端访问时,会根据loading状态返回“数据正在加载...”。

(2)将当前时间赋值给redisServer.loading_start_time,用以统计数据加载时间。

(3)将aof文件大小赋值给redisServer.loading_total_bytes,用以统计加载进度

接下来是一个while循环,不断的读取命令并执行。下面看一下循环内部。

        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            // <MM>
            // 处理部分事件
            // 在启动后,加载aof时,此时监听socket已准备好
            // 调用此函数,可以处理客户端的连接,之后也可以响应客户端的请求
            // </MM>
            processEventsWhileBlocked();
        }

loops记录循环次数,在每执行1000次循环时,会更新一下加载进度。同时,由于加载过程一般比较长,所以此处会调用processEventsWhileBlocked函数,处理文件io事件,避免客户端一直阻塞。这个函数可以完成,客户端连接的建立,同时响应请求(数据正在加载,不完整,所以响应的内容都是返回错误,并提示“数据正在加载...”)。

接下来是读取aof文件并解析出命令。

        // <MM>
        // 读一行,遇到\n
        // </MM>
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            // <MM>
            // 读到eof,加载完毕
            // </MM>
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        // <MM>
        // 处理‘*MULTI_BULK_LEN\r\n‘
        // </MM>
        if (buf[0] != ‘*‘) goto fmterr;
        if (buf[1] == ‘\0‘) goto readerr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);
        fakeClient->argc = argc;
        fakeClient->argv = argv;

读取multi bulk的长度,接下来是一个for循环,一次读取每个bulk。

        // <MM>
        // 依次读取每个bulk
        // </MM>
        for (j = 0; j < argc; j++) {
            // <MM>
            // 处理‘$BULK_LEN\r\n‘
            // </MM>
            if (fgets(buf,sizeof(buf),fp) == NULL) {
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            if (buf[0] != ‘$‘) goto fmterr;
            len = strtol(buf+1,NULL,10);
            // <MM>
            // 分配响应大小的buffer
            // </MM>
            argsds = sdsnewlen(NULL,len);
            // <MM>
            // 二进制读取len大小的buffer
            // </MM>
            if (len && fread(argsds,len,1,fp) == 0) {
                sdsfree(argsds);
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            argv[j] = createObject(REDIS_STRING,argsds);
            // <MM>
            // 跳过\r\n
            // </MM>
            if (fread(buf,2,1,fp) == 0) {
                fakeClient->argc = j+1; /* Free up to j. */
                freeFakeClientArgv(fakeClient);
                goto readerr; /* discard CRLF */
            }
        }

依次读取每个bulk,解析出并赋值给fake client。

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command ‘%s‘ reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        /* Run the command in the context of a fake client */
        // <MM>
        // 执行命令的处理函数
        // </MM>
        cmd->proc(fakeClient);

解析出完整命令后,需要执行该命令,首先根据命令名,查找对应的command结构,最后回调命令处理函数。

        /* The fake client should not have a reply */
        // <MM>
        // fake client对应的socket fd为负数
        // 准备响应的函数prepareClientToWrite会据此作判断,不返回响应内容
        // </MM>
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        freeFakeClientArgv(fakeClient);
        if (server.aof_load_truncated) valid_up_to = ftello(fp);

此处进行校验,因为fake client不可能有响应内容,最后清理fake client,以便下一个命令的执行。valid_up_to记录当前正确解析的日志长度,在数据不完整(提前读到eof)并且设置aof_load_truncated时,会将aof文件截断到valid_up_to字节。

最后是各种处理分支:

loaded_ok: /* DB loaded, cleanup and return REDIS_OK to the caller. */
    fclose(fp);
    freeFakeClient(fakeClient);
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

数据加载正确的情况,会关闭aof文件,释放fake client,更新各种状态等。

readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
    if (!feof(fp)) {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
        exit(1);
    }

命令解析失败,直接退出。

uxeof: /* Unexpected AOF end of file. */
    if (server.aof_load_truncated) {
        redisLog(REDIS_WARNING,"!!! Warning: short read while loading the AOF file !!!");
        redisLog(REDIS_WARNING,"!!! Truncating the AOF at offset %llu !!!",
            (unsigned long long) valid_up_to);
        if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
            if (valid_up_to == -1) {
                redisLog(REDIS_WARNING,"Last valid command offset is invalid");
            } else {
                redisLog(REDIS_WARNING,"Error truncating the AOF file: %s",
                    strerror(errno));
            }
        } else {
            /* Make sure the AOF file descriptor points to the end of the
             * file after the truncate call. */
            if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
                redisLog(REDIS_WARNING,"Can‘t seek the end of the AOF file: %s",
                    strerror(errno));
            } else {
                redisLog(REDIS_WARNING,
                    "AOF loaded anyway because aof-load-truncated is enabled");
                goto loaded_ok;
            }
        }
    }
    redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the ‘aof-load-truncated‘ configuration option to yes and restart the server.");
    exit(1);

读到非预期的eof,即最后一条命令不完整。如果设置了aof_load_truncated,会将aof文件截断到valid_up_to,否则,直接退出。

fmterr: /* Format error. */
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);

最后是命令的格式不正确,直接退出。

2. AOF序列化

AOF要记录每条命令对数据库的更改,所以需要记录每条更新命令。redis会持有一个aof buffer,用于在一轮事件循环中,记录多天命令,然后在调用一次write进行写入,避免一个命令一次write,提高效率。序列化的流程很简单,对命令序列化,然后追加到aof buffer后面。

在介绍请求处理时,我们知道对于每条命令都会调用call函数处理。其中,会调用propagate函数处理主从复制和AOF。

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

在redis开启aof,并且该命令需要记录aof时,会调用feedAppendOnlyFile函数用于生成并写入aof。下面看一下这个函数。

    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    // <MM>
    // 当前操作的db与aof对应的db不同时,需要一个切换db的命令
    // </MM>
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

在全局server结构中得aof_selected_db记录当前aof对应的数据库,如果当前命令操作的数据库与之不同的话,首先需要切换数据库。上述代码就是用于生产select db命令的。

    // <MM>
    // 将命令序列化,并保存到buf
    // </MM>
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don‘t need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

接下来,将命令序列化为aof,具体序列化过程再次不赘述。这里应该可以优化,在读取命令buffer时,保存此buffer,命令参数使用指针指向该buffer,便可以节省次数序列化的开销。

    // <MW>
    // 为什么不提前判断?这会浪费资源
    // </MW>
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == REDIS_AOF_ON)
        // <MM>
        // 将命令buf追加到aof_buf
        // </MM>
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

判断是否开启aof,如果开启则将aof追加到aof buffer。此处,应该可以提前判断,避免关闭aof时的aof的序列化开销。

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    // <MM>
    // 如果开启了aof rewrite进程,将命令也添加到aof rewrite buf中
    // 等rewrite完之后,在将rewrite buf的数据追加到文件中
    // </MM>
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);

aof_child_pid记录aof rewrite进程的pid,如果rewrite正在进行,这个值不为-1。如果当前正在进行aof rewrite,则将命令的aof追加到aof rewrite buffer,待rewrite结束后进行replay。

3. AOF写入

AOF的写入就是将aof buffer写入到aof文件中,write系统调用只能保证写入page cache中,要落地到磁盘还需要调用fsync。所以,涉及到fsync的策略,这个函数会略微复杂一些。在beforeSleep函数中,会调用flushAppendOnlyFile函数进行写入。

    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

之所以,在beforeSleep中,是为了在给客户端发送响应内容前进行,保证返回给客户端的内容都是写过aof的。同时,也保证一轮事件循环,对于多个客户端的请求处理只写一次aof,提升性能(当然,这样做的缺点就是不能保证数据的一致性)。下面看一下flushAppendOnlyFile函数。

    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    // <MM>
    // 没有aof需要write,直接返回
    // </MM>
    if (sdslen(server.aof_buf) == 0) return;

检查aof buffer是否为空,空的话直接返回,没必要进行flush。

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

fsync是阻塞操作,避免影响主线程的事件循环,fsync操作由后台线程完成。如果设置的fsync策略是everysec,获取是否有后台线程正在进行fsync。

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            // <MM>
            // aof_flush_postponed_start记录从什么时候开始延迟flush
            // </MM>
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                // <MM>
                // 尚未flush的aof buf不超过1s,没有违反every_sec策略,此次也不进行flush
                // </MM>
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can‘t wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

在fsync策略是everysec时,这段代码用于控制flush的频率。server.aof_flush_postponed_start记录上次延迟flush的时间戳,如果等于0,说明没有延迟。如果是everysec的fsync策略,并且当前正在进行fsync,这里会设置aof_flush_postponed_start。如果当前时间戳server.unixtime与延迟flush的时间戳间隔小于2s,那么没有违反everysec策略,不进行flush,直接返回。通过这段代码可以保证1秒内,不会flush多次。

如果没有当前没有进行fsync,或者当前时间戳server.unixtime与延迟flush的时间戳间隔大于2s,就会跳过这段代码,进行flush操作。

    latencyStartMonitor(latency);
    // <MM>
    // 将aof写入日志,此处只是写入page cache,还需要fsync
    // </MM>
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

接下来,调用write进行写入,同时会记录各种延迟。write时,是将整个aof_buf进行写入。这里可以看到,如果fsync的策略是everysec,那么write也是每秒钟调用一次。实际上,这存在一个缺陷:即在机器没有掉电的情况下,redis挂了,也会最多丢失1秒的数据。如果不限制每秒调用一次write,而是每轮事件循环都调用write,就可以保证数据已经写入page cache,只要机器没挂,最终数据都会写入磁盘,就不会丢失数据。本身write是写cache,不存在性能瓶颈,所以这里可以改进一下。

    /* We performed the write so reset the postponed flush sentinel to zero. */
    // <MM>
    // 清空,当前没有延迟flush aof
    // </MM>
    server.aof_flush_postponed_start = 0;

重置aof_flush_postponed_start,因为接下来会进行flush。

接下来,是对write调用进行错误检验。会有一个if分支进行

    if (nwritten != (signed)sdslen(server.aof_buf)) {
        // 错误分支
    } else {
        // 正常分支
    }

首先看一下错误分支。

        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        // <MM>
        // 限制记录错误日志的频率
        // </MM>
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

last_write_error_log是static类型,记录上一次记录错误日志的时间戳,这段代码就是用于控制记录日志的频率,避免日志刷屏。

        /* Lof the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {

write返回值是-1,说明调用错误,只记录日志。接下来是处理部分写的情况。

        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            // <MM>
            // aof_current_size记录当前正确写入的aof的长度
            // 当前write只写入部分数据,此处保证完整性,将写入的部分数据删掉
            // </MM>
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftrunacate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

在部分写的情况发生时,会将部分写入的内容截掉,保证aof中的是完整的。server.aof_current_size记录当前正确写入的aof的长度,后面会对这个值进行更新。如果ftruncate成功,会设置nwritten为-1。如果失败的话,后面代码会将aof_current_size加上部分写的数据长度,同时将aof_buf中截取已写入部分。

接下来处理aof write失败。

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can‘t recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING,"Can‘t recover from AOF write error when the AOF fsync policy is ‘always‘. Exiting...");
            // <MM>
            // fsync策略是always,write失败后,不能恢复,直接退出
            // </MM>
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                // <MM>
                // 在ftruncate失败时,走这个分支,会把已写的aof从buffer中清空
                // </MM>
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }

            // <MM>
            // write失败,下次会进行重试
            // </MM>
            return; /* We‘ll try again on the next call... */
        }

如果fsync策略是always,那么write失败,就表示整个操作失败,保证强一致性,此处进程退出。如果是其他策略,会根据nwritten,更新aof_current_size并调整aof_buf。

上面就是write的错误分支,下面看一下正常分支。

        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }

如果fsync策略不是always,在write出错时,会有server.aof_last_write_status记录错误状态。如果后续的write操作正常,此处只是打印日志,表示错误恢复正常。

write调用的错误校验完成,接下来主要是后续的flush策略相关。

    // <MM>
    // write成功,更新aof文件的大小
    // </MM>
    server.aof_current_size += nwritten;

write成功时,更新aof_current_size。

    // <MM>
    // aof buf已成功write,此处序清空buffer
    // </MM>
    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

aof_buf已成功写入文件,可以清空。为避免频繁分配、释放内存,此处保证在buf小于4K时,会一直重用该buf。如果大于4K,就会释放旧的buf,分配新的。

    /* Don‘t fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

如果配置了no-appendfsync-on-rewrite,即在有aof rewrite或者是rdb save的子进程时不进行fsync,主要是避免对磁盘产生过大压力,这里会直接返回,不进行fsync。

    /* Perform the fsync if needed. */
    // <MM>
    // 1) always策略:每次write,都会调用fsync
    // 2) everysec策略:当大于上次fsync的时间(秒数)时,才会调度后台线程执行
    // </MM>
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let‘s try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                // <MM>
                // 通过当前时间戳和上次aof sync的时间比较,
                // 只有比上次sync大时,才会启动后台sync操作
                // </MM>
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }

接下来,就是fsync相关。如果策略是always,直接进行fsync,记录延迟,同时更新aof_last_fsync。如果是everysec策略,并且server.unixtime > server.aof_last_fsync(保证一秒内不进行多次fsync),并且没有后台线程执行fsync,则调度后台线程进行fsync。

上面就是flush的全部流程。这个函数除了在beforeSleep中调用,在定时器事件处理函数serverCron中也会调用。

    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    // <MW>
    // 在有延迟flush aof的情况下,才会调用,主要是在fsync完成后
    // 尽快进行下一次write aof
    // 但是,serverCron执行后,立刻就会执行beforeSleep,有这个必要在这执行么?
    // </MW>
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

在aof_flush_postponed_start不为0时调用,即存在延迟flush的情况。主要是保证fsync完成之后,可以快速的进入下一次flush。尽量保证fsync策略是everysec时,每秒都可以进行fsync,同时缩短两次fsync的间隔,减少影响。

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of ‘hz‘ is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

还有一处调用,是保证aof出错时,尽快执行下一次flush,以便从错误恢复。

上面便是aof的序列化、写入以及sync的过程,rewrite放到下一篇再写。

时间: 2024-11-09 07:58:53

redis源码分析(5)——aof的相关文章

redis源码分析(四)--aof持久化

Redis aof持久化 Redis支持两种持久化方式:rdb与aof,上一篇文章中已经大致介绍了rdb的持久化实现,这篇文章主要介绍aof实现. 与rdb方式相比,aof会使用更多的存储空间,因为它将数据以客户端命令的形式进行存储,并使用ascii编码.但它也有相应的优点,如支持append的方式保存db内容的变动,不需要像rdb方式一样一旦内容有变动,便需要重新完整生成文件才能将变动保存到文件中:同时在子进程持久化的过程中,可以累积客户端的命令到缓存中,最后将缓存内容添加到持久化生成的文件的

redis源码分析之事务Transaction(下)

接着上一篇,这篇文章分析一下redis事务操作中multi,exec,discard三个核心命令. 原文地址:http://www.jianshu.com/p/e22615586595 看本篇文章前需要先对上面文章有所了解: redis源码分析之事务Transaction(上) 一.redis事务核心命令简介 redis事务操作核心命令: //用于开启事务 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0}, //用来执行事

redis源码分析4---结构体---跳跃表

redis源码分析4---结构体---跳跃表 跳跃表是一种有序的数据结构,他通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的: 跳跃表支持平均O(logN),最坏O(N)复杂度的节点查找,还可以通过顺序性操作来批量处理节点.性能上和平衡树媲美,因为事先简单,常用来代替平衡树. 在redis中,只在两个地方使用了跳跃表,一个是实现有序集合键,另一个是在集群节点中用作内部数据结构. 1 跳跃表节点 1.1 层 层的数量越多,访问其他节点的速度越快: 1.2 前进指针 遍历举例

redis源码分析3---结构体---字典

redis源码分析3---结构体---字典 字典,简单来说就是一种用于保存键值对的抽象数据结构: 注意,字典中每个键都是独一无二的:在redis中,内部的redis的数据库就是使用字典作为底层实现的: 1 字典的实现 在redis中,字典是使用哈希表作为底层实现的,一个hash表里面可以有多个hash表节点,而每个hash表节点就保存了字典中的一个键值对: hash表定义 table属性是一个数组,数组中的每个元素都是一个指向dictEntry结构的指针,每个dictEntry结构保存着一个键值

redis 源码分析(一) 内存管理

一,redis内存管理介绍 redis是一个基于内存的key-value的数据库,其内存管理是非常重要的,为了屏蔽不同平台之间的差异,以及统计内存占用量等,redis对内存分配函数进行了一层封装,程序中统一使用zmalloc,zfree一系列函数,其对应的源码在src/zmalloc.h和src/zmalloc.c两个文件中,源码点这里. 二,redis内存管理源码分析 redis封装是为了屏蔽底层平台的差异,同时方便自己实现相关的函数,我们可以通过src/zmalloc.h 文件中的相关宏定义

redis源码分析之内存布局

redis源码分析之内存布局 1. 介绍 众所周知,redis是一个开源.短小.高效的key-value存储系统,相对于memcached,redis能够支持更加丰富的数据结构,包括: 字符串(string) 哈希表(map) 列表(list) 集合(set) 有序集(zset) 主流的key-value存储系统,都是在系统内部维护一个hash表,因为对hash表的操作时间复杂度为O(1).如果数据增加以后,导致冲突严重,时间复杂度增加,则可以对hash表进行rehash,以此来保证操作的常量时

Redis源码分析(一)--Redis结构解析

从今天起,本人将会展开对Redis源码的学习,Redis的代码规模比较小,非常适合学习,是一份非常不错的学习资料,数了一下大概100个文件左右的样子,用的是C语言写的.希望最终能把他啃完吧,C语言好久不用,快忘光了.分析源码的第一步,先别急着想着从哪开始看起,先浏览一下源码结构,可以模块式的渐入,不过比较坑爹的是,Redis的源码全部放在在里面的src目录里,一下90多个文件统统在里面了,所以我选择了拆分,按功能拆分,有些文件你看名字就知道那是干什么的.我拆分好后的而结果如下: 11个包,这样每

redis源码分析(1)--makefile和目录结构分析

一.redis源码编译 redis可以直接在官网下载(本文使用版本 3.0.7):https://redis.io/download 安装: $ tar xzf redis-3.0.7.tar.gz $ cd redis-3.0.7 $ make make执行以后主要编译产物在src/redis-server src/redis-cli 如果想把redis-server直接install到可执行目录/usr/local/bin,还需要执行: $ make install Run Redis wi

Redis源码分析(四)-- sds字符串

今天分析的是Redis源码中的字符串操作类的代码实现.有了上几次的分析经验,渐渐觉得我得换一种分析的方法,如果每个API都进行代码分析,有些功能性的重复,导致分析效率的偏低.所以下面我觉得对于代码的分析偏重的是一种功能整体的思维实现来讲解,其中我也会挑出一个比较有特点的方法进行拆分了解,这也可以让我们见识一下里面的一些神奇的代码.好,回归正题,说到字符串,这不管放到哪个编程语言中,都是使用频率极高的操作类.什么new String, concat, strcopy,substr, splitSt

redis源码分析之事务Transaction(上)

这周学习了一下redis事务功能的实现原理,本来是想用一篇文章进行总结的,写完以后发现这块内容比较多,而且多个命令之间又互相依赖,放在一篇文章里一方面篇幅会比较大,另一方面文章组织结构会比较乱,不容易阅读.因此把事务这个模块整理成上下两篇文章进行总结. 原文地址:http://www.jianshu.com/p/acb97d620ad7 这篇文章我们重点分析一下redis事务命令中的两个辅助命令:watch跟unwatch. 一.redis事务辅助命令简介 依然从server.c文件的命令表中找