[转]Linux进程通信之POSIX消息队列

进程间的消息队列可以用这个实现,学习了下。

http://blog.csdn.net/anonymalias/article/details/9799645?utm_source=tuicool&utm_medium=referral

消息队列是Linux IPC中很常用的一种通信方式,它通常用来在不同进程间发送特定格式的消息数据。

消息队列和之前讨论过的管道和FIFO有很大的区别,主要有以下两点:

  • 一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,那么内核会产生SIGPIPE信号(感谢shanshan_fangfang的指正)。
  • IPC的持续性不同。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。

消息队列中的每条消息通常具有以下属性:

  • 一个表示优先级的整数;
  • 消息的数据部分的长度;
  • 消息数据本身;

POSIX消息队列的一个可能的设计是一个如下图所示的消息链表,链表头部有消息队列的属性信息。

图1消息队列的可能布局

1 POSIX消息队列的创建和关闭

POSIX消息队列的创建,关闭和删除用到以下三个函数接口:

[cpp] view plaincopyprint?

  1. #include <mqueue.h>
  2. mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */);
  3. //成功返回消息队列描述符,失败返回-1
  4. mqd_t mq_close(mqd_t mqdes);
  5. mqd_t mq_unlink(const char *name);
  6. //成功返回0,失败返回-1

mq_open用于打开或创建一个消息队列。

name:表示消息队列的名字,它符合POSIX IPC的名字规则。

oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。

attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。

mq_open返回值是mqd_t类型的值,被称为消息队列描述符。在Linux 2.6.18中该类型的定义为整型:

[cpp] view plaincopyprint?

  1. #include <bits/mqueue.h>
  2. typedef int mqd_t;

mq_close用于关闭一个消息队列,和文件的close类型,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。

mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。

POSIX消息队列的名字所创建的真正路径名和具体的系统实现有关,关于具体POSIX IPC的名字规则可以参考《UNIX 网络编程 卷2:进程间通信》的P14。

经过测试,在Linux 2.6.18中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。

2 POSIX消息队列的属性

POSIX标准规定消息队列属性mq_attr必须要含有以下四个内容:

[cpp] view plaincopyprint?

  1. long    mq_flags //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞
  2. long    mq_maxmsg  //消息队列的最大消息数
  3. long    mq_msgsize  //消息队列中每个消息的最大字节数
  4. long    mq_curmsgs  //消息队列中当前的消息数目

在Linux 2.6.18中mq_attr结构的定义如下:

[cpp] view plaincopyprint?

  1. #include <bits/mqueue.h>
  2. struct mq_attr
  3. {
  4. long int mq_flags;      /* Message queue flags.  */
  5. long int mq_maxmsg;   /* Maximum number of messages.  */
  6. long int mq_msgsize;   /* Maximum message size.  */
  7. long int mq_curmsgs;   /* Number of messages currently queued.  */
  8. long int __pad[4];
  9. };

POSIX消息队列的属性设置和获取可以通过下面两个函数实现:

[cpp] view plaincopyprint?

  1. #include <mqueue.h>
  2. mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
  3. mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
  4. //成功返回0,失败返回-1

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。

mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。

下面是测试代码:

[cpp] view plaincopyprint?

  1. #include <iostream>
  2. #include <cstring>
  3. #include <errno.h>
  4. #include <unistd.h>
  5. #include <fcntl.h>
  6. #include <mqueue.h>
  7. using namespace std;
  8. int main()
  9. {
  10. mqd_t mqID;
  11. mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
  12. if (mqID < 0)
  13. {
  14. cout<<"open message queue error..."<<strerror(errno)<<endl;
  15. return -1;
  16. }
  17. mq_attr mqAttr;
  18. if (mq_getattr(mqID, &mqAttr) < 0)
  19. {
  20. cout<<"get the message queue attribute error"<<endl;
  21. return -1;
  22. }
  23. cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
  24. cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
  25. cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
  26. cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
  27. }

