Redis源码解析(十五)--- aof-append only file解析

继续学习redis源码下的Data数据相关文件的代码分析,今天我看的是一个叫aof的文件,这个字母是append ONLY file的简称,意味只进行追加文件操作。这里的文件追加记录时为了记录数据操作的改变记录,用以异常情况的数据恢复的。类似于之前我说的redo,undo日志的作用。我们都知道,redis作为一个内存数据库,数据的每次操作改变是先放在内存中,等到内存数据满了,在刷新到磁盘文件中,达到持久化的目的。所以aof的操作模式,也是采用了这样的方式。这里引入了一个block块的概念,其实就是一个缓冲区块。关于块的一些定义如下:

/* AOF的下面的一些代码都用到了一个简单buffer缓存块来进行存储,存储了数据的一些改变操作记录,等到
	缓冲中的达到一定的数据规模时,在持久化地写入到一个文件中,redis采用的方式是append追加的形式,这意味
	每次追加都要调整存储的块的大小,但是不可能会有无限大小的块空间,所以redis在这里引入了块列表的概念,
	设定死一个块的大小,超过单位块大小,存入另一个块中,这里定义每个块的大小为10M. */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

/* 标准的aof文件读写块 */
typedef struct aofrwblock {
	//当前文件块被使用了多少,空闲的大小
    unsigned long used, free;
    //具体存储内容,大小10M
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;

也就是说,每个块的大小默认为10M,这个大小说大不大,说小不小了,如果填入的数据超出长度了,系统会动态申请一个新的缓冲块,在server端是通过一个块链表的形式,组织整个块的:

/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
/* 在缓冲区中追加数据,如果超出空间,会新申请一个缓冲块 */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    //定位到缓冲区的最后一块,在最后一块的位置上进行追加写操作
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
        	//如果当前的缓冲块的剩余空闲能支持len长度的内容时,直接写入
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;
			//如果不够的话,需要新创建,进行写操作
            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            //还要把缓冲块追加到服务端的buffer列表中
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
                                                         REDIS_NOTICE;
                redisLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
}

当想要主动的将缓冲区中的数据刷新到持久化到磁盘中时,调用下面的方法:

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
/* 刷新缓存区的内容到磁盘中 */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponinig, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall trough, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */

	//在进行写入操作的时候,还监听了延迟
    latencyStartMonitor(latency);
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (signed)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Lof the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
                    strerror(errno));
                server.aof_last_write_errno = errno;
            }
        } else {
            if (can_log) {
                redisLog(REDIS_WARNING,"Short write while writing to "
                                       "the AOF file: (nwritten=%lld, "
                                       "expected=%lld)",
                                       (long long)nwritten,
                                       (long long)sdslen(server.aof_buf));
            }

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    redisLog(REDIS_WARNING, "Could not remove short write "
                             "from the append-only file.  Redis may refuse "
                             "to load the AOF the next time it starts.  "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftrunacate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synched on disk. */
            redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = REDIS_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = REDIS_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}

当然有操作会对数据库中的所有数据,做操作记录,便宜用此文件进行全盘恢复:

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
/* 将数据库的内容按照键值,再次完全重写入文件中 */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        //遍历数据库中的每条记录,进行日志记录
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        dictReleaseIterator(di);
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

系统同样开放了后台的此方法操作:

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
/* 后台进行AOF数据文件写入操作 */
int rewriteAppendOnlyFileBackground(void)

原理就是和昨天分析的一样,用的是fork(),创建子线程,最后开放出API:

/* aof.c 中的API */
void aofRewriteBufferReset(void) /* 释放server中旧的buffer,并创建一份新的buffer */
unsigned long aofRewriteBufferSize(void) /* 返回当前AOF的buffer的总大小 */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) /* 在缓冲区中追加数据,如果超出空间,会新申请一个缓冲块 */
ssize_t aofRewriteBufferWrite(int fd) /* 将保存内存中的buffer内容写入到文件中,也是分块分块的写入 */
void aof_background_fsync(int fd) /* 开启后台线程进行文件同步操作 */
void stopAppendOnly(void) /* 停止追加数据操作,这里用的是一个命令模式 */
int startAppendOnly(void) /* 开启追加模式 */
void flushAppendOnlyFile(int force) /* 刷新缓存区的内容到磁盘中 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) /* 根据输入的字符串,进行参数包装,再次输出 */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) /* 将过期等的命令都转化为PEXPIREAT命令,把时间转化为了绝对时间 */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) /* 根据cmd的不同操作,进行命令的不同转化 */
struct redisClient *createFakeClient(void) /* 命令总是被客户端所执行的,因此要引入客户端的方法 */
void freeFakeClientArgv(struct redisClient *c) /* 释放客户端参数操作 */
void freeFakeClient(struct redisClient *c) /* 释放客户端参数操作 */
int loadAppendOnlyFile(char *filename) /* 加载AOF文件内容 */
int rioWriteBulkObject(rio *r, robj *obj) /* 写入bulk对象,分为LongLong对象,和普通的String对象 */
int rewriteListObject(rio *r, robj *key, robj *o) /* 写入List列表对象,分为ZIPLIST压缩列表和LINEDLIST普通链表操作 */
int rewriteSetObject(rio *r, robj *key, robj *o) /* 写入set对象数据 */
int rewriteSortedSetObject(rio *r, robj *key, robj *o) /* 写入排序好的set对象 */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) /* 写入哈希迭代器当前指向的对象 */
int rewriteHashObject(rio *r, robj *key, robj *o) /* 写入哈希字典对象 */
int rewriteAppendOnlyFile(char *filename) /* 将数据库的内容按照键值,再次完全重写入文件中 */
int rewriteAppendOnlyFileBackground(void) /* 后台进行AOF数据文件写入操作 */
void bgrewriteaofCommand(redisClient *c) /* 后台写AOF文件操作命令模式 */
void aofRemoveTempFile(pid_t childpid) /* 移除某次子线程ID为childpid所生产的aof文件 */
void aofUpdateCurrentSize(void) /* 更新当前aof文件的大小 */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) /* 后台子线程写操作完成后的回调方法 */
时间: 2024-08-24 14:11:45

