重新审视进程间的通信(一)

最近干活的时候又被Linux管道和消息队列搞的一脸懵逼。当初自己走马观花似的学习以为内容很简单,结果留下了大坑,借来Unix网络编程来补补,重新审视这两个部分,并且引以为戒!!!

首先看管道

#include<unistd.h>

int pipe(int fd[2]);

返回:成功为0,出错为1,两个文件描述符fd[0]用来读,fd[1]用来写

灵魂作图

单进程管道

刚fork后

父进程关闭管道读出端,子进程关闭管道写入端,在父子进程间提供一个单向数据流

管道只能用于父子进程或者兄弟进程间通信,也就是说管道只能用于具有亲缘关系的进程间通信

管道的缓冲区大小是受限制的。管道所传输的是无格式的字节流。这就需要管道输入方和输出方事先约定好数据格式

有名管道可用于没有亲缘关系的进程间通信(name pipe或者叫FIFO)

#include<sys/types.h>
#include<sys/stat.h>

int mkinfo(const char *pathname, mode_t mode);//<span style="font-family: 宋体, Arial; line-height: 26px;"><span style="font-size:12px;">pathname为创建有名管道的全路径名,mode为创建有名管道的模式</span></span>

返回:若成功则0,不成功则-1

实现分析

//管道缓冲区个数
#define PIPE_BUFFERS (16)
//管道缓存区对象结构
struct pipe_buffer {
    struct page *page; //管道缓冲区页框的描述符地址
    unsigned int offset, len; //页框内有效数据的当前位置,和有效数据的长度
    struct pipe_buf_operations *ops; //管道缓存区方法表的地址
};
//管道信息结构
struct pipe_inode_info {
    wait_queue_head_t wait; //管道等待队列
    unsigned int nrbufs, curbuf; //包含待读数据的缓冲区数和包含待读数据的第一个缓冲区的索引
    struct pipe_buffer bufs[PIPE_BUFFERS]; //管道缓冲区描述符数组
    struct page *tmp_page; //高速缓存区页框指针
    unsigned int start;  //当前管道缓存区读的位置
    unsigned int readers; //读进程的标志,或编号
    unsigned int writers; //写进程的标志,或编号
    unsigned int waiting_writers; //在等待队列中睡眠的写进程的个数
    unsigned int r_counter; //与readers类似,但当等待写入FIFO的进程是使用
    unsigned int w_counter; //与writers类似,但当等待写入FIFO的进程时使用
    struct fasync_struct *fasync_readers; //用于通过信号进行的异步I/O通知
    struct fasync_struct *fasync_writers; //用于通过信号的异步I/O通知
};
//管道读操作函数
static ssize_t
pipe_readv(struct file *filp, const struct iovec *_iov,
    unsigned long nr_segs, loff_t *ppos)
{
 struct inode *inode = filp->f_dentry->d_inode; //获取inode结点指针
 struct pipe_inode_info *info;
 int do_wakeup;
 ssize_t ret;
 struct iovec *iov = (struct iovec *)_iov; //获取读缓冲区的结构
 size_t total_len;
 total_len = iov_length(iov, nr_segs);
 /* Null read succeeds. */
 if (unlikely(total_len == 0))
  return 0;
 do_wakeup = 0;
 ret = 0;
 down(PIPE_SEM(*inode)); //获取inode中的i_sem信号量
 info = inode->i_pipe; //获取inode 结构的pipe_inode_info结构指针
 for (;;) {
  int bufs = info->nrbufs; //检查有几个管道缓冲区有被读取的数据
  if (bufs) { //说明有其中有缓冲区包含了读数据
   int curbuf = info->curbuf; //获取当前读数据的管道缓存区的索引
   struct pipe_buffer *buf = info->bufs + curbuf; //共有16个缓冲区,curbuf是当前的
   struct pipe_buf_operations *ops = buf->ops; //获取操作函数列表
   void *addr;
   size_t chars = buf->len;
   int error;
   //若缓冲区长度大于要求读取的数据长度,chars设置成要求读的长度
   if (chars > total_len)
    chars = total_len;
   //执行Map方法
   addr = ops->map(filp, info, buf);
   //从缓存区中复制数据
   error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars);
   //执行umap方法
   ops->unmap(info, buf);
   if (unlikely(error)) {
    if (!ret) ret = -EFAULT; //第一次读失败
    break;
   }
   //更新管道的offset和len字段
   ret += chars;
   buf->offset += chars;
   buf->len -= chars;

   //若现在的缓存区的数据长度为0
   if (!buf->len) {
    buf->ops = NULL;
    ops->release(info, buf);
    curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
    info->curbuf = curbuf;
    info->nrbufs = --bufs;
    do_wakeup = 1;
   }
   total_len -= chars;  //更新读的总长度
   if (!total_len)  //该读的已读完成
    break; /* common path: read succeeded */
  }
  if (bufs) /* More to do? */
   continue;
  //若bufs为0,说明所有管道为NULL,此时进行一下操作
  if (!PIPE_WRITERS(*inode)) //是否有写操作正在进行
   break;
  if (!PIPE_WAITING_WRITERS(*inode)) { //是否需要等待
   /* syscall merging: Usually we must not sleep
    * if O_NONBLOCK is set, or if we got some data.
    * But if a writer sleeps in kernel space, then
    * we can wait for that data without violating POSIX.
    */
   if (ret)
    break;
   if (filp->f_flags & O_NONBLOCK) { //要等待但又设置了NONBLOCK标记,矛盾了
    ret = -EAGAIN;
    break;
   }
  }
  if (signal_pending(current)) { //设置进程阻塞标志
   if (!ret) ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(PIPE_WAIT(*inode));
    kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
  }
  pipe_wait(inode);
 }
 up(PIPE_SEM(*inode));
 /* Signal writers asynchronously that there is more room.  */
 if (do_wakeup) {
  wake_up_interruptible(PIPE_WAIT(*inode));
  kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT);
 }
 if (ret > 0)
  file_accessed(filp);  //更新文件结构的atime对象
 return ret;
}
static ssize_t
pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
{
 struct iovec iov = { .iov_base = buf, .iov_len = count };
 return pipe_readv(filp, &iov, 1, ppos);
}