在Linux 2.6.18中执行结果是:

[cpp] view plaincopyprint?

  1. mq_flags:0
  2. mq_maxmsg:10
  3. mq_msgsize:8192
  4. mq_curmsgs:0

3 POSIX消息队列的使用

POSIX消息队列可以通过以下两个函数来进行发送和接收消息:

[cpp] view plaincopyprint?

  1. #include <mqueue.h>
  2. mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,
  3. size_t msg_len, unsigned msg_prio);
  4. //成功返回0,出错返回-1
  5. mqd_t mq_receive(mqd_t mqdes, char *msg_ptr,
  6. size_t msg_len, unsigned *msg_prio);
  7. //成功返回接收到消息的字节数,出错返回-1
  8. #ifdef __USE_XOPEN2K
  9. mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,
  10. size_t msg_len, unsigned msg_prio,
  11. const struct timespec *abs_timeout);
  12. mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
  13. size_t msg_len, unsigned *msg_prio,
  14. const struct timespec *abs_timeout);
  15. #endif

mq_send向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。

mqdes:消息队列描述符;

msg_ptr:指向消息体缓冲区的指针;

msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。

msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

下面是消息队列使用的测试代码:

[cpp] view plaincopyprint?

  1. #include <iostream>
  2. #include <cstring>
  3. #include <errno.h>
  4. #include <unistd.h>
  5. #include <fcntl.h>
  6. #include <mqueue.h>
  7. using namespace std;
  8. int main()
  9. {
  10. mqd_t mqID;
  11. mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);
  12. if (mqID < 0)
  13. {
  14. if (errno == EEXIST)
  15. {
  16. mq_unlink("/anonymQueue");
  17. mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
  18. }
  19. else
  20. {
  21. cout<<"open message queue error..."<<strerror(errno)<<endl;
  22. return -1;
  23. }
  24. }
  25. if (fork() == 0)
  26. {
  27. mq_attr mqAttr;
  28. mq_getattr(mqID, &mqAttr);
  29. char *buf = new char[mqAttr.mq_msgsize];
  30. for (int i = 1; i <= 5; ++i)
  31. {
  32. if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)
  33. {
  34. cout<<"receive message  failed. ";
  35. cout<<"error info:"<<strerror(errno)<<endl;
  36. continue;
  37. }
  38. cout<<"receive message "<<i<<": "<<buf<<endl;
  39. }
  40. exit(0);
  41. }
  42. char msg[] = "yuki";
  43. for (int i = 1; i <= 5; ++i)
  44. {
  45. if (mq_send(mqID, msg, sizeof(msg), i) < 0)
  46. {
  47. cout<<"send message "<<i<<" failed. ";
  48. cout<<"error info:"<<strerror(errno)<<endl;
  49. }
  50. cout<<"send message "<<i<<" success. "<<endl;
  51. sleep(1);
  52. }
  53. }

在Linux 2.6.18下的执行结构如下:

[cpp] view plaincopyprint?

  1. send message 1 success.
  2. receive message 1: yuki
  3. send message 2 success.
  4. receive message 2: yuki
  5. send message 3 success.
  6. receive message 3: yuki
  7. send message 4 success.
  8. receive message 4: yuki
  9. send message 5 success.
  10. receive message 5: yuki

4 POSIX消息队列的限制

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。

下面是在Linux 2.6.18下shell对启动进程的POSIX消息队列大小的限制:

[cpp] view plaincopyprint?

  1. # ulimit -a |grep message
  2. POSIX message queues     (bytes, -q) 819200

限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux 2.6.18下POSIX消息队列默认的最大消息数和消息的最大大小分别为:

[cpp] view plaincopyprint?

  1. mq_maxmsg = 10
  2. mq_msgsize = 8192

为了说明上面的限制大小包括消息队列的额外开销,下面是测试代码:

[cpp] view plaincopyprint?

  1. #include <iostream>
  2. #include <cstring>
  3. #include <errno.h>
  4. #include <unistd.h>
  5. #include <fcntl.h>
  6. #include <mqueue.h>
  7. using namespace std;
  8. int main(int argc, char **argv)
  9. {
  10. mqd_t mqID;
  11. mq_attr attr;
  12. attr.mq_maxmsg = atoi(argv[1]);
  13. attr.mq_msgsize = atoi(argv[2]);
  14. mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, &attr);
  15. if (mqID < 0)
  16. {
  17. if (errno == EEXIST)
  18. {
  19. mq_unlink("/anonymQueue");
  20. mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, &attr);
  21. if(mqID < 0)
  22. {
  23. cout<<"open message queue error..."<<strerror(errno)<<endl;
  24. return -1;
  25. }
  26. }
  27. else
  28. {
  29. cout<<"open message queue error..."<<strerror(errno)<<endl;
  30. return -1;
  31. }
  32. }
  33. mq_attr mqAttr;
  34. if (mq_getattr(mqID, &mqAttr) < 0)
  35. {
  36. cout<<"get the message queue attribute error"<<endl;
  37. return -1;
  38. }
  39. cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
  40. cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
  41. cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
  42. cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
  43. }

下面进行创建消息队列时设置最大消息数和消息的最大大小进行测试:

[cpp] view plaincopyprint?

  1. [[email protected] program]# g++ -g test.cpp -lrt
  2. [[email protected] program]# ./a.out 10 81920
  3. open message queue error...Cannot allocate memory
  4. [[email protected] program]# ./a.out 10 80000
  5. open message queue error...Cannot allocate memory
  6. [[email protected] program]# ./a.out 10 70000
  7. open message queue error...Cannot allocate memory
  8. [[email protected] program]# ./a.out 10 60000
  9. mq_flags:0
  10. mq_maxmsg:10
  11. mq_msgsize:60000
  12. mq_curmsgs:0

从上面可以看出消息队列真正存放消息数据的大小是没有819200B的。可以通过修改该限制参数,来改变消息队列的所能容纳消息的数量。可以通过下面方式来修改限制,但这会在shell启动进程结束后失效,可以将设置写入开机启动的脚本中执行,例如.bashrc,rc.local。

[cpp] view plaincopyprint?

  1. [[email protected] ~]# ulimit -q 1024000000
  2. [[email protected] ~]# ulimit -a |grep message
  3. POSIX message queues     (bytes, -q) 1024000000

下面再次测试可以设置的消息队列的属性。

[cpp] view plaincopyprint?

  1. [[email protected] program]# ./a.out 10 81920
  2. mq_flags:0
  3. mq_maxmsg:10
  4. mq_msgsize:81920
  5. mq_curmsgs:0
  6. [[email protected] program]# ./a.out 10 819200
  7. mq_flags:0
  8. mq_maxmsg:10
  9. mq_msgsize:819200
  10. mq_curmsgs:0
  11. [[email protected] program]# ./a.out 1000 8192
  12. mq_flags:0
  13. mq_maxmsg:1000
  14. mq_msgsize:8192
  15. mq_curmsgs:0

POSIX消息队列在实现上还有另外两个限制:

MQ_OPEN_MAX:一个进程能同时打开的消息队列的最大数目,POSIX要求至少为8;

MQ_PRIO_MAX:消息的最大优先级,POSIX要求至少为32;

Aug 7, 2013 AM 08:53 @lab

时间: 2024-10-13 15:24:24

[转]Linux进程通信之POSIX消息队列的相关文章

Linux IPC实践(7) --Posix消息队列

1. 创建/获取一个消息队列 #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); //专用于打开一个消息队列 mqd_t mq_open(const char *name, int oflag, mode_t mode

进程间通信(二)——Posix消息队列

