细说linux IPC(九):posix消息队列

【版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet 或 .../gentleliu,文章仅供学习交流,请勿用于商业用途】

消息队列可以看作一系列消息组织成的链表,一个程序可以往这个链表添加消息,另外的程序可以从这个消息链表读走消息。

  • mq_open()函数打开或创建一个posix消息队列。
       #include <fcntl.h>           /* For O_* constants */
       #include <sys/stat.h>        /* For mode constants */
       #include <mqueue.h>
       mqd_t mq_open(const char *name, int oflag);
       mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);
       Link with -lrt.

参数name为posix IPC名字, 即将要被打开或创建的消息队列对象,为了便于移植,需要指定为“/name”的格式。

参数oflag必须要有O_RDONLY(只读)、标志O_RDWR(读写), O_WRONLY(只写)之一,除此之外还可以指定O_CREAT(没有该对象则创建)、O_EXCL(如果O_CREAT指定,但name不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send函数会阻塞的地方,使用该标志打开的消息队列会返回EAGAIN错误)。

当操作一个新队列时,使用O_CREAT标识,此时后面两个参数需要被指定,参数mode为指定权限位,attr指定新创建队列的属性。

  • mq_close()函数关闭消息队列。
       #include <mqueue.h>
       int mq_close(mqd_t mqdes);
       Link with -lrt.

关闭之后调用进程不在使用该描述符,但消息队列不会从系统中删除,进程终止时,会自动关闭已打开的消息队列,和调用mq_close一样。参数为mq_open()函数返回的值。

  • mq_unlink()函数从系统中删除某个消息队列。

           #include <mqueue.h>
           int mq_unlink(const char *name);
           Link with -lrt.

删除会马上发生,即使该队列的描述符引用计数仍然大于0。参数为mq_open()函数第一个参数。

  • mq_setattr()函数和mq_getattr()函数分别设置和和获取消息队列属性。
       #include <mqueue.h>
       int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
       int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,  struct mq_attr *oldattr);
       Link with -lrt.

参数mqdes为mq_open()函数返回的消息队列描述符。

参数attr、newattr、oldattr为消息队列属性结构体指针;

           struct mq_attr {
               long mq_flags;       /* Flags: 0 or O_NONBLOCK */
               long mq_maxmsg;      /* Max. # of messages on queue */
               long mq_msgsize;     /* Max. message size (bytes) */
               long mq_curmsgs;     /* # of messages currently in queue */
           };

参数mq_flags在mq_open时被初始化(oflag参数),其值为0
或者 O_NONBLOCK。

参数mq_maxmsg和mq_msgsize在mq_open时在参数attr中初始化设置,mq_maxmsg是指队列的消息个数最大值;mq_msgsize为队列每个消息的最大值。

参数mq_curmsgs为当前队列消息。

mq_getattr()函数把队列当前属性填入attr所指向的结构体。

mq_setattr()函数只能设置mq_flags属性,另外的域会被自动忽略,mq_maxmsg和mq_msgsize的设置需要在mq_open当中来完成,
参数oldattr会和函数mq_getattr函数中参数attr相同的值。

  • mq_send() 函数 和mq_receive()函数分别用于向消息队列放置和取走消息。
       #include <mqueue.h>
       int mq_send(mqd_t mqdes, const char *msg_ptr,   size_t msg_len, unsigned msg_prio);
       ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,  size_t msg_len, unsigned *msg_prio);
       Link with -lrt.

参数msg_ptr为指向消息的指针。

msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。

msg_prio为优先级,消息在队列中将按照优先级大小顺序来排列消息。

如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。

示例:

服务进程:

