Linux程序设计学习笔记----System V进程通信之消息队列

一个或多个进程可向消息队列写入消息,而一个或多个进程可从消息队列中读取消息,这种进程间通讯机制通常使用在客户/服务器模型中,客户向服务器发送请求消息,服务器读取消息并执行相应请求。在许多微内核结构的操作系统中,内核和各组件之间的基本通讯方式就是消息队列。例如,在 MINIX 操作系统中,内核、I/O 任务、服务器进程和用户进程之间就是通过消息队列实现通讯的。

Linux中的消息可以被描述成在内核地址空间的一个内部链表,每一个消息队列由一个IPC的标识号唯一的标识。Linux 为系统中所有的消息队列维护一个 msgque 链表,该链表中的每个指针指向一个 msgid_ds 结构,该结构完整描述一个消息队列。

System V消息队列是Open Group定义的XSI,不属于POSIX标准。System V IPC的历史相对很早,在上个世70年代后期有贝尔实验室的分支机构开发,80年代加入System V的系统内核中,后来商用UNIX系统基本都加入了System V IPC的功能。

System V消息队列相对于POSIX消息队列的区别主要是:

  • POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于System V消息队列可以返回任意指定优先级(通过消息类型)的消息。
  • 当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动一个线程,System V消息队列不提供类似的机制。

数据结构与基本属性

(1)消息缓冲区(msgbuf)

我们在这里要介绍的第一个数据结构是msgbuf结构,可以把这个特殊的数据结构看成一个存放消息数据的模板,它在include/linux/msg.h中声明,描述如下:

/* msgsnd 和msgrcv 系统调用使用的消息缓冲区*/

struct msgbuf {

   long mtype;        /* 消息的类型,必须为正数 */

   char mtext[1];      /* 消息正文 */

};

注意:对于消息数据元素(mtext),不要受其描述的限制。实际上,这个域(mtext)不仅能保存字符数组,而且能保存任何形式的任何数据。这个域本身是任意的,因为这个结构本身可以由应用程序员重新定义:

struct my_msgbuf {

        long    mtype;         /* 消息类型 */

       long   request_id;    /* 请求识别号 */

       struct  client info;    /* 客户消息结构 */

};

我们看到,消息的类型还是和前面一样,但是结构的剩余部分由两个其它的元素代替,而且有一个是结构。这就是消息队列的优美之处,内核根本不管传送的是什么样的数据,任何信息都可以传送。

但是,消息的长度还是有限制的,在Linux中,给定消息的最大长度在include/linux/msg.h中定义如下:

#define MSGMAX  8192    /* max size of message(bytes) */

消息总的长度不能超过8192字节,包括mtype域,它是4字节长。

(2)消息结构(msg)

内核把每一条消息存储在以msg结构为框架的队列中,它在include/ linux/msg.h中定义如下:

struct msg {

   struct msg *msg_next;  /* 队列上的下一条消息 */

   long  msg_type;         /*消息类型*/

   char *msg_spot;        /* 消息正文的地址 */

   short msg_ts;          /* 消息正文的大小 */

};

注意:msg_next是指向下一条消息的指针,它们在内核地址空间形成一个单链表。

(3)消息队列结构(msgid_ds)

当在系统中创建每一个消息队列时,内核创建、存储及维护这个结构的一个实例。

<bits/msq.h>

struct msqid_ds
{
  struct ipc_perm msg_perm;        /*IPC对象的属性信息和访问权限 */
  __time_t msg_stime;              /* time of last msgsnd command */
  __time_t msg_rtime;              /* time of last msgrcv command */
  __time_t msg_ctime;              /* time of last change */
  unsigned long int __msg_cbytes;  /* 当前队列中消息的字节数 */
  msgqnum_t msg_qnum;              /* 当前队列中消息的个数 */
  msglen_t msg_qbytes;             /* 队列允许存放的最大字节数 */
  __pid_t msg_lspid;               /* pid of last msgsnd() */
  __pid_t msg_lrpid;               /* pid of last msgrcv() */

//下面是保留字段
#if __WORDSIZE == 32
  unsigned long int __unused1;
  unsigned long int __unused2;
  unsigned long int __unused3;
#endif
  unsigned long int __unused4;
  unsigned long int __unused5;
};

System V消息队列的创建和打开

System V消息队列的创建和使用会使用下面的函数接口:

