一、我们先来介绍一下init_pipe_fs。
/* File-system type descriptor for "pipefs"; pipefs_read_super fills in its superblock. */
static DECLARE_FSTYPE(pipe_fs_type, "pipefs", pipefs_read_super, FS_NOMOUNT|FS_SINGLE);

/*
 * Register pipefs and mount it internally, storing the mount in pipe_mnt.
 *
 * Returns 0 on success, the register_filesystem() error if registration
 * fails, or the error encoded in the kern_mount() result; in the last case
 * the file system is unregistered again.
 */
static int __init init_pipe_fs(void)
{
	int rc;

	rc = register_filesystem(&pipe_fs_type);
	if (rc)
		return rc;

	pipe_mnt = kern_mount(&pipe_fs_type);
	if (!IS_ERR(pipe_mnt))
		return 0;

	/* Decode the error carried by the pointer, then undo the registration. */
	rc = PTR_ERR(pipe_mnt);
	unregister_filesystem(&pipe_fs_type);
	return rc;
}
struct vfsmount *kern_mount(struct file_system_type *type) { kdev_t dev = get_unnamed_dev(); struct super_block *sb; struct vfsmount *mnt; if (!dev) return ERR_PTR(-EMFILE); sb = read_super(dev, NULL, type, 0, NULL, 0); if (!sb) { put_unnamed_dev(dev); return ERR_PTR(-EINVAL); } mnt = add_vfsmnt(NULL, sb->s_root, NULL); if (!mnt) { kill_super(sb, 0); return ERR_PTR(-ENOMEM); } type->kern_mnt = mnt; return mnt; }
/*
 * Allocate a super_block, initialise its generic fields and let the
 * file-system type fill in the rest via type->read_super().
 *
 * For pipefs, type->read_super is pipefs_read_super.
 *
 * Returns the initialised super_block, or NULL if no empty super_block is
 * available or the type-specific read_super callback fails.
 */