int sln_ipc_mq_loop(void)
{
    mqd_t           mqd;
    struct mq_attr  setattr, attr;
    char            *recvbuf = NULL;
    unsigned int    prio;
    int             recvlen;

    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr); //创建消息队列并设置消息队列属性
    if ((mqd < 0) && (errno != EEXIST)) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    if ((mqd < 0) && (errno == EEXIST)) { // 消息队列存在则打开
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
        if (mqd < 0) {
            fprintf(stderr, "mq_open: %s\n", strerror(errno));
            return -1;
        }
    }

    if (mq_getattr(mqd, &attr) < 0) { //获取消息队列属性
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        return -1;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
            attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    recvbuf = malloc(attr.mq_msgsize); //为读取消息队列分配当前系统允许的每条消息的最大大小的内存空间
    if (NULL == recvbuf) {
        return -1;
    }

    for (;;) {
        recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio); //从消息队列中读取消息
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            continue;
        }

        printf("recvive length: %d, prio: %d, recvbuf: %s\n", recvlen, prio, recvbuf);
    }

    return 0;
}

客户进程:

int sln_ipc_mq_send(const char *sendbuf, int sendlen, int prio)
{
    mqd_t           mqd;

    mqd = mq_open(SLN_IPC_MQ_NAME, O_WRONLY); //客户进程打开消息队列
    if (mqd < 0) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    if (mq_send(mqd, sendbuf, sendlen, prio) < 0) { //客户进程网消息队列中添加一条消息
        fprintf(stderr, "mq_send: %s\n", strerror(errno));
        return -1;
    }

    return 0;
}

程序运行时,服务进程阻塞于mq_receive,客户进程每发一条消息队列,服务进程都会从mq_receive处返回,但不一定接收到的消息就是客户进程最近发送的那一条消息,因为客户进程往消息队列中添加消息时会按照优先级来排序,如果客户进程同时向消息队列添加多条消息,服务进程还未来得及读取,那么当服务进程开始读取的消息一定是优先级最高的那条消息,而不是客户进程最先发送的那一条消息。

我们将服务进程稍作修改来试一下:

int sln_ipc_mq_loop(void)
{
    mqd_t           mqd;
    struct mq_attr  setattr, attr;
    char            *recvbuf = NULL;
    unsigned int    prio;
    int             recvlen;
    memset(&setattr, 0, sizeof(setattr));
    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    //mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, NULL);
    if ((mqd < 0) && (errno != EEXIST)) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }
    if ((mqd < 0) && (errno == EEXIST)) { // name is exist
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
        if (mqd < 0) {
            fprintf(stderr, "mq_open: %s\n", strerror(errno));
            return -1;
        }
    }
    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        return -1;
    }
    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
            attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
    recvbuf = malloc(attr.mq_msgsize);
    if (NULL == recvbuf) {
        return -1;
    }
    sleep(10); //此处等待10秒,此时客户进程一次性向消息队列加入多条消息
    for (;;) {
        if (mq_getattr(mqd, &attr) < 0) {
            fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
            return -1;
        }
        printf("msgsize: %ld, curmsgs: %ld\n", attr.mq_msgsize, attr.mq_curmsgs);
        recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio);
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            continue;
        }
        printf("recvive-> prio: %d, recvbuf: %s\n", prio, recvbuf);
        sleep(1); //每秒处理一个消息
    }
    mq_close(mqd);
    return 0;
}

服务进程先运行,然后客户进程立即向消息队列加入12消息,每条消息优先级从1到12,,之后服务进程运行,程序运行如下:

# ./server
flags: 0, maxmsg: 10, msgsize: 1024, curmsgs: 0
msgsize: 1024, curmsgs: 10
recvive-> prio: 10, recvbuf: asdf
msgsize: 1024, curmsgs: 10
recvive-> prio: 11, recvbuf: 1234
msgsize: 1024, curmsgs: 10
recvive-> prio: 12, recvbuf: asdf
msgsize: 1024, curmsgs: 9
recvive-> prio: 9, recvbuf: 1234
msgsize: 1024, curmsgs: 8
recvive-> prio: 8, recvbuf: asdf
msgsize: 1024, curmsgs: 7
recvive-> prio: 7, recvbuf: 1234
msgsize: 1024, curmsgs: 6
recvive-> prio: 6, recvbuf: asdf
msgsize: 1024, curmsgs: 5
recvive-> prio: 5, recvbuf: 1234
msgsize: 1024, curmsgs: 4
recvive-> prio: 4, recvbuf: asdf
msgsize: 1024, curmsgs: 3
recvive-> prio: 3, recvbuf: 1234
msgsize: 1024, curmsgs: 2
recvive-> prio: 2, recvbuf: asdf
msgsize: 1024, curmsgs: 1
recvive-> prio: 1, recvbuf: 1234
msgsize: 1024, curmsgs: 0

