Redis源码分析(二十七)--- rio系统I/O的封装


struct _rio {
    /* Backend functions.
     * Since this functions do not tolerate short writes or reads the return
     * value is simplified to: zero on error, non zero on complete success. */
    /* 数据流的读方法 */
    size_t (*read)(struct _rio *, void *buf, size_t len);
    /* 数据流的写方法 */
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    /* 获取当前的读写偏移量 */
    off_t (*tell)(struct _rio *);
    /* The update_cksum method if not NULL is used to compute the checksum of
     * all the data that was read or written so far. The method should be
     * designed so that can be called with the current checksum, and the buf
     * and len fields pointing to the new block of data to add to the checksum
     * computation. */
    /* 当读入新的数据块的时候,会更新当前的校验和 */
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum */
    /* 当前的校验和 */
    uint64_t cksum;

    /* number of bytes read or written */
    /* 当前读取的或写入的字节大小 */
    size_t processed_bytes;

    /* maximum single read or write chunk size */
    /* 最大的单次读写的大小 */
    size_t max_processing_chunk;

    /* Backend-specific vars. */
    /* rio中I/O变量 */
    union {
        struct {
            sds ptr;
            off_t pos;
        } buffer;
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;
    } io;


/* The following functions are our interface with the stream. They'll call the
 * actual implementation of read / write / tell, and will update the checksum
 * if needed. */
/* rio的写方法 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    while (len) {
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        if (r->write(r,buf,bytes_to_write) == 0)
            return 0;
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        r->processed_bytes += bytes_to_write;
    return 1;

/* rio的读方法 */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
    while (len) {
        size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        if (r->read(r,buf,bytes_to_read) == 0)
            return 0;
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
        buf = (char*)buf + bytes_to_read;
        len -= bytes_to_read;
        r->processed_bytes += bytes_to_read;
    return 1;

这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:

/* 根据上面描述的方法,定义了BufferRio */
static const rio rioBufferIO = {
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */

/* 根据上面描述的方法,定义了FileRio */
static const rio rioFileIO = {
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */


/* Returns 1 or 0 for success/failure. */
/* 读取rio中的buffer内容到传入的参数 */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
    if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
        return 0; /* not enough buffer to return len bytes. */
    r->io.buffer.pos += len;
    return 1;
/* Returns 1 or 0 for success/failure. */
/* 读取rio中的fp文件内容 */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    return fread(buf,len,1,r->io.file.fp);


/* rio写入不同类型数据方法,最终调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);


/* Write multi bulk count in the format: "*<count>\r\n". */
/* rio写入不同类型数据方法,调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
    char cbuf[128];
    int clen;

    cbuf[0] = prefix;
    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
    cbuf[clen++] = '\r';
    cbuf[clen++] = '\n';
    if (rioWrite(r,cbuf,clen) == 0) return 0;
    return clen;

调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。

/* Returns 1 or 0 for success/failure. */
/* 将buf写入rio中的file文件中 */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
        r->io.file.buffered = 0;
    return retval;
时间: 2024-08-10 15:00:03

