Redis源代码分析(二十七)--- rio制I/O包裹

I/O每个操作系统,它的一个组成部分。和I/O业务质量,在一定程度上也影响了系统的效率。

今天,我在了解了Redis中间I/O的,相同的,Redis在他自己的系统中。也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽:

struct _rio {
    /* Backend functions.
     * Since this functions do not tolerate short writes or reads the return
     * value is simplified to: zero on error, non zero on complete success. */
    /* 数据流的读方法 */
    size_t (*read)(struct _rio *, void *buf, size_t len);
    /* 数据流的写方法 */
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    /* 获取当前的读写偏移量 */
    off_t (*tell)(struct _rio *);
    /* The update_cksum method if not NULL is used to compute the checksum of
     * all the data that was read or written so far. The method should be
     * designed so that can be called with the current checksum, and the buf
     * and len fields pointing to the new block of data to add to the checksum
     * computation. */
    /* 当读入新的数据块的时候,会更新当前的校验和 */
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum */
    /* 当前的校验和 */
    uint64_t cksum;

    /* number of bytes read or written */
    /* 当前读取的或写入的字节大小 */
    size_t processed_bytes;

    /* maximum single read or write chunk size */
    /* 最大的单次读写的大小 */
    size_t max_processing_chunk;

    /* Backend-specific vars. */
    /* rio中I/O变量 */
    union {
    	//buffer结构体
        struct {
        	//buffer详细内容
            sds ptr;
            //偏移量
            off_t pos;
        } buffer;
        //文件结构体
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            //同步的最小大小
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;
    } io;
};

里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法。还有2个结构体变量。一个buffer结构体,一个file结构体。作者针对不同的I/O情况。做了不同的处理。当运行暂时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时。则运行与rio.file之间的操作。以下看看rio统一定义的读写方法:

/* The following functions are our interface with the stream. They'll call the
 * actual implementation of read / write / tell, and will update the checksum
 * if needed. */
/* rio的写方法 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    while (len) {
    	//推断当前操作字节长度是否超过最大长度
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        //写入新的数据时,更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        //运行写方法
        if (r->write(r,buf,bytes_to_write) == 0)
            return 0;
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        //操作字节数添加
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

/* rio的读方法 */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
    while (len) {
    	//推断当前操作字节长度是否超过最大长度
        size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ?

r->max_processing_chunk : len;
        //读数据方法
        if (r->read(r,buf,bytes_to_read) == 0)
            return 0;
        //读数据时,更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
        buf = (char*)buf + bytes_to_read;
        len -= bytes_to_read;
        r->processed_bytes += bytes_to_read;
    }
    return 1;
}

这里有一个比較不错的地方。每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:

/* 依据上面描写叙述的方法,定义了BufferRio */
static const rio rioBufferIO = {
    rioBufferRead,
    rioBufferWrite,
    rioBufferTell,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* 依据上面描写叙述的方法,定义了FileRio */
static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

里面分别定义了相相应的读写方法,比方buffer的Read方法和File的Read方法:

/* Returns 1 or 0 for success/failure. */
/* 读取rio中的buffer内容到传入的參数 */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
    if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
        return 0; /* not enough buffer to return len bytes. */
    memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
    r->io.buffer.pos += len;
    return 1;
}
/* Returns 1 or 0 for success/failure. */
/* 读取rio中的fp文件内容 */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    return fread(buf,len,1,r->io.file.fp);
}

作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法:

/* rio写入不同类型数据方法。终于调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);

举当中的一个方法实现:

/* Write multi bulk count in the format: "*<count>\r\n". */
/* rio写入不同类型数据方法,调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
    char cbuf[128];
    int clen;

    cbuf[0] = prefix;
    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
    cbuf[clen++] = '\r';
    cbuf[clen++] = '\n';
    if (rioWrite(r,cbuf,clen) == 0) return 0;
    return clen;
}

调用的还是里面的rioWrite方法,依据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。

在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时。buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件里了。

/* Returns 1 or 0 for success/failure. */
/* 将buf写入rio中的file文件里 */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
    	//判读是否须要同步
        fflush(r->io.file.fp);
        aof_fsync(fileno(r->io.file.fp));
        r->io.file.buffered = 0;
    }
    return retval;
}