static struct super_block * read_super(kdev_t dev, struct block_device *bdev,
				       struct file_system_type *type, int flags,
				       void *data, int silent)
{
	struct super_block *sb;

	sb = get_empty_super();
	if (sb == NULL)
		return NULL;

	sb->s_dev = dev;
	sb->s_bdev = bdev;
	sb->s_flags = flags;
	sb->s_dirt = 0;
	sema_init(&sb->s_vfs_rename_sem, 1);
	sema_init(&sb->s_nfsd_free_path_sem, 1);
	sb->s_type = type;
	sema_init(&sb->s_dquot.dqio_sem, 1);
	sema_init(&sb->s_dquot.dqoff_sem, 1);
	sb->s_dquot.flags = 0;

	/* The type-specific callback runs with the superblock locked. */
	lock_super(sb);
	if (type->read_super(sb, data, silent)) {
		unlock_super(sb);
		/* tell bdcache that we are going to keep this one */
		if (bdev)
			atomic_inc(&bdev->bd_count);
		return sb;
	}

	/* Callback failed: clear the identifying fields before unlocking. */
	sb->s_dev = 0;
	sb->s_bdev = 0;
	sb->s_type = NULL;
	unlock_super(sb);
	return NULL;
}
type->read_super指向了pipefs_read_super,代码如下:
static struct super_block * pipefs_read_super(struct super_block *sb, void *data, int silent) { struct inode *root = new_inode(sb);//分配根节点的inode结构 if (!root) return NULL; root->i_mode = S_IFDIR | S_IRUSR | S_IWUSR; root->i_uid = root->i_gid = 0; root->i_atime = root->i_mtime = root->i_ctime = CURRENT_TIME; sb->s_blocksize = 1024; sb->s_blocksize_bits = 10; sb->s_magic = PIPEFS_MAGIC; sb->s_op = &pipefs_ops; sb->s_root = d_alloc(NULL, &(const struct qstr) { "pipe:", 5, 0 });//分配根节点的dentry结构 if (!sb->s_root) { iput(root); return NULL; } sb->s_root->d_sb = sb; sb->s_root->d_parent = sb->s_root; d_instantiate(sb->s_root, root);//根节点的dentry和根节点的inode结构相连 return sb; }
二、我们来看应用管道通信的相关代码,假设系统中只有这两个进程:
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Pipe demo: the parent writes a short message into a pipe and the child
 * reads it back.
 *
 * Returns 0 on success, -1 if pipe(), fork(), write() or read() fails.
 */
int main(void)
{
	int fd[2];
	pid_t pid;
	/*
	 * NOTE(review): the message text is a placeholder; the original
	 * declaration of str1 was not recoverable.
	 */
	char str1[] = "hello pipe";
	char str2[512];

	/* fd[0] is the read end, fd[1] is the write end. */
	if (pipe(fd) < 0) {
		printf("pipe error\n");
		return -1;
	}

	/* After fork() parent and child both hold the same two open files. */
	pid = fork();
	if (pid < 0) {
		printf("fork error\n");
		return -1;
	}

	if (pid > 0) {
		/* Parent acts as the writer, so it closes the read end. */
		close(fd[0]);
		if (write(fd[1], str1, strlen(str1)) < 0) {
			printf("write error\n");
			return -1;
		}
	} else {
		ssize_t len;

		/* Child acts as the reader, so it closes the write end. */
		close(fd[1]);
		/*
		 * Ask for up to the buffer capacity; str2 is uninitialised,
		 * so its strlen() must not be used as the request size.
		 */
		len = read(fd[0], str2, sizeof(str2) - 1);
		if (len < 0) {
			printf("read error\n");
			return -1;
		}
		str2[len] = '\0';
	}

	return 0;
}
pipe的系统调用是sys_pipe(),代码如下:
asmlinkage int sys_pipe(unsigned long * fildes) { int fd[2]; int error; error = do_pipe(fd);//fd[]返回代表着管道两端的两个已打开文件号 if (!error) { if (copy_to_user(fildes, fd, 2*sizeof(int)))//将数组fd[]复制到用户空间 error = -EFAULT; } return error; }
/*
 * Create a pipe: two file structures (read end and write end) sharing one
 * pipe inode and one dentry, installed under two new file descriptors.
 *
 * On success fd[0] holds the read-end descriptor, fd[1] the write-end
 * descriptor, and 0 is returned. On failure everything acquired so far is
 * released in reverse order through the labels at the bottom and a negative
 * error is returned: -ENFILE when a file structure or the inode cannot be
 * obtained, the get_unused_fd() error when no descriptor is free, -ENOMEM
 * when the dentry cannot be allocated.
 */
int do_pipe(int *fd)
{
	struct qstr this;
	char name[32];
	struct dentry *dentry;
	struct inode * inode;
	struct file *f1, *f2;
	int error;
	int i,j;

	error = -ENFILE;
	/* Empty file structure for the read end. */
	f1 = get_empty_filp();
	if (!f1)
		goto no_files;

	/* Empty file structure for the write end. */
	f2 = get_empty_filp();
	if (!f2)
		goto close_f1;

	/* The pipe inode shared by both ends. */
	inode = get_pipe_inode();
	if (!inode)
		goto close_f12;

	/* Descriptor number for the read end. */
	error = get_unused_fd();
	if (error < 0)
		goto close_f12_inode;
	i = error;

	/* Descriptor number for the write end. */
	error = get_unused_fd();
	if (error < 0)
		goto close_f12_inode_i;
	j = error;

	error = -ENOMEM;
	/* The dentry is named after the inode number, e.g. "[1234]". */
	sprintf(name, "[%lu]", inode->i_ino);
	this.name = name;
	this.len = strlen(name);
	this.hash = inode->i_ino; /* will go */
	/* Allocate the pipe's dentry under the pipefs root. */
	dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &this);
	if (!dentry)
		goto close_f12_inode_i_j;
	dentry->d_op = &pipefs_dentry_operations;
	/* Associate the dentry with the inode. */
	d_add(dentry, inode);
	/* mntget() is called twice: both file structures point at pipe_mnt. */
	f1->f_vfsmnt = f2->f_vfsmnt = mntget(mntget(pipe_mnt));
	/* Both files point at the same dentry. */
	f1->f_dentry = f2->f_dentry = dget(dentry);

	/* read file */
	f1->f_pos = f2->f_pos = 0;
	f1->f_flags = O_RDONLY;
	f1->f_op = &read_pipe_fops;	/* file operations of the read end */
	f1->f_mode = 1;
	f1->f_version = 0;

	/* write file */
	f2->f_flags = O_WRONLY;
	f2->f_op = &write_pipe_fops;	/* file operations of the write end */
	f2->f_mode = 2;
	f2->f_version = 0;

	/* Install f1 and f2 under descriptors i and j. */
	fd_install(i, f1);
	fd_install(j, f2);
	fd[0] = i;
	fd[1] = j;
	return 0;

	/* Error unwinding: each label releases one more resource than the next. */
close_f12_inode_i_j:
	put_unused_fd(j);
close_f12_inode_i:
	put_unused_fd(i);
close_f12_inode:
	/* Free the buffer page and the pipe_inode_info set up by pipe_new(). */
	free_page((unsigned long) PIPE_BASE(*inode));
	kfree(inode->i_pipe);
	inode->i_pipe = NULL;
	iput(inode);
close_f12:
	put_filp(f2);
close_f1:
	put_filp(f1);
no_files:
	return error;
}
get_pipe_inode,获取读端和写端共同使用的管道节点的inode结构,代码如下:
static struct inode * get_pipe_inode(void) { struct inode *inode = get_empty_inode(); if (!inode) goto fail_inode; if(!pipe_new(inode))//分配所需要的缓冲区 goto fail_iput; PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1; inode->i_fop = &rdwr_pipe_fops;//这里以后会用到 inode->i_sb = pipe_mnt->mnt_sb; /* * Mark the inode dirty from the very beginning, * that way it will never be moved to the dirty * list because "mark_inode_dirty()" will think * that it already _is_ on the dirty list. */ inode->i_state = I_DIRTY; inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR; inode->i_uid = current->fsuid; inode->i_gid = current->fsgid; inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; inode->i_blksize = PAGE_SIZE; return inode; fail_iput: iput(inode); fail_inode: return NULL; }
pipe_new,分配所需要的缓冲区,代码如下:
/*
 * Allocate the buffer page and the pipe_inode_info for a pipe inode and
 * reset all pipe state.
 *
 * Returns inode on success, or NULL if either allocation fails; the page
 * is freed again when the pipe_inode_info allocation fails.
 */
struct inode* pipe_new(struct inode* inode)
{
	unsigned long buf_page;

