多进程编程之进程间通信-管道和消息队列

1.进程间通信

Linux作为一种新兴的操作系统,几乎支持所有的Unix下常用的进程间通信方法:管道、消息队列、共享内存、信号量、套接口等等。

2.2.1 管道

管道是进程间通信中最古老的方式,它包括无名管道和有名管道两种,前者用于父进程和子进程间的通信,后者用于运行于同一台机器上的任意两个进程间的通信。

无名管道pipe

无名管道由pipe()函数创建:

 #include <unistd.h>
 int pipe(int filedis[2]); 

参数filedis返回两个文件描述符:filedes[0]为读而打开,filedes[1]为写而打开。filedes[1]的输出是filedes[0]的输入。下面的例子示范了如何在父进程和子进程间实现通信。

跨越fork调用的管道:

例子代码:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

int main()
{
    int data_processed;
    int file_pipes[2];
    const char some_data[] = "123";
    char buffer[BUFSIZ + 1];
    pid_t fork_result;

    memset(buffer, ‘\0‘, sizeof(buffer));

    if (pipe(file_pipes) == 0)
    {
        fork_result = fork();
        if (fork_result == -1) {
            fprintf(stderr, "Fork failure");
            exit(EXIT_FAILURE);
        }

// We‘ve made sure the fork worked, so if fork_result equals zero, we‘re in the child process.

        if (fork_result == 0)
        {
            // sleep(1);//如果父进程在子进程之前退出,则可以在两个输出内容之间看到shell的提示符
            data_processed = read(file_pipes[0], buffer, BUFSIZ);
            printf("Read %d bytes: %s\n", data_processed, buffer);
            exit(EXIT_SUCCESS);
        }

// Otherwise, we must be the parent process.

        else
        {

            data_processed = write(file_pipes[1], some_data,
                                   strlen(some_data));
            printf("Wrote %d bytes\n", data_processed);//不管sleep(1)还是sleep(10)多少,都是先执行父进程???

        }
    }
    exit(EXIT_SUCCESS);
}

运行结果:

基于管道的进程间通信:

int main()
{
    int mycount=0;
    ifstream m_InStream("subdata.txt");//读取的任务列表
    int data_processed;
    int file_pipes[3][2];
    string some_data;

    pid_t fork_result[3];
    for (int i = 0; i < 3; ++i)
    {
        if (pipe(file_pipes[i]) == 0)
        {
            fork_result[i] = fork();
            if (fork_result[i] == 0)
            {
                close(file_pipes[i][1]);
                char buf[1024] = {0};
                while(true)
                {
                    int readsize = read(file_pipes[i][0], buf, sizeof(buf)+1);
                    buf[readsize]=‘\0‘;//注意这里,可以存在字符之后有其他字符,可以尝试下,注释掉该语句的效果!!
                    // printf("%X,%X,%X\n",buf[readsize],buf[readsize+1],buf[readsize+2]);
                    printf("Read %d byte,data=%s\n",readsize, buf);
                    std::cout << flush;// fflush(stdout);
                    if (strncmp(buf, "exit", 4) == 0)
                    {
                        break;
                    }
                }
                exit(0);
            }
            else if(fork_result[i] == -1)
            {
                fprintf(stderr, "Fork failure");
                exit(EXIT_FAILURE);
            }
        }
    }

    string oneline;
    int count=0;
    while(getline(m_InStream, oneline,‘\n‘))
    {
        some_data = oneline;
        char bufs[128] = {0};
        strncpy(bufs,some_data.c_str(),some_data.size());
        data_processed = write(file_pipes[count][1], bufs, strlen(bufs));
        printf("Wrote %d bytes,data=%s\n", data_processed,bufs);
        count++;
        if(count>=3)
        {
            count = count -3;
        }
        sleep(1);
    }
    for(int i=0; i<3; ++i)
    {
        char bufs[128] = {0};
        snprintf(bufs, sizeof(bufs), "exit");
        write(file_pipes[i][1], bufs, strlen(bufs));
        sleep(1);
    }
    int status =0;
    int mpid =0;
    for(int i=0;i<3;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]\n",mpid,status);
    }
    return 0;
}

运行结果如下:

popen和pclose

有名管道(也叫做命名管道)

基于有名管道,我们可以实现不相关进程之间的数据交换,而不必有一个共同的祖先进程。有名管道是基于FIFO文件来完成的,有名管道是一种特殊类型的文件,它在系统文件中以文件名的形式存在,行为和上述介绍的无名管道pipe类似。