1.概述 消息队列可认为是消息链表.有足够写权限的线程可以往队列中放置消息,有足够读权限的进程可以从队列中取走消息.每个消息是一个记录,由发送着赋予一个优先级. 在像队列中写入消息时,不需要某个进程在该队列上等待消息到达.这与管道不同,管道必须现有读再有写. 消息队列具有随内核的持续性,与管道不同.进程结束后,消息队列中消息不会消失.当管道最后一次关闭,其中的数据将丢弃. 消息队列具有名字,可用于非亲缘关系的进程间. Posix消息队列读总是返回最高优先级的最早消息,而System V消息队列的

Linux进程间通信(IPC)编程实践(十二)Posix消息队列--基本API的使用

posix消息队列与system v消息队列的差别: (1)对posix消息队列的读总是返回最高优先级的最早消息,对system v消息队列的读则可以返回任意指定优先级的消息. (2)当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或启动一个线程,system v消息队列则不提供类似机制. 队列中的每个消息具有如下属性: 1.一个无符号整数优先级(posix)或一个长整数类型(system v) 2.消息的数据部分长度(可以为0) 3.数据本身(如果长度大于0) Posix消息队

Linux 进程间通信(posix消息队列 简单)实例

Linux 进程间通信(posix消息队列 简单)实例 详情见: http://www.linuxidc.com/Linux/2011-10/44828.htm 编译: gcc -o consumer consumer.c -lrt gcc -o producer producer.c -lrt /* * * Filename: producer.c * * Description: 生产者进程 * * Version: 1.0 * Created: 09/30/2011 04:52:23 PM

Linux环境编程之IPC进程间通信(五):Posix消息队列1

对于管道和FIFO来说,必须应该先有读取者存在,否则先有写入者是没有意义的.而消息队列则不同,它是一个消息链表,有足够写权限的线程可往别的队列中放置消息,有足够读权限的线程可从队列中取走消息.每个消息都是一个记录,它由发送者赋予一个优先级.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.消息队列是随内核的持续性,一个进程可以往某个队列写入一些消息,然后终止,再让另外一个进程在以后的某个时刻读出这些消息.这跟管道和FIFO不一样,当一个管道或FIFO的最后一次关闭时

细说linux IPC(九):posix消息队列

[版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet 或 .../gentleliu,文章仅供学习交流,请勿用于商业用途] 消息队列可以看作一系列消息组织成的链表,一个程序可以往这个链表添加消息,另外的程序可以从这个消息链表读走消息. mq_open()函数打开或创建一个posix消息队列. #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode cons

linux进程间通信之Posix消息队列

Posix消息队列与System V 消息队列的用法很相似,主要有以下区别:1. 对Posix消息队列的读取总是返回最高优先级的最早消息,对System V 消息队列的读取可以返回指定优先级的消息.2. Posix 消息队列允许产生一个信号或启动一个线程去向一个空队列写入一个消息,System V消息队列不可以. Posix消息队列常用函数及头文件#include <mqueue.h>1.  mqd_t mq_open(const char *name,int oflag,.../* mode

Linux笔记--Linux进程通信

Linux进程间通信 文章来源: http://www.cnblogs.com/linshui91/archive/2010/09/29/1838770.html 一.进程间通信概述进程通信有如下一些目的:A.数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几M字节之间B.共享数据:多个进程想要操作共享数据,一个进程对共享数据的修改,别的进程应该立刻看到.C.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程).D.

linux进程通信之SYSTEM V信号量

信号量的使用主要是用来保护共享资源,使得资源在一个时刻只有一个进程(线程)所拥有.信号量的值为正的时候,说明它空闲.所测试的线程可以锁定而使用它.若为0,说明它被占用,测试的线程要进入睡眠队列中,等待被唤醒. 一.信号量的分类: 在学习信号量之前,我们必须先知道--Linux提供两种信号量: (1) 内核信号量,由内核控制路径使用. (2) 用户态进程使用的信号量,这种信号量又分为POSIX信号量和SYSTEM V信号量. POSIX信号量又分为有名信号量和无名信号量.有名信号量,其值保存在文件