	buf_page = __get_free_page(GFP_USER);
	if (!buf_page)
		return NULL;

	/* i_pipe points to the pipe_inode_info describing this pipe. */
	inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
	if (!inode->i_pipe) {
		free_page(buf_page);
		return NULL;
	}

	init_waitqueue_head(PIPE_WAIT(*inode));
	/* i_pipe->base points at the start of the buffer page. */
	PIPE_BASE(*inode) = (char*) buf_page;

	PIPE_LEN(*inode) = 0;
	PIPE_START(*inode) = 0;
	PIPE_WRITERS(*inode) = 0;
	PIPE_READERS(*inode) = 0;
	PIPE_WAITING_WRITERS(*inode) = 0;
	PIPE_WAITING_READERS(*inode) = 0;
	PIPE_WCOUNTER(*inode) = 1;
	PIPE_RCOUNTER(*inode) = 1;

	return inode;
}
pipe_inode_info结构如下:
/* Per-pipe state, reached through inode->i_pipe and the PIPE_* macros. */
struct pipe_inode_info {
	wait_queue_head_t wait;		/* queue that pipe_wait() sleeps on */
	char *base;			/* start address of the buffer page */
	unsigned int start;		/* offset in the buffer where reading begins */
	unsigned int readers;		/* checked by pipe_write(); zero leads to SIGPIPE */
	unsigned int writers;		/* checked by pipe_read(); zero with an empty pipe returns 0 */
	unsigned int waiting_readers;	/* incremented around pipe_wait() in pipe_read() */
	unsigned int waiting_writers;	/* incremented around pipe_wait() in pipe_write() */
	unsigned int r_counter;		/* set to 1 by pipe_new(); use not shown in this excerpt */
	unsigned int w_counter;		/* set to 1 by pipe_new(); use not shown in this excerpt */
};
pipe_new里面宏定义如下:
/*
 * Accessors for the pipe state of an inode. Each macro takes the inode
 * object itself (callers pass *inode), not a pointer.
 * PIPE_SIZE is defined outside this excerpt.
 */

/* Address of the inode semaphore used to serialise pipe access. */
#define PIPE_SEM(inode) (&(inode).i_sem)
/* Address of the pipe's wait queue head. */
#define PIPE_WAIT(inode) (&(inode).i_pipe->wait)
/* Start address of the buffer. */
#define PIPE_BASE(inode) ((inode).i_pipe->base)
/* Offset of the first unread byte. */
#define PIPE_START(inode) ((inode).i_pipe->start)
/* Number of bytes currently buffered; kept in the inode's i_size. */
#define PIPE_LEN(inode) ((inode).i_size)
#define PIPE_READERS(inode) ((inode).i_pipe->readers)
#define PIPE_WRITERS(inode) ((inode).i_pipe->writers)
#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers)
#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers)
#define PIPE_RCOUNTER(inode) ((inode).i_pipe->r_counter)
#define PIPE_WCOUNTER(inode) ((inode).i_pipe->w_counter)

/* True when no bytes are buffered. */
#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
/* True when the buffer holds PIPE_SIZE bytes. */
#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE)
/* Number of free bytes left in the buffer. */
#define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode))
/* Offset where the next byte is written; the mask wraps it within PIPE_SIZE. */
#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
/* Bytes between the read offset and the end of the buffer. */
#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode))
/* Bytes between the write offset and the end of the buffer. */
#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))
回到do_pipe继续执行。对于管道的两端来说,管道是单向的,所以f1一端设置成"只读",而f2一端则设置成"只写"。同时,两端的文件操作也分别设置成read_pipe_fops和write_pipe_fops。
/*
 * File operations for the read end of a pipe (installed on f1 by do_pipe()).
 * Reading goes to pipe_read; the write slot is bad_pipe_w, whose body is
 * not shown in this excerpt.
 */
struct file_operations read_pipe_fops = {
	llseek:		pipe_lseek,
	read:		pipe_read,
	write:		bad_pipe_w,
	poll:		pipe_poll,
	ioctl:		pipe_ioctl,
	open:		pipe_read_open,
	release:	pipe_read_release,
};
/*
 * File operations for the write end of a pipe (installed on f2 by do_pipe()).
 * Writing goes to pipe_write; the read slot is bad_pipe_r, whose body is
 * not shown in this excerpt.
 */
struct file_operations write_pipe_fops = {
	llseek:		pipe_lseek,
	read:		bad_pipe_r,
	write:		pipe_write,
	poll:		pipe_poll,
	ioctl:		pipe_ioctl,
	open:		pipe_write_open,
	release:	pipe_write_release,
};
读者可能会问,前面管道节点的inode数据结构将指针i_fop设置成指向rdwr_pipe_fops,那显然是双向的,这里是不是有矛盾呢?其实不然,对于代表着管道两端的两个已打开文件来说,一个只能写而另一个只能读,这是事情的一个方面。可是另一方面,这两个逻辑上的已打开文件都通向同一个inode、同一个物理上存在的"文件",即用做管道的缓冲区;显然这个缓冲区应该既支持写又支持读,这样才能使数据流通。
/*
 * File operations supporting both directions; get_pipe_inode() stores this
 * table in the pipe inode's i_fop.
 */
struct file_operations rdwr_pipe_fops = {
	llseek:		pipe_lseek,
	read:		pipe_read,
	write:		pipe_write,
	poll:		pipe_poll,
	ioctl:		pipe_ioctl,
	open:		pipe_rdwr_open,
	release:	pipe_rdwr_release,
};
fork之后,父子进程指向相同的file结构,因为copy_files中有如下代码:
/*
 * Excerpt from the descriptor-table copy done at fork time; the enclosing
 * function and the declarations of newf, old_fds, open_files, i and tsk
 * are outside this excerpt.
 */
new_fds = newf->fd;
/* Copy open_files entries from the old descriptor array to the new one. */
for (i = open_files; i != 0; i--) {
	struct file *f = *old_fds++;
	/* get_file() is called for every slot in use (presumably takes a reference; confirm). */
	if (f)
		get_file(f);
	/* The new table holds the same struct file pointer as the old one. */
	*new_fds++ = f;
}
tsk->files = newf;
我们假设父进程先执行,并且关闭了读端,执行write(fd[1],str1,strlen(str1)),映射到内核,sys_write,代码如下:
/*
 * write() system call entry point.
 *
 * Looks up the file for fd, verifies it is open for writing and that
 * locks_verify_area() permits the access, then dispatches to the file's
 * write operation (pipe_write for the write end of a pipe).
 *
 * Returns the result of the write operation, -EBADF if fd has no file or
 * the file is not open for writing, the locks_verify_area() error, or
 * -EINVAL if the file has no write operation.
 */
asmlinkage ssize_t sys_write(unsigned int fd, const char * buf, size_t count)
{
	ssize_t (*write_op)(struct file *, const char *, size_t, loff_t *);
	struct file *filp;
	ssize_t rc = -EBADF;

	filp = fget(fd);
	if (!filp)
		return rc;

	if (!(filp->f_mode & FMODE_WRITE))
		goto release;

	rc = locks_verify_area(FLOCK_VERIFY_WRITE, filp->f_dentry->d_inode,
			       filp, filp->f_pos, count);
	if (rc)
		goto release;

	rc = -EINVAL;
	if (filp->f_op && (write_op = filp->f_op->write) != NULL)
		rc = write_op(filp, buf, count, &filp->f_pos);	/* pipe_write for a pipe */

release:
	/* A positive result triggers a DN_MODIFY notification on the parent directory inode. */
	if (rc > 0)
		inode_dir_notify(filp->f_dentry->d_parent->d_inode, DN_MODIFY);
	fput(filp);
	return rc;
}
write(file, buf, count, &file->f_pos)调用pipe_write(file, buf, count, &file->f_pos)。
/*
 * Write count bytes from the user buffer buf into the pipe behind filp.
 *
 * Returns the number of bytes written if any were written. Otherwise
 * returns -ESPIPE when ppos is not the file's own position, 0 for a
 * zero-length write, -ERESTARTSYS when interrupted, -EAGAIN when
 * O_NONBLOCK is set and there is not enough room, -EFAULT when copying
 * from user space fails, or -EPIPE (after sending SIGPIPE to the current
 * task) when the pipe has no readers.
 */
static ssize_t pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	ssize_t free, written, ret;