有名管道可由两种方式创建:命令行方式mknod系统调用和函数mkfifo。下面的两种途径都在当前目录下生成了一个名为myfifo的有名管道:

方式一:mkfifo(“my_fifo”,”rw”);

方式二:mknod my_fifo p ,my_fifo 是指文件名

在程序中可以通过以下两个不同的函数进行调用:

#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
int mknod(const char *pathname, mode_t mode | S_FIFO, (dev_t)0);  

生成了有名管道后,就可以使用一般的文件I/O函数如open、close、read、write等来对它进行操作。

有名管道创建示例代码:

/*创建有名管道*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>

int main()
{
    int res = mkfifo("tmp/my_fifo", 0777);
    if (res == 0)
        printf("FIFO created\n");
    exit(EXIT_SUCCESS);
}

与通过pipe调用创建管道不同,FIFO是以命名文件的形式存在,而不是打开的文件描述符,所以在对它进行读写操作之前必须先打开它。FIFO也用open和close函数进行打开和关闭。传递给open函数的是FIFO的路径名,而不是一个一般下的文件。

下面即是一个简单的例子,假设我们已经创建了一个名为my_fifo的有名管道。

【待补充 】

2.2.2 消息队列

消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。一个有写权限的进程按照一定的规则对消息队列进行信息添加,对消息队列有读权限的进程则可以从消息队列中读走消息,从而实现进程间的通信。。

代码版本1:

涉及函数:

1)创建新消息队列或者取得已经存在的消息队列

int msgget(key_t key, int msgflg);

参数:

key:可以认为是一个端口号,是用来命名某个特定的消息队列,进行消息队列标识。

msgflg:

IPC_CREAT值,若没有该队列,则创建一个并返回新标识符;若已存在,则返回原标识符。

IPC_EXCL值,若没有该队列,则返回-1;若已存在,则返回0。

2)向队列读/写消息

原型:

int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数:

msqid:消息队列的标识码

msgp:指向消息缓冲区的指针,此位置用来暂时存储发送和接收的消息,是一个用户可定义的通用结构,形态如下:

         struct msgstru
         { long mtype; /* 消息类型,必须 > 0 */
           char mtext[1024]; /* 消息文本 */
         };

msgsz:消息的大小。

msgtyp:从消息队列内读取的消息形态。如果值为零,则表示消息队列中的所有消息都会被读取。

msgflg:用以控制当队列中没有对应类型的消息可以接收时的处理逻辑。对于msgsnd函数,msgflg控制着当前消息队列满或消息队列到达系统范围的限制时将要发生的事情。如果msgflg设置为IPC_NOWAIT,则在msgsnd()执行时若是消息队列已满,则msgsnd()将不会阻塞,不会发送信息,立即返回-1。如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。当msgflg为0时,msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取阻塞等待的处理模式。此时对于发送进程而已发送进程将挂起以等待队列中腾出可用的空间;对于接收进程而言,该进程将会挂起以等待一条相应类型的信息到达。

3)设置消息队列属性

原型:int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );

参数:msgctl 系统调用对 msgqid 标识的消息队列执行 cmd 操作,系统定义了 3 种 cmd 操作: IPC_STAT , IPC_SET , IPC_RMID

IPC_STAT : 该命令用来获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指定的地址空间。

IPC_SET : 该命令用来设置消息队列的属性,要设置的属性存储在buf中。

IPC_RMID : 从内核中删除 msqid 标识的消息队列。

receive代码:

/*receive.cpp */
#include<iostream>
#include <fstream>
#include <string>
#include <memory.h>
#include<stdio.h>
#include<map>
#include<vector>
#include<algorithm>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024
#define ChildNum 5
struct msgstru
{
    long msgtype;
    char msgtext[2048];
};

/*子进程,监听消息队列*/
void childproc()
{
    struct msgstru msgs;
    int msgid,ret_value;
    char str[512];
    /* First, we set up the message queue. */
    // msgid = msgget((key_t)MSGKEY, 0666 | IPC_CREAT);//该键值则唯一对应一个消息队列
    while(1)
    {
        msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */
        if(msgid < 0){
            printf("msq not existed! errno=%d [%s]\n",errno,strerror(errno));
            exit(EXIT_FAILURE);
        }
        /*接收消息队列*/
        ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstru),0,0);
        int len = strlen(msgs.msgtext);
        // std::cout<<len<<std::endl;
        msgs.msgtext[len] = ‘\0‘;
        // std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        if(ret_value == -1)
        {
            fprintf(stderr, "msgrcv failed with error: %d\n", errno);//消息队列中的信息被取完??
            exit(EXIT_FAILURE);//消息队列为空的时候,就跳出。也可以设计成,消息队列为空时,不跳出,而是等待。
        }
        else
        {
            printf("pid=%d,data=%s\n",getpid(),msgs.msgtext);
        }
        if (strncmp(msgs.msgtext, "end", 3) == 0)
        {
            exit(EXIT_SUCCESS);//换成break的效果呢???是不一样的啊
        }
        //因为在send的时候,只send了一个end,当该标志信息被读取之后,其他的进程自然是读取不到信息的,

    }
    return;
}