版权声明:本文博主原创文章,博客,未经同意不得转载。

时间: 2024-10-14 01:12:21

Redis源代码分析(二十七)--- rio制I/O包裹的相关文章

Redis源代码分析(十七)--- multi事务操作

redis作为一非关系型数据库,居然相同拥有与RDBMS的事务操作,不免让我认为比較吃惊.在redis就专门有文件就是运行事务的相关操作的.也能够让我们领略一下.在Redis的代码中是怎样实现事务操作.首先亮出mulic.c以下的一些API. /* ================================ MULTI/EXEC ============================== */ void initClientMultiState(redisClient *c) /* 初始

redis 源代码分析(一) 内存管理

一,redis内存管理介绍 redis是一个基于内存的key-value的数据库,其内存管理是很重要的,为了屏蔽不同平台之间的差异,以及统计内存占用量等,redis对内存分配函数进行了一层封装,程序中统一使用zmalloc,zfree一系列函数,其相应的源代码在src/zmalloc.h和src/zmalloc.c两个文件里,源代码点这里. 二,redis内存管理源代码分析 redis封装是为了屏蔽底层平台的差异,同一时候方便自己实现相关的函数,我们能够通过src/zmalloc.h 文件里的相

Redis源代码分析(二十八)--- object创建和释放redisObject物

今天的学习更有效率.该Rio分析过,学习之间的另一种方式RedisObject文件,只想说RedisObject有些生成和转换.都是很类似的.列出里面长长的API列表: /* ------------ API --------------------- */ robj *createObject(int type, void *ptr) /* 最初的创建robj对象方法,后面的创建方法与此相似 */ robj *createStringObject(char *ptr, size_t len)

Redis源代码分析(二十四)--- tool工具类(2)

在上篇文章中初步的分析了一下,Redis工具类文件里的一些使用方法,包含2个随机算法和循环冗余校验算法,今天,继续学习Redis中的其它的一些辅助工具类的使用方法.包含里面的大小端转换算法,sha算法在Redis中的实现和通用工具类算法util.c. 先来看看大小端转换算法,大小端学习过操作系统的人一定知道是什么意思,在不同的操作系统中,高位数字的存储方式存在,高位在前,低位在后,或是高位在后,低位在前,所以这里面就涉及到转换,依据不同的操作系统,有不同的转换方式,所以Redis在这方面就开放了

Redis源代码分析(八)--- t_hash哈希转换

在上次的zipmap分析完之后,事实上关于redis源码结构体部分的内容事实上已经所有结束了.由于以下还有几个和结构体相关的操作类,就页把他们归并到struct包下了.这类的文件有:t_hash.c,z_list,z_set.c,t_string.c,t_zset.c,这些文件的功能事实上都差点儿相同,就是用来实现Client和Server之间的命令处理的操作类,通过robj的形式,把dict,ziplist等存入robj中,进行各个转换.实现命令操作.避开了结构体原先的复杂结构,相当于是封装了

Redis源代码分析(五)--- sparkline微线图

sparkline这个单词,我第一次看的时候.也不知道这什么意思啊,曾经根本没听过啊,可是这真真实实的出如今了redis的代码中了,刚刚開始以为这也是属于普通的队列嘛.就把他分在了struct包里了. 好来分析完了.与原本我所想的差太大了.sparkline英文中的意思"微线图",这么说吧.类似于折线图,由一个一个信息点构成.所以看到这个意思.你也许就明确了sparkline.c是干什么用的了吧,就是绘图用的. 我们看看这个绘图的内部结构是什么,绘图须要的元素是哪些: /* spark

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

Redis源代码分析之sds, 动态数组

Redis是用C语言编写的.C语言处理字符串一向是个难点.很容易出现内存越界问题. 其它高级语言很容易实现的字符串拼接,在C这里却是百般艰难.因为需要实现计算出字符串所占内存的大小.即不能过大(浪费内存),也不能太小(越界).甚至在某个用C语言实现的项目中出现了这样的代码 即,先计算出字符串的大小.然后申请内存,再拼接字符串. 这样的操作几乎是无法忍受的.特别是当我们的字符串构成比较复杂,或者字符串经常需要发生变更时. redis的解决方案是使用了一个结构体,一组操作函数,将这个复杂的操作包装起