	/* Seeks are not allowed on pipes. */
	ret = -ESPIPE;
	written = 0;
	if (ppos != &filp->f_pos)
		goto out_nolock;

	/* Null write succeeds. */
	ret = 0;
	if (count == 0)
		goto out_nolock;

	ret = -ERESTARTSYS;
	if (down_interruptible(PIPE_SEM(*inode)))
		goto out_nolock;

	/* No readers yields SIGPIPE. */
	if (!PIPE_READERS(*inode))
		goto sigpipe;

	/* If count <= PIPE_BUF, we have to make it atomic. */
	/* Larger writes only need a single free byte before they may start. */
	free = (count <= PIPE_BUF ? count : 1);

	/* Wait, or check for, available space. */
	if (filp->f_flags & O_NONBLOCK) {
		ret = -EAGAIN;
		if (PIPE_FREE(*inode) < free)
			goto out;
	} else {
		/* Sleep until at least `free` bytes of room are available. */
		while (PIPE_FREE(*inode) < free) {
			PIPE_WAITING_WRITERS(*inode)++;
			pipe_wait(inode);
			PIPE_WAITING_WRITERS(*inode)--;
			ret = -ERESTARTSYS;
			if (signal_pending(current))
				goto out;

			if (!PIPE_READERS(*inode))
				goto sigpipe;
		}
	}