int main()
{
    int i,cpid;

    /* create 5 child process */
    for (i=0;i<ChildNum;i++){
        cpid = fork();
        if (cpid < 0)
            printf("fork failed\n");
        else if (cpid ==0) /*child process*/
            childproc();
    }
    int status =0;
    int mpid =0;
    // std::cout<<"father pid="<<getpid()<<std::endl;
    for(int i=0;i<ChildNum;i++)
    {
        mpid = wait(&status);
        printf("pid[%d] is exit with status[%d]\n",mpid,status);
    }
    return 1;
}

send代码:

/*send.c*/
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

using namespace std;
#define MSGKEY 1024

struct msgstru
{
   long msgtype;
   char msgtext[2048];
};

main()
{
    struct msgstru msgs;
    int msg_type;
    char str[256];
    int ret_value;
    int msqid;

    msqid=msgget(MSGKEY,IPC_EXCL );  /*检查消息队列是否存在*/
    if(msqid < 0){
        msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/
        if(msqid <0){
            printf("failed to create msq | errno=%d [%s]\n",errno,strerror(errno));
            exit(-1);
        }
    }
    ifstream m_InStream("subdata.txt");
    string oneline;
    int running=1;
    // while(running)//getline(m_InStream, oneline,‘\n‘)
    int linenun=0;
    while (running)
    {
        if(!getline(m_InStream, oneline,‘\n‘))
        {
            oneline = "end";
            running = 0;
        }
        linenun++;
        // printf("input message type[0=end process]:");
        // scanf("%d",&msg_type);
        // if (msg_type == 0)
            // break;
        // printf("input message to be sent:");
        // scanf ("%s",str);
        strncpy(str,oneline.c_str(),256);
        msgs.msgtype = linenun;//这里只是顺便记录下,该字段的值,可以写
        strcpy(msgs.msgtext, str);
        /* 发送消息队列 */
        std::cout<<"pid="<<getpid()<<","<<msgs.msgtext<<std::endl;
        ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstru),0);//消息队列标识符,准备发现信息的指针,信息的长度,控制标志位
        sleep(1);
        if ( ret_value < 0 ) {
            printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno));
            exit(-1);
        }
    }
    msgctl(msqid,IPC_RMID,0); //删除消息队列
    //如果这行代码注释掉的话,则receive进程不会在接收一次send发出的任务之后,就全部子进程全部退出,而是send发送一次(即启动一次send程序)退出一个进程,这里启动了5个进程,所以,需要启动5次send程序,receive程序才会退出(每次退出一个进程)。
}

运行结果如下:

send的结果:

receive的结果:

对于receive之后的结果,由于在getline的时候,在最后一行之后,send程序中往消息队列中写入的是一个end字符,而在receive方,只有其中一个进程获取到end字符,然后,正常退出该进程,而其他的进程,则是由于消息队列中信息为空,获取消息队列失败而退出的。所以,在结果图中可以看到有4个进程的消息队列获取结果失败,43提示符,从而退出进程,256的提示符。

查看消息队列的命令行:

查看全部ipc对象信息:

#ipcs -a

查看消息队列信息

#ipcs -q

查看共享内存信息

#ipcs -m

查看信号量信息

#ipcs -s

删除IPC对象的ipcrm

ipcrm -[smq] ID 或者ipcrm -[SMQ] Key

-q -Q删除消息队列信息 例如ipcrm -q 98307

-m -M删除共享内存信息

-s -S删除信号量信息

如果把两个程序合并到一起,写成一个程序呢?

至于共享内存,信号量和套接字在下文再继续介绍。

时间: 2024-10-09 11:07:31

多进程编程之进程间通信-管道和消息队列的相关文章

Linux进程间通信:管道,信号量,消息队列,信号,共享内存,套接字