/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode)
{
    DEFINE_WAIT(wait);
 //把current添加到管道的等待队列中
    prepare_to_wait(PIPE_WAIT(*inode), &wait, TASK_INTERRUPTIBLE);
 //释放i_sem
    up(PIPE_SEM(*inode));
    schedule();
 //被呼醒,把它从等待队列中删除
    finish_wait(PIPE_WAIT(*inode), &wait);
 //再次获取i_sem索引节点信号量
    down(PIPE_SEM(*inode));
}
static ssize_t
pipe_writev(struct file *filp, const struct iovec *_iov,
     unsigned long nr_segs, loff_t *ppos)
{
    struct inode *inode = filp->f_dentry->d_inode;
    struct pipe_inode_info *info;
    ssize_t ret;
    int do_wakeup;
    struct iovec *iov = (struct iovec *)_iov;
    size_t total_len;

    total_len = iov_length(iov, nr_segs);
    /* Null write succeeds. */
    if (unlikely(total_len == 0))
        return 0;

    do_wakeup = 0;
    ret = 0;
    down(PIPE_SEM(*inode));
    info = inode->i_pipe;

    //是否有读者进程存在,若没有写管道操作就没有任何意义

    //此时产生SIGPIPE信号

    if (!PIPE_READERS(*inode)) {
        send_sig(SIGPIPE, current, 0);
        ret = -EPIPE;
        goto out;
    }

    /* We try to merge small writes */
    //若有待读数据的缓冲区,而且写入的数据长度小于PAGE_SIZE

    if (info->nrbufs && total_len < PAGE_SIZE) {
        //第一个待读缓冲区+可读缓冲区数-1得到第一个可写缓冲区的地址

        int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
        struct pipe_buffer *buf = info->bufs + lastbuf;
        struct pipe_buf_operations *ops = buf->ops;
        int offset = buf->offset + buf->len;
        //若可写缓冲区的剩余的空间大于写入的数据总量total_len

        if (ops->can_merge && offset + total_len <= PAGE_SIZE) {
            void *addr = ops->map(filp, info, buf);
            //把数据复制到管道缓冲区

            int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);
            ops->unmap(info, buf);
            ret = error;
            do_wakeup = 1;
            if (error)
                goto out;
            //更新有效数据长度字段

            buf->len += total_len;
            ret = total_len;
            goto out;
        }

    }

    // 若全部可写(可读缓冲区数为0),

    // 或写入数据长度大于管道缓冲区的长度单位(PAGE_SIZE)

    for (;;) {
        int bufs;
        //是否有读者进程存在

        if (!PIPE_READERS(*inode)) {
            send_sig(SIGPIPE, current, 0);
            if (!ret) ret = -EPIPE;
            break;
        }
        //获取读缓冲区数

        bufs = info->nrbufs;
        if (bufs < PIPE_BUFFERS) {
            ssize_t chars;
            //用第一个可读缓冲区+可读缓冲区数得到可写(空)缓冲区的地址

            int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
            struct pipe_buffer *buf = info->bufs + newbuf;
            struct page *page = info->tmp_page;
            int error;

            //若page的值为空,从伙伴系统中获取一页

            if (!page) {
                page = alloc_page(GFP_HIGHUSER);
                if (unlikely(!page)) {
                    ret = ret ? : -ENOMEM;
                    break;
                }
                info->tmp_page = page;
            }
            /* Always wakeup, even if the copy fails. Otherwise
             * we lock up (O_NONBLOCK-)readers that sleep due to
             * syscall merging.
             * FIXME! Is this really true?
             */
            do_wakeup = 1;
            chars = PAGE_SIZE;
            if (chars > total_len)
                chars = total_len;

            //写chars字节到缓冲区中

            error = pipe_iov_copy_from_user(kmap(page), iov, chars);
            kunmap(page);
            if (unlikely(error)) {
                if (!ret) ret = -EFAULT;
                break;
            }
            ret += chars;

            /* Insert it into the buffer array */
            /更新nrbufs,和len字段。
            buf->page = page;
            buf->ops = &anon_pipe_buf_ops;
            buf->offset = 0;
            buf->len = chars;
            info->nrbufs = ++bufs;
            info->tmp_page = NULL;

            //若没有写完继续写入剩下的数据

            total_len -= chars;
            if (!total_len)
                break;
        }
        //还有可写缓冲区,继续写

        if (bufs < PIPE_BUFFERS)
            continue;
        //若设置非阻塞,

        //若没有写入任何的数据ret=0,此时返回错误

        //若已经写完了数据,结束写操作。

        if (filp->f_flags & O_NONBLOCK) {
            if (!ret) ret = -EAGAIN;
            break;
        }
        if (signal_pending(current)) {
            if (!ret) ret = -ERESTARTSYS;
            break;
        }
        if (do_wakeup) {
            wake_up_interruptible_sync(PIPE_WAIT(*inode));
            kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
            do_wakeup = 0;
        }
        PIPE_WAITING_WRITERS(*inode)++;
        pipe_wait(inode);
        PIPE_WAITING_WRITERS(*inode)--;
    }