	/* Copy into available space. */
	ret = -EFAULT;
	while (count > 0) {
		int space;
		char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
		ssize_t chars = PIPE_MAX_WCHUNK(*inode);

		/* While there is room, copy one contiguous chunk. */
		if ((space = PIPE_FREE(*inode)) != 0) {
			if (chars > count)
				chars = count;
			if (chars > space)
				chars = space;

			if (copy_from_user(pipebuf, buf, chars))
				goto out;

			written += chars;
			PIPE_LEN(*inode) += chars;
			count -= chars;
			buf += chars;
			space = PIPE_FREE(*inode);
			/*
			 * The buffer is circular: a write that crosses the end
			 * of the page takes a second pass for the wrapped part.
			 * If count is now 0 the outer loop ends.
			 */
			continue;
		}

		/* Buffer is full but data remains to be written. */
		ret = written;
		if (filp->f_flags & O_NONBLOCK)
			break;

		do {
			/*
			 * Synchronous wake-up: it knows that this process
			 * is going to give up this CPU, so it doesnt have
			 * to do idle reschedules.
			 */
			wake_up_interruptible_sync(PIPE_WAIT(*inode));
			PIPE_WAITING_WRITERS(*inode)++;
			/* The writer sleeps here. */
			pipe_wait(inode);
			PIPE_WAITING_WRITERS(*inode)--;
			if (signal_pending(current))
				goto out;
			if (!PIPE_READERS(*inode))
				goto sigpipe;
		} while (!PIPE_FREE(*inode));	/* leave once at least one byte is free */
		ret = -EFAULT;
	}