#include <sys/msg.h>
int msgget(key_t key, int oflg);
                //成功返回非负消息队列描述符,失败返回-1

key:消息队列的键,用来创建一个消息队列。System IPC都有一个key,作为IPC的外部标识符,创建成功后返回的描述符作为IPC的内部标识符使用。key的主要目的就是使不同进程在同一IPC汇合。key具体说可以有三种方式生成:

  • 不同的进程约定好的一个值;
  • 通过相同的路径名和项目ID,调用ftok()函数,生成一个键;
  • 还可以设置为IPC_PRIVATE,这样就会创建一个新的,唯一的IPC对象;然后将返回的描述符通过某种方式传递给其他进程;

oflg:指定创建或打开消息队列的标志和读写权限(ipc_perm中的mode成员)。我们知道System V IPC定义了自己的操作标志和权限设置标志,而且都是通过该参数传递,这和open函数存在差别,open函数第三个参数mode用于传递文件的权限标志。System V IPC的操作标志包含:IPC_CREAT,IPC_EXCL,权限设置标志如下图:

下面是创建消息队列的测试代码:

#include <iostream>
#include <cstring>
#include <errno.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/msg.h>

using namespace std;

#define  PATH_NAME "/tmp/anonymQueue"

int main(int argc, char **argv)
{
    key_t key;
    int fd;

    if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
    {
        cout<<"open file "<<PATH_NAME<<"failed.";
        cout<<strerror(errno)<<endl;
        return -1;
    }
    close(fd);

    key = ftok(PATH_NAME, 0);
    int msgID;

    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
    {
        cout<<"open message queue failed...";
        cout<<strerror(errno)<<endl;
        return -1;
    }

    cout<<"key:0x"<<hex<<key<<endl;
    cout<<"descriptor id:"<<dec<<msgID<<endl;
}

结果:

实现System V IPC的任何系统都提供两个特殊的程序ipcs和ipcrm。ipcs输出IPC的各种信息,ipcrm则用于删除各种System V IPC。由于System V IPC不属于POSIX标准,所以这两个命令也未被标准化。

System V消息队列的使用

System V消息队列的写入消息使用下面的函数接口:

#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
                        //成功返回0,失败返回-1

msqid:消息队列的描述符;

msgp:指向存放消息的缓冲区,该缓冲区中包含消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,在<sys/msg.h>中有关于该缓冲区结构定义的参版考模

struct msgbuf
{
    long int mtype;             /* type of received/sent message */
    char mtext[1];              /* text of the message */
};

缓冲区的开头是一个long型的消息类型,该消息类型必须是一个非负数。紧跟在消息类型后面的是消息体部分(如果消息长度大于0),参考模版中定义的mtext只是说明消息体,该部分可以自定义长度。我们自己的应用都会定义特定的消息结构。

msgsz:缓冲区中消息体部分的长度;

msgflg:设置操作标志。可以为0,IPC_NOWAIT;用于在消息队列中没有可用的空间时,调用线程采用何种操作方式

标志为IPC_NOWAIT,表示msgsnd操作以非阻塞的方式进行,在消息队列中没有可用的空间时,msgsnd操作会立刻返回。并指定EAGAIN错误;

标志为0,表示msgsnd操作以阻塞的方式进行,这种情况下在消息队列中没有可用的空间时调用线程会被阻塞,直到下面的情况发生:

  • 等到有存放消息的空间;
  • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
  • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

System V消息队列的读取消息使用下面的函数接口:

#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
                      //成功返回接收到的消息的消息体的字节数,失败返回-1

msqid:消息队列的描述符;

msgp:指向待存放消息的缓冲区,该缓冲区中将会存放接收到的消息的消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,和msgsnd相对应。

msgsz:缓冲区中能存放消息体部分的最大长度,这也是该函数能返回的最大数据量;该字段的大小应该是sizeof(msg buffer) - sizeof(long)

msgtyp:希望从消息队列中获取的消息类型。

  • msgtyp为0,返回消息队列中的第一个消息;
  • msgtyp > 0,返回该消息类型的第一个消息;
  • msgtyp < 0,返回小于或等于msgtyp绝对值的消息中类型最小的第一个消息;

msgflg:设置操作标志。可以为0,IPC_NOWAIT,MSG_NOERROR;用于在消息队列中没有可用的指定消息时,调用线程采用何种操作方式