out:
    up(PIPE_SEM(*inode));
    if (do_wakeup) {
        wake_up_interruptible(PIPE_WAIT(*inode));
        kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
    }
    if (ret > 0)
        inode_update_time(inode, 1);    /* mtime and ctime */
    return ret;
}

PS:管道是作为一组VFS对象来实现的,因此没有对应的磁盘映像。所以管道的安装和实现都是VFS类似,此处不进行探讨

时间: 2024-11-04 16:47:54

重新审视进程间的通信(一)的相关文章

Android 使用AIDL实现进程间的通信

在Android中,如果我们需要在不同进程间实现通信,就需要用到AIDL技术去完成. AIDL(android Interface Definition Language)是一种接口定义语言,编译器通过*.aidl文件的描述信息生成符合通信协议的Java代码,我们无需自己去写这段繁杂的代码,只需要在需要的时候调用即可,通过这种方式我们就可以完成进程间的通信工作.关于AIDL的编写规则我在这里就不多介绍了,读者可以到网上查找一下相关资料. 接下来,我就演示一个操作AIDL的最基本的流程. 首先,我

进程间的通信如何实现

进程间的通信如何实现 版权声明:本文为博主原创文章,未经博主允许不得转载.

linux 进程间的通信

现在linux使用的进程间通信方式:(1)管道(pipe)和有名管道(FIFO)(2)信号(signal)(3)消息队列(4)共享内存(5)信号量(6)套接字(socket) 为何进行进程间的通信:A.数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几M字节之间B.共享数据:多个进程想要操作共享数据,一个进程对共享数据的修改,别的进程应该立刻看到.C.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程).D.资源共享