	/* Signal readers asynchronously that there is more data. */
	wake_up_interruptible(PIPE_WAIT(*inode));

	inode->i_ctime = inode->i_mtime = CURRENT_TIME;
	mark_inode_dirty(inode);

out:
	up(PIPE_SEM(*inode));
out_nolock:
	/* A partial write reports the byte count instead of the error. */
	if (written)
		ret = written;
	return ret;

sigpipe:
	/* If something was already written, report that instead of SIGPIPE. */
	if (written)
		goto out;
	up(PIPE_SEM(*inode));
	send_sig(SIGPIPE, current, 0);
	return -EPIPE;
}
pipe_wait,睡眠等待,代码如下:
/*
 * Put the current task to sleep on the pipe's wait queue.
 *
 * The pipe semaphore is released before schedule() and taken again before
 * returning, so the function is entered and left with the semaphore held.
 */
void pipe_wait(struct inode * inode)
{
	/* Wait-queue entry tied to the current task. */
	DECLARE_WAITQUEUE(wait, current);
	/* Mark the task interruptible before queueing it. */
	current->state = TASK_INTERRUPTIBLE;
	/* Queue the entry on inode->i_pipe->wait. */
	add_wait_queue(PIPE_WAIT(*inode), &wait);
	up(PIPE_SEM(*inode));
	/* Switch to another task. */
	schedule();
	remove_wait_queue(PIPE_WAIT(*inode), &wait);
	current->state = TASK_RUNNING;
	down(PIPE_SEM(*inode));
}
/*
 * Declare a struct wait_queue variable named by the first argument; its
 * first member is initialised with the second argument and its second
 * member with NULL.
 */
#define DECLARE_WAITQUEUE(wait, current) struct wait_queue wait = { current, NULL }
/*
 * Add the entry wait to the wait queue q.
 *
 * The entry's flags are cleared and the insertion is done by
 * __add_wait_queue() between the queue's write lock and unlock.
 */
void add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)
{
	unsigned long irq_state;	/* flags word shared by the lock and unlock macros */

	wq_write_lock_irqsave(&q->lock, irq_state);

	wait->flags = 0;
	__add_wait_queue(q, wait);

	wq_write_unlock_irqrestore(&q->lock, irq_state);
}
/*
 * Link the entry new into the list headed by head via list_add().
 * The "......" below marks statements left out of this excerpt.
 */
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
	......
	list_add(&new->task_list, &head->task_list);
}
总结一下:
1、如果剩余空间不足,阻塞模式下马上睡眠等待(非阻塞模式下直接返回-EAGAIN);
2、如果剩余空间足够,分一次写或者分两次写后,唤醒读端进程,程序返回;
3、如果要写入的字节数大于缓冲区大小,那么先写入剩余空间能容纳的部分,缓冲区写满后唤醒读端进程并睡眠等待,有了剩余空间后再继续写。
读和写操作的都是同一个循环使用的页面(环形缓冲区),如下图:
然后子进程开始执行,并且关闭了写端,执行read(fd[0],str2,strlen(str2)),映射到内核,sys_read,代码如下:
/*
 * read() system call entry point.
 *
 * Looks up the file for fd, verifies it is open for reading and that
 * locks_verify_area() permits the access, then dispatches to the file's
 * read operation (pipe_read for the read end of a pipe).
 *
 * Returns the result of the read operation, -EBADF if fd has no file or
 * the file is not open for reading, the locks_verify_area() error, or
 * -EINVAL if the file has no read operation.
 */