标志为IPC_NOWAIT,表示msgrcv操作以非阻塞的方式进行,在消息队列中没有可用的指定消息时,msgrcv操作会立刻返回,并设定errno为ENOMSG。

标志为0,表示msgrcv操作是阻塞操作,直到下面的情况发生:

  • 消息队列中有一个所请求的消息类型可以获取;
  • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
  • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

标志为MSG_NOERROR,表示接收到的消息的消息体的长度大于msgsz长度时,msgrcv采取的操作。如果设置了该标志msgrcv在这种情况下回截断数据部分,而不返回错误,否则返回一个E2BIG错误。

下面是关于消息队列读写的测试代码:

#include <iostream>
#include <cstring>
#include <errno.h>

#include <unistd.h>
#include <fcntl.h>
#include <sys/msg.h>

using namespace std;

#define  PATH_NAME "/tmp/anonymQueue"

key_t CreateKey(const char *pathName)
{
    int fd;

    if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
    {
        cout<<"open file "<<PATH_NAME<<"failed.";
        cout<<strerror(errno)<<endl;
        return -1;
    }

    close(fd);

    return ftok(PATH_NAME, 0);
}

int main(int argc, char **argv)
{
    key_t key;
    key = CreateKey(PATH_NAME);

    int msgID;
    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
    {
        cout<<"open message queue failed...";
        cout<<strerror(errno)<<endl;

        return -1;
    }

    if (fork() == 0)
    {
        char msg[] = "cute yuki...";
        char *msgBuf = new char[sizeof(long) + sizeof(msg)];
        long mtype = 1;

        memmove(msgBuf, &mtype, sizeof(mtype));
        memmove(msgBuf + sizeof(mtype), msg, sizeof(msg));

        for (int i = 1; i <= 5; ++i)
        {
            if (msgsnd(msgID, msgBuf, sizeof(msg), 0) < 0)
            {
                cout<<"send message "<<i<<"failed...";
                cout<<strerror(errno)<<endl;
                continue;
            }

            cout<<"child: send message "<<i<<" success..."<<endl;
            sleep(1);
        }

        exit(0);
    }

    char msgBuf[256];
    long mtype;
    int recvLen;

    for (int i = 1; i <= 5; ++i)
    {
        recvLen = msgrcv(msgID, msgBuf, 256 - sizeof(long), 0, 0);
        if (recvLen < 0)
        {
            cout<<"receive message failed...";
            cout<<strerror(errno)<<endl;
            continue;
        }

        memmove(&mtype, msgBuf, sizeof(long));

        cout<<"parent receive a message:"<<endl;
        cout<<"message type:"<<mtype<<endl;
        cout<<"message body:"<<msgBuf + sizeof(long)<<endl;
    }
    return 0;
}

System V消息队列的控制操作

对System V消息队列的删除,属性的设置和获取等控制操作要使用下面的函数接口:

[cpp] view plaincopyprint?

  1. #include <sys/msg.h>
  2. int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  3. //成功返回0,失败返回-1

msqid:消息队列的描述符;

cmd:控制操作的命令,SUS标准提供以下三个命令:

  • IPC_RMID,删除一个消息队列。执行该命令系统会立刻把该消息队列从内核中删除,该消息队列中的所有消息将会被丢弃。这和已经讨论过的POSIX消息队列有很大差别,POSIX消息队列通过调用mq_unlink来从内核中删除一个消息队列,但消息队列的真正析构会在最后一个mq_close结束后发生。
  • IPC_SET,根据buf的所指的值来设置消息队列msqid_ds结构中的msg_perm.uid,msg_perm.gid,msg_perm.mode,msg_qbytes四个成员。
  • IPC_STAT,通过buf返回当前消息队列的msqid_ds结构。
  • 在Linux下还有例如IPC_INFO,MSG_INFO等命令,具体可以参考Linux手册;

buf:指向msqid_ds结构的指针;

下面是测试代码:

[cpp] view plaincopyprint?

  1. #include <iostream>
  2. #include <cstring>
  3. #include <errno.h>
  4. #include <unistd.h>
  5. #include <fcntl.h>
  6. #include <sys/msg.h>
  7. using namespace std;
  8. #define  PATH_NAME "/tmp/anonymQueue"
  9. key_t CreateKey(const char *pathName)
  10. {
  11. int fd;
  12. if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
  13. {
  14. cout<<"open file "<<PATH_NAME<<"failed.";
  15. cout<<strerror(errno)<<endl;
  16. return -1;
  17. }
  18. close(fd);
  19. return ftok(PATH_NAME, 0);
  20. }
  21. int main(int argc, char **argv)
  22. {
  23. key_t key;
  24. key = CreateKey(PATH_NAME);
  25. int msgID;
  26. if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
  27. {
  28. cout<<"open message queue failed...";
  29. cout<<strerror(errno)<<endl;
  30. return -1;
  31. }
  32. msqid_ds msgInfo;
  33. msgctl(msgID, IPC_STAT, &msgInfo);
  34. cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
  35. cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
  36. cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
  37. return 0;
  38. }

在Linux 2.6.18下的执行结果为:

[cpp] view plaincopyprint?

  1. msg_qbytes:65536
  2. msg_qnum:2
  3. msg_cbytes:26

关于消息队列中允许存放最大的字节数可以通过IPC_SET命令进行修改,该修改只能针对本消息队列生效。如下测试代码:

[cpp] view plaincopyprint?

  1. int main(int argc, char **argv)
  2. {
  3. key_t key;
  4. key = CreateKey(PATH_NAME);
  5. int msgID;
  6. if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
  7. {
  8. cout<<"open message queue failed...";
  9. cout<<strerror(errno)<<endl;
  10. return -1;
  11. }
  12. msqid_ds msgInfo;
  13. msgctl(msgID, IPC_STAT, &msgInfo);
  14. cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
  15. cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
  16. cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
  17. msgInfo.msg_qbytes = 6553600;
  18. if (msgctl(msgID, IPC_SET, &msgInfo) < 0)
  19. {
  20. cout<<"set message queue failed...";
  21. cout<<strerror(errno)<<endl;
  22. return -1;
  23. }
  24. msgctl(msgID, IPC_STAT, &msgInfo);
  25. cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
  26. cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
  27. cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
  28. return 0;
  29. }

在Linux 2.6.18下的执行结果为:

[cpp] view plaincopyprint?

  1. msg_qbytes:65536
  2. msg_qnum:0
  3. msg_cbytes:0
  4. msg_qbytes:6553600
  5. msg_qnum:0
  6. msg_cbytes:0

System V消息队列的内核限制

对System V IPC,系统往往会存在一些限制,对于消息队列,在Linux2.6.18中,系统内核存在以下限制:

[cpp] view plaincopyprint?

  1. [[email protected] program]# sysctl -a |grep msg
  2. kernel.msgmnb = 65536 //一个消息队列上允许的最大字节数
  3. kernel.msgmni = 16 //系统范围内允许存在的最大消息队列数
  4. kernel.msgmax = 65536     //每个消息的最大字节数

对于System V消息队列一般内核还有一个限制:系统范围内的最大消息数,在Linux下这个限制由msgmnb*msgmni决定。

上面已经说过可以通过IPC_SET来设置使用中的消息队列的最大字节数。但是要在系统范围内对内核限制进行修改,在Linux下面可以通过修改/etc/sysctl.conf内核参数配置文件,然后配合sysctl命令来对内核参数进行设置。例如下面示例:

[cpp] view plaincopyprint?

  1. [[email protected] program]#echo "kernel.msgmnb = 6553600" >>/etc/sysctl.conf
  2. [[email protected] program]#echo "kernel.msgmni = 100" >>/etc/sysctl.conf
  3. [[email protected] program]#echo "kernel.msgmax = 6553600" >>/etc/sysctl.conf
  4. [[email protected] program]#sysctl -p
  5. [[email protected] program]# sysctl -a |grep msg
  6. kernel.msgmnb = 6553600
  7. kernel.msgmni = 100
  8. kernel.msgmax = 6553600

Linux程序设计学习笔记----System V进程通信之消息队列

时间: 2024-10-08 13:38:59

Linux程序设计学习笔记----System V进程通信之消息队列的相关文章

Linux程序设计学习笔记----System V进程通信(共享内存)

转载请注明出处:http://blog.csdn.net/suool/article/details/38515863 共享内存可以被描述成内存一个区域(段)的映射,这个区域可以被更多的进程所共享.这是IPC机制中最快的一种形式,因为它不需要中间环节,而是把信息直接从一个内存段映射到调用进程的地址空间. 一个段可以直接由一个进程创建,随后,可以有任意多的进程对其读和写.但是,一旦内存被共享之后,对共享内存的访问同步需要由其他 IPC 机制,例如信号量来实现.象所有的System V IPC 对象