Linux进程间的通信

一.管道 管道是Linux支持的最初Unix IPC形式之一,具有以下特点: A. 管道是半双工的,数据只能向一个方向流动: B. 需要双工通信时,需要建立起两个管道: C. 只能用于父子进程或者兄弟进程之间(具有亲缘关系的进程): D. 单独构成一种独立的文件系统:管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在与内存中. 匿名管道的创建:该函数创建的管道的两端处于一个进程中间,在实际应用中没有太大意义;因此,一

【转】使用AIDL实现进程间的通信之复杂类型传递

使用AIDL实现进程间的通信之复杂类型传递 首先要了解一下AIDL对Java类型的支持. 1.AIDL支持Java原始数据类型. 2.AIDL支持String和CharSequence. 3.AIDL支持传递其他AIDL接口,但你引用的每个AIDL接口都需要一个import语句,即使位于同一个包中. 4.AIDL支持传递实现了android.os.Parcelable接口的复杂类型,同样在引用这些类型时也需要import语句.(Parcelable接口告诉Android运行时在封送(marsha

Linux进程间的通信方法

linux进程间的通信方法总结如下 通过fork函数把打开文件的描述符传递给子进程 通过wait得到子进程的终结信息 通过加锁的方式,实现几个进行共享读写某个文件 进行间通过信号通信,SIGUSR1和SIGUSR2实现用户定义功能 利用pipe进行通信 FIFO文件进行通信 mmap,几个进程映射到同一内存区 SYS IPC 消息队列,信号量(很少用) UNIX Domain Socket,常用

进程间的通信:管道

进程间的通信:管道 Linux中将命令联系到一起使用实际上就是把一个进程的输出通过管道传递给另一个进程的输入,这些都是shell封装好的,对标准输入和输出流进行了重新连接,使数据流从键盘输入经过两个程序最终输出到屏幕上.如下: cmd1|cmd2 进程管道 在两个程序之间传递数据最简单的方法就是使用popen()和pclose()了.原型如下: #include <stdio.h> FILE *popen(const char *command, const char *open_mode);

Linux/UNIX进程间的通信(1)

进程间的通信(1) 进程间的通信IPC(InterProcessCommunication )主要有以下不同形式: 半双工管道和FIFO:全双工管道和命名全双工管道:消息队列,信号量和共享存储:套接字和STREAMS 管道 pipe函数 当从一个进程连接到另一个进程时,我们使用术语管道.我们通常是把一个进程的输出通过管道连接到另一个进程的输入. 管道是由调用pipe函数创建的: #include<unistd.h> int pipe(intpipefd[2]); 经由参数pipefd返回两个文

进程间的通信简单总结

运行在不同的端系统的之间的通信: 进程指运行在端系统上的一个程序. 若要进行进程之间的通信,那么进程肯定就是成对出现的. 进程间的通信你可以想象是两个人用电话进行交流,然后它必须需要进行通信的一些基础设施,这个基础设施就是套接字,如果你在在两个进程之间进行数据交流,你可以把套接字想象成一个门,你打开门把你要传递的数据丢出去,然后另一个进程打开门,然后就收数据. 但是你把数据丢出门,它是如何找到目的地,另一个进程呢?数据报从一个进程找到另一个进程这就是进程寻址.你在打电话你需要提供电话号码,因此你