Linux下的进程通信手段基本上是从UNIX平台上的进程通信手段继承而来的.而对UNIX发展做出重大贡献的两大主力AT&T的贝尔实验室及BSD(加州大学伯克利分校的伯克利软件发布中心)在进程间的通信方面的侧重点有所不同.前者是对UNIX早期的进程间通信手段进行了系统的改进和扩充,形成了“system V IPC”,其通信进程主要局限在单个计算机内:后者则跳过了该限制,形成了基于套接口(socket)的进程间通信机制.而Linux则把两者的优势都继承了下来 linux进程间通信(IPC)有几种方式

Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存

Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存 参考:<linux编程从入门到精通>,<Linux C程序设计大全>,<unix环境高级编程> 参考:C和指针学习 说明:本文非常的长,也是为了便于查找和比较,所以放在一起了 Linux 传统的进程间通信有很多,如各类管道.消息队列.内存共享.信号量等等.但它们都无法介于内核态与用户态使用,原因如表 通信方法 无法介于内核态与用户态的原因 管道(不包括命名管道) 局限于父子进程间的通信. 消息队列 在

Linux 进程间通信(posix消息队列 简单)实例

Linux 进程间通信(posix消息队列 简单)实例 详情见: http://www.linuxidc.com/Linux/2011-10/44828.htm 编译: gcc -o consumer consumer.c -lrt gcc -o producer producer.c -lrt /* * * Filename: producer.c * * Description: 生产者进程 * * Version: 1.0 * Created: 09/30/2011 04:52:23 PM

【网络编程基础】Linux下进程通信方式(共享内存,管道,消息队列,Socket)

在网络课程中,有讲到Socket编程,对于tcp讲解的环节,为了加深理解,自己写了Linux下进程Socket通信,在学习的过程中,又接触到了其它的几种方式.记录一下. 管道通信(匿名,有名) 管道通信,在一个进程之中,只能单一的对其写或者是读,而不可以及执行写操作又执行读操作.这一点,我们可以将其想象成我们的水管,分别连着不同的两端,在有水流的时候,一端只能进行输入,另一端只能进行输出,而不可以同时输入和输出. 管道又分为有名管道和匿名管道,两者的区别在于对于匿名管道,其只能在具有亲缘关系的父

进程间通信(IPC)之消息队列

★IPC方法包括管道(PIPE).消息队列(Message_Queue).旗语.共用内存(ShareMemory)以及套接字(Socket).进 程间通信主要包括了管道.系统IPC(包括了消息队列.信号以及共享存储).套接字(SOCKET).此文将详细叙述消息队列的相 关内容. ★产生原因: 所谓消息队列,其实就是消息(数据)传输过程中保存的容器.既然有了管道通信的方式,何必又有消息队列呢? 因为根据管道的特性,我们知道其在一定程度上存在或多或少的局限性,首先匿名管道以及命名管道是随进程的,进

进程间通信 System V 消息队列

1.msgget (key_t ket,int flag) ; //创建一个新的消息队列或者访问一个已存在的消息队列 2.msgsnd(int msid, const void *ptr ,size_t length ,int flag ) // 发送 3.msgrcv() //读 4.msgctl(int msid , int cmd ,struct  msqid_ds *buff )//  cmd 提供删除,设置,返回当前 tips : 1.客户端服务端例子 服务端创建两个消息队列,A,B,

进程间通信IPC:消息队列,信号量,共享内存

2015.3.4星期三 阴天 进程间通信:IPC 文件对象:记录文件描述符,文件开关等 IPC标示符:系统全局的流水号两个进程要通信,打开的是唯一的对象进行通讯,通过key操作 XSI IPC:消息队列,信号量,共享内存. ipcs 查看ip对象共享内存,信号量,消息队列等信息ipcrm 删除一个IP对象 Linux为用户提供了完善的,强大的网络功能完善的内置网络:其他操作系统不包含如此紧密的和内核结合在一起的网络部分 共享内存标示符的获取有两种方法:ftok(pathname,id)另一个是K

管道,消息队列,共享内存!

2015.1.26星期一,阴天 linux中使用的较多的进程通信方式主要有一下几种:1.管道(Pipe):管道可用于具有亲缘关系进程间的通信,有名管道,除了具有管道所具有功能外,它 还允许无亲缘关系进程的通信2.信号(signal):信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,用于通知进程 有某事发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一样的3.消息队列:(Messge Queue):消息队列是消息的连接表,包括Posix消息队列SystemV消息队列

进程间通信六(消息队列)

一.消息队列的特点 1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.    2.消息队列允许一个或多个进程向它写入与读取消息.    3.管道和命名管道都是通信数据都是先进先出的原则.    4.消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取.比FIFO更有优势.     目前主要有两种类型的消息队列:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用.系统V消息队列是随内核持续的,只有在内核重起或者人工删除