ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
      I/O操作对于每个系统来说都是必不可少的一部分。而且I/O操作的好坏,在一定程度上也会影响着系统的效率问题。今天我学习了一下在Redis中的I/O是怎么处理的,同样的,Redis在他自己的系统中,也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽: ~~~ struct _rio { /* Backend functions. * Since this functions do not tolerate short writes or reads the return * value is simplified to: zero on error, non zero on complete success. */ /* 数据流的读方法 */ size_t (*read)(struct _rio *, void *buf, size_t len); /* 数据流的写方法 */ size_t (*write)(struct _rio *, const void *buf, size_t len); /* 获取当前的读写偏移量 */ off_t (*tell)(struct _rio *); /* The update_cksum method if not NULL is used to compute the checksum of * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf * and len fields pointing to the new block of data to add to the checksum * computation. */ /* 当读入新的数据块的时候,会更新当前的校验和 */ void (*update_cksum)(struct _rio *, const void *buf, size_t len); /* The current checksum */ /* 当前的校验和 */ uint64_t cksum; /* number of bytes read or written */ /* 当前读取的或写入的字节大小 */ size_t processed_bytes; /* maximum single read or write chunk size */ /* 最大的单次读写的大小 */ size_t max_processing_chunk; /* Backend-specific vars. */ /* rio中I/O变量 */ union { //buffer结构体 struct { //buffer具体内容 sds ptr; //偏移量 off_t pos; } buffer; //文件结构体 struct { FILE *fp; off_t buffered; /* Bytes written since last fsync. */ //同步的最小大小 off_t autosync; /* fsync after 'autosync' bytes written. */ } file; } io; }; ~~~ 里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法,还有2个结构体变量,一个buffer结构体,一个file结构体,作者针对不同的I/O情况,做了不同的处理,当执行临时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时,则执行与rio.file之间的操作。下面看看rio统一定义的读写方法: ~~~ /* The following functions are our interface with the stream. They'll call the * actual implementation of read / write / tell, and will update the checksum * if needed. */ /* rio的写方法 */ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { while (len) { //判断当前操作字节长度是否超过最大长度 size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; //写入新的数据时,更新校验和 if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); //执行写方法 if (r->write(r,buf,bytes_to_write) == 0) return 0; buf = (char*)buf + bytes_to_write; len -= bytes_to_write; //操作字节数增加 r->processed_bytes += bytes_to_write; } return 1; } /* rio的读方法 */ static inline size_t rioRead(rio *r, void *buf, size_t len) { while (len) { //判断当前操作字节长度是否超过最大长度 size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; //读数据方法 if (r->read(r,buf,bytes_to_read) == 0) return 0; //读数据时,更新校验和 if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read); buf = (char*)buf + bytes_to_read; len -= bytes_to_read; r->processed_bytes += bytes_to_read; } return 1; } ~~~ 这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体: ~~~ /* 根据上面描述的方法,定义了BufferRio */ static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ }; /* 根据上面描述的方法,定义了FileRio */ static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, NULL, /* update_checksum */ 0, /* current checksum */ 0, /* bytes read or written */ 0, /* read/write chunk size */ { { NULL, 0 } } /* union for io-specific vars */ }; ~~~ 里面分别定义了相对应的读写方法,比如buffer的Read方法和File的Read方法: ~~~ /* Returns 1 or 0 for success/failure. */ /* 读取rio中的buffer内容到传入的参数 */ static size_t rioBufferRead(rio *r, void *buf, size_t len) { if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) return 0; /* not enough buffer to return len bytes. */ memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); r->io.buffer.pos += len; return 1; } ~~~ ~~~ /* Returns 1 or 0 for success/failure. */ /* 读取rio中的fp文件内容 */ static size_t rioFileRead(rio *r, void *buf, size_t len) { return fread(buf,len,1,r->io.file.fp); } ~~~ 作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法: ~~~ /* rio写入不同类型数据方法,最终调用的是riowrite方法 */ size_t rioWriteBulkCount(rio *r, char prefix, int count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); size_t rioWriteBulkLongLong(rio *r, long long l); size_t rioWriteBulkDouble(rio *r, double d); ~~~ 举其中的一个方法实现: ~~~ /* Write multi bulk count in the format: "*<count>\r\n". */ /* rio写入不同类型数据方法,调用的是riowrite方法 */ size_t rioWriteBulkCount(rio *r, char prefix, int count) { char cbuf[128]; int clen; cbuf[0] = prefix; clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); cbuf[clen++] = '\r'; cbuf[clen++] = '\n'; if (rioWrite(r,cbuf,clen) == 0) return 0; return clen; } ~~~ 调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。 ~~~ /* Returns 1 or 0 for success/failure. */ /* 将buf写入rio中的file文件中 */ static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t retval; retval = fwrite(buf,len,1,r->io.file.fp); r->io.file.buffered += len; if (r->io.file.autosync && r->io.file.buffered >= r->io.file.autosync) { //判读是否需要同步 fflush(r->io.file.fp); aof_fsync(fileno(r->io.file.fp)); r->io.file.buffered = 0; } return retval; } ~~~