可以看到,系统允许最大消息数量是10条,当客户进程一次性加入12条消息时,客户进程在加入最后两条会阻塞在那里,直到服务进程取出消息之后,最后两天消息才能依次加入到消息队列。并且服务进程取出消息时从优先级从高到低取出:10->11->12->9->8->...
->1

本节源码下载:

点击打开链接

时间: 2024-10-09 11:07:27

细说linux IPC(九):posix消息队列的相关文章

Linux 进程间通信(posix消息队列 简单)实例

Linux 进程间通信(posix消息队列 简单)实例 详情见: http://www.linuxidc.com/Linux/2011-10/44828.htm 编译: gcc -o consumer consumer.c -lrt gcc -o producer producer.c -lrt /* * * Filename: producer.c * * Description: 生产者进程 * * Version: 1.0 * Created: 09/30/2011 04:52:23 PM

Unix IPC之Posix消息队列(1)

部分参考:http://www.cnblogs.com/Anker/archive/2013/01/04/2843832.html IPC对象的持续性:http://book.51cto.com/art/201006/207275.htm 消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反.Posix消息队列与System V消息队列的区别如下: 1. 对Posix消息队列的读总是返回最高优先级的最早消息,

linux进程间通信之Posix消息队列

Posix消息队列与System V 消息队列的用法很相似,主要有以下区别:1. 对Posix消息队列的读取总是返回最高优先级的最早消息,对System V 消息队列的读取可以返回指定优先级的消息.2. Posix 消息队列允许产生一个信号或启动一个线程去向一个空队列写入一个消息,System V消息队列不可以. Posix消息队列常用函数及头文件#include <mqueue.h>1.  mqd_t mq_open(const char *name,int oflag,.../* mode

Unix IPC之Posix消息队列(2)

/* Query status and attributes of message queue MQDES. */ extern int mq_getattr (mqd_t __mqdes, struct mq_attr *__mqstat) __THROW __nonnull ((2)); /* Set attributes associated with message queue MQDES and if OMQSTAT is not NULL also query its old att

Unix IPC之Posix消息队列(3)

struct mq_attr { long mq_flags; /* message queue flag : 0, O_NONBLOCK */ long mq_maxmsg; /* max number of messages allowed on queue*/ long mq_msgsize; /* max size of a message (in bytes)*/ long mq_curmsgs; /* number of messages currently on queue */

Linux IPC机制:消息队列示例

程序msg1.c用于接收消息 <span style="font-size:12px;">#include <stdio.h> #include <unistd.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <sys/msg.h> struct my_msg_st { long int my_msg_type

细说linux IPC(十一):各种IPC形式比较总结(完)

[版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet 或 .../gentleliu,文章仅供学习交流,请勿用于商业用途] 这个系列基本上到现在为止已经差不多把linux上的各种常用的IPC介绍完了,linux上面的IPC大多都是从UNIX上面继承而来. 最初Unix IPC包括:管道.FIFO.信号.System V IPC包括:System V消息队列.System V信号灯.System V共享内存区.由于Unix版本的多样性,电子电气工程协会(IEEE)开发

Linux进程间通信(IPC)编程实践(十二)Posix消息队列--基本API的使用

posix消息队列与system v消息队列的差别: (1)对posix消息队列的读总是返回最高优先级的最早消息,对system v消息队列的读则可以返回任意指定优先级的消息. (2)当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或启动一个线程,system v消息队列则不提供类似机制. 队列中的每个消息具有如下属性: 1.一个无符号整数优先级(posix)或一个长整数类型(system v) 2.消息的数据部分长度(可以为0) 3.数据本身(如果长度大于0) Posix消息队

Linux IPC实践(7) --Posix消息队列

1. 创建/获取一个消息队列 #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); //专用于打开一个消息队列 mqd_t mq_open(const char *name, int oflag, mode_t mode