asmlinkage ssize_t sys_read(unsigned int fd, char * buf, size_t count)
{
	ssize_t (*read_op)(struct file *, char *, size_t, loff_t *);
	struct file *filp;
	ssize_t rc = -EBADF;

	filp = fget(fd);
	if (!filp)
		return rc;

	if (!(filp->f_mode & FMODE_READ))
		goto release;

	rc = locks_verify_area(FLOCK_VERIFY_READ, filp->f_dentry->d_inode,
			       filp, filp->f_pos, count);
	if (rc)
		goto release;

	rc = -EINVAL;
	if (filp->f_op && (read_op = filp->f_op->read) != NULL)
		rc = read_op(filp, buf, count, &filp->f_pos);	/* pipe_read for a pipe */

release:
	/* A positive result triggers a DN_ACCESS notification on the parent directory inode. */
	if (rc > 0)
		inode_dir_notify(filp->f_dentry->d_parent->d_inode, DN_ACCESS);
	fput(filp);
	return rc;
}
read(file, buf, count, &file->f_pos)指向pipe_read(file, buf, count, &file->f_pos),代码如下:
/*
 * Read up to count bytes from the pipe behind filp into the user buffer buf.
 *
 * Returns the number of bytes read if any were read. Otherwise returns
 * -ESPIPE when ppos is not the file's own position, 0 for a zero-length
 * read or when the pipe is empty and has no writers, -EAGAIN when the pipe
 * is empty and O_NONBLOCK is set, -ERESTARTSYS when interrupted, or
 * -EFAULT when copying to user space fails.
 */
static ssize_t pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	ssize_t size, read, ret;

	/* Seeks are not allowed on pipes. */
	ret = -ESPIPE;
	read = 0;
	if (ppos != &filp->f_pos)
		goto out_nolock;

	/* Always return 0 on null read. */
	ret = 0;
	if (count == 0)
		goto out_nolock;

	/* Get the pipe semaphore */
	ret = -ERESTARTSYS;
	if (down_interruptible(PIPE_SEM(*inode)))
		goto out_nolock;

	if (PIPE_EMPTY(*inode)) {
do_more_read:
		ret = 0;
		if (!PIPE_WRITERS(*inode))
			goto out;

		ret = -EAGAIN;
		if (filp->f_flags & O_NONBLOCK)
			goto out;

		/* Nothing to read: sleep until data arrives or the writers are gone. */
		for (;;) {
			PIPE_WAITING_READERS(*inode)++;
			pipe_wait(inode);
			PIPE_WAITING_READERS(*inode)--;
			ret = -ERESTARTSYS;
			if (signal_pending(current))
				goto out;
			ret = 0;
			if (!PIPE_EMPTY(*inode))
				break;
			if (!PIPE_WRITERS(*inode))
				goto out;
		}
	}

	/* Read what data is available. */
	ret = -EFAULT;
	/* The buffer is circular, so draining it may take two contiguous chunks. */
	while (count > 0 && (size = PIPE_LEN(*inode))) {
		char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
		ssize_t chars = PIPE_MAX_RCHUNK(*inode);

		if (chars > count)
			chars = count;
		if (chars > size)
			chars = size;

		if (copy_to_user(buf, pipebuf, chars))
			goto out;

		read += chars;
		PIPE_START(*inode) += chars;
		/* Wrap the read offset within the buffer. */
		PIPE_START(*inode) &= (PIPE_SIZE - 1);
		PIPE_LEN(*inode) -= chars;
		count -= chars;
		buf += chars;
	}

	/* Cache behaviour optimization */
	if (!PIPE_LEN(*inode))
		PIPE_START(*inode) = 0;

	/*
	 * More was requested than the pipe held and a writer is waiting:
	 * wake the writer and go back to sleep for more data.
	 */
	if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {
		/*
		 * We know that we are going to sleep: signal
		 * writers synchronously that there is more
		 * room.
		 */
		wake_up_interruptible_sync(PIPE_WAIT(*inode));
		if (!PIPE_EMPTY(*inode))
			BUG();
		goto do_more_read;
	}
	/* Signal writers asynchronously that there is more room.  */
	wake_up_interruptible(PIPE_WAIT(*inode));

	ret = read;
out:
	up(PIPE_SEM(*inode));
out_nolock:
	/* A partial read reports the byte count instead of the error. */
	if (read)
		ret = read;
	return ret;
}
总结一下:
1、如果没有可读的字节,那么就马上睡眠等待;
2、如果有可读的字节,分一次读或者分两次读后,唤醒写端进程,程序返回;
3、如果有可读的字节,但是要读入的字节数大于缓冲区现有的字节数,或者大于缓冲区的总大小,那么也要唤醒写端进程,并睡眠等待。