Linux程序设计学习笔记----System V进程间通信(信号量)

关于System V Unix System V,是Unix操作系统众多版本中的一支.它最初由AT&T开发,在1983年第一次发布,因此也被称为AT&T System V.一共发行了4个System V的主要版本:版本1.2.3和4.System V Release 4,或者称为SVR4,是最成功的版本,成为一些UNIX共同特性的源头,例如"SysV 初始化脚本"(/etc/init.d),用来控制系统启动和关闭,System V Interface Definitio

Linux 程序设计学习笔记----进程管理与程序开发(下)

转载请注明出处:http://blog.csdn.net/suool/article/details/38419983,谢谢! 进程管理及其控制 创建进程 fork()函数 函数说明具体参见:http://pubs.opengroup.org/onlinepubs/009695399/functions/fork.html 返回值:Upon successful completion, fork() shall return 0 to the child process and shall re

Linux 程序设计学习笔记----进程管理与程序开发(上)

转载请注明出处,http://blog.csdn.net/suool/article/details/38406211,谢谢! Linux进程存储结构和进程结构 可执行文件结构 如下图: 可以看出,此ELF可执行文件存储时(没有调入内存)分为代码区.数据区和未出花数据区三部分. 代码区:存放cpu的执行的机器指令. 数据区:包含程序中的已经初始化的静态变量,以及已经初始化的全局变量. 未初始化数据区:存入的是未初始化的全局变量和未初始化的静态变量. 现在在上面的程序代码中增加一个int的静态变量

Linux程序设计学习笔记----进程间通信——管道

转载请注明出处: http://blog.csdn.net/suool/article/details/38444149, 谢谢! 进程通信概述 在Linux系统中,进程是一个独立的资源管理单元,但是独立而不孤立,他们需要之间的通信,因此便需要一个进程间数据传递.异步.同步的机制,这个机制显然需要由OS来完成管理和维护.如下: 1.同一主机进程间数据交互机制:无名管道(PIPE),有名管道(FIFO),消息队列(Message Queue)和共享内存(Share Memory).无名管道多用于亲

Linux 程序设计学习笔记----终端及串口编程及实例应用

转载请注明出处,http://blog.csdn.net/suool/article/details/38385355. 部分内容类源于网络. 终端属性详解及设置 属性 为了控制终端正常工作,终端的属性包括输入属性.输出属性.控制属性.本地属性.线路规程属性以及控制字符. 其在系统源代码的termios.h中定义(具体的说明文档http://pubs.opengroup.org/onlinepubs/7908799/xsh/termios.h.html),其结构体成员主要是 Thetermios

Linux程序设计学习笔记----网络通信编程API及其示例应用

转载请注明出处, http://blog.csdn.net/suool/article/details/38702855. BSD Socket 网络通信编程 BSD TCP 通信编程流程 图为面向连接的Socket通信的双方执行函数流程.使用TCP协议的通信双方实现数据通信的基本流程如下 建立连接的步骤 1.首先服务器端需要以下工作: (1)调用socket()函数,建立Socket对象,指定通信协议. (2)调用bind()函数,将创建的Socket对象与当前主机的某一个IP地址和TCP端口

Linux 程序设计学习笔记----命令行参数处理

转载请注明出处.http://blog.csdn.net/suool/article/details/38089001 问题引入----命令行参数及解析 在使用linux时,与windows最大的不同应该就是经常使用命令行来解决大多数问题.比如下面这样的: 而显然我们知道C语言程序的入口是mian函数,即是从main函数开始执行,而main函数的原型是: int main( int argc, char *argv[] ); int main( int argc, char **argv );

Linux程序设计学习笔记----Socket网络编程基础之TCP/IP协议簇

转载请注明出处: ,谢谢! 内容提要 本节主要学习网络通信基础,主要涉及的内容是: TCP/IP协议簇基础:两个模型 IPv4协议基础:IP地址分类与表示,子网掩码等 IP地址转换:点分十进制\二进制 TCP/IP协议簇基础 OSI模型 我们知道计算机网络之中,有各种各样的设备,那么如何实现这些设备的通信呢? 显然是通过标准的通讯协议,但是,整个网络连接的过程相当复杂,包括硬件.软件数据封包与应用程序的互相链接等等,如果想要写一支将联网全部功能都串连在一块的程序,那么当某个小环节出现问题时,整只