Redis源码解析(十五)--- aof-append only file解析的相关文章

jQuery 源码分析(十五) 数据操作模块 val详解

jQuery的属性操作模块总共有4个部分,本篇说一下最后一个部分:val值的操作,也是属性操作里最简单的吧,只有一个API,如下: val(vlaue)        ;获取匹配元素集合中第一个元素的当前值,或者设置匹配元素集合中每个元素的值,有三种用法: val()        ;没有参数,获取第一个匹配元素的当前值 val(value)      ;为每个匹配元素设置value值        ;如果为null则表示设置值为空 val(func)       ;设置每个匹配元素为函数fun

Redis源码分析(五)--- sparkline微线图

sparkline这个单词,我第一次看的时候,也不知道这什么意思啊,以前根本没听过啊,但是这真真实实的出现在了redis的代码中了,刚刚开始以为这也是属于普通的队列嘛,就把他分在了struct包里了.好来分析完了,与原本我所想的差太大了.sparkline英文中的意思"微线图",这么说吧,类似于折线图,由一个一个信息点构成.所以看到这个意思,你或许就明白了sparkline.c是干什么用的了吧,就是画图用的.我们看看这个画图的内部结构是什么,画图需要的元素是哪些: /* sparkli

做一个合格的程序猿之浅析Spring AOP源码(十五) 分析JdkDynamicAopProxy的invoke方法

上一节我们已经分析了Proxyfactorybean如何去生成一个目标对象的代理的,这一节我们将浅析一下基于JDK动态代理的核心回调方法invoke的源代码: 首先先打开JdkDynamicAopProxy.java 如下 JdkDynamicAopProxy.java文件是实现了AopProxy和InvocationHandler这2个接口的 先讲AopProxy这个接口,如图所示,AopProxy接口就定义了2个方法 我们再看这个接口的继承关系 好了,作为原生的基于JDK的动态代理的JdkD

SDWebImage源码阅读(十五)UIView+WebCacheOperation

这个分类主要用来对 UIView 的图像下载操作添加.取消和移除. .h 1 * Set the image load operation (storage in a UIView based dictionary) 2 * 3 * @param operation the operation 4 * @param key key for storing the operation 5 */ 6 - (void)sd_setImageLoadOperation:(nullable id)oper

Redis源码剖析和注释(十八)--- Redis AOF持久化机制

Redis AOF持久化机制 1. AOF持久化介绍 Redis中支持RDB和AOF这两种持久化机制,目的都是避免因进程退出,造成的数据丢失问题. RDB持久化:把当前进程数据生成时间点快照(point-in-time snapshot)保存到硬盘的过程,避免数据意外丢失. AOF持久化:以独立日志的方式记录每次写命令,重启时在重新执行AOF文件中的命令达到恢复数据的目的. Redis RDB持久化机制源码剖析和注释 AOF的使用:在redis.conf配置文件中,将appendonly设置为y

Redis源码解析:15Resis主从复制之从节点流程

Redis的主从复制功能,可以实现Redis实例的高可用,避免单个Redis 服务器的单点故障,并且可以实现负载均衡. 一:主从复制过程 Redis的复制功能分为同步(sync)和命令传播(commandpropagate)两个操作: 同步操作用于将从节点的数据库状态更新至主节点当前所处的数据库状态: 命令传播操作则用于在主节点的数据库状态被修改,导致主从节点的数据库状态不一致时,让主从节点的数据库重新回到一致状态: 1:同步 当客户端向从节点发送SLAYEOF命令,或者从节点的配置文件中配置了

Redis源码分析(一)--Redis结构解析

从今天起,本人将会展开对Redis源码的学习,Redis的代码规模比较小,非常适合学习,是一份非常不错的学习资料,数了一下大概100个文件左右的样子,用的是C语言写的.希望最终能把他啃完吧,C语言好久不用,快忘光了.分析源码的第一步,先别急着想着从哪开始看起,先浏览一下源码结构,可以模块式的渐入,不过比较坑爹的是,Redis的源码全部放在在里面的src目录里,一下90多个文件统统在里面了,所以我选择了拆分,按功能拆分,有些文件你看名字就知道那是干什么的.我拆分好后的而结果如下: 11个包,这样每

redis源码解析之事件驱动

Redis 内部有个小型的事件驱动,它主要处理两项任务: 文件事件:使用I/O多路复用技术处理多个客户端请求,并返回执行结果. 时间事件:维护服务器的资源管理,状态检查. 主要的数据结构包括文件事件结构体,时间事件结构体,触发事件结构体,事件循环结构体 /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc

Redis源码解析——双向链表

相对于之前介绍的字典和SDS字符串库,Redis的双向链表库则是非常标准的.教科书般简单的库.但是作为Redis源码的一部分,我决定还是要讲一讲的.(转载请指明出于breaksoftware的csdn博客) 基本结构 首先我们看链表元素的结构.因为是双向链表,所以其基本元素应该有一个指向前一个节点的指针和一个指向后一个节点的指针,还有一个记录节点值的空间 typedef struct listNode { struct listNode *prev; struct listNode *next;