细说linux IPC(十):system V 消息队列

【版权声明:尊重原创,转载请保留出处:blog.csdn.net/shallnet 或 .../gentleliu,文章仅供学习交流,请勿用于商业用途】

system V消息队列和posix消息队列类似,linux系统这两种消息队列都支持。先来看一下system V消息队列相关操作及其函数。

  • msgget()函数创建一个消息队列或打开一个消息队列。
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       int msgget(key_t key, int msgflg);

参数key为ftok返回值或IPC_PRIVATE;

参数msgflg为IPC_CREAT或和IPC_CREAT | IPC_EXCL,当消息队列已经存在时,而msgflg指定为IPC_CREAT | IPC_EXCL,函数返回EEXIST。

在系统中的每一个消息队列,内核维护这样一个结构体:

           struct msqid_ds {
               struct ipc_perm msg_perm;     /* Ownership and permissions */
               time_t          msg_stime;    /* Time of last msgsnd(2) */
               time_t          msg_rtime;    /* Time of last msgrcv(2) */
               time_t          msg_ctime;    /* Time of last change */
               unsigned long   __msg_cbytes; /* Current number of bytes in
                                                queue (nonstandard) */
               msgqnum_t       msg_qnum;     /* Current number of messages
                                                in queue */
               msglen_t        msg_qbytes;   /* Maximum number of bytes
                                                allowed in queue */
               pid_t           msg_lspid;    /* PID of last msgsnd(2) */
               pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
           };

其中ipc_perm 结构体定义如下:

           struct ipc_perm {
               key_t          __key;       /* Key supplied to msgget(2) */
               uid_t          uid;         /* Effective UID of owner */
               gid_t          gid;         /* Effective GID of owner */
               uid_t          cuid;        /* Effective UID of creator */
               gid_t          cgid;        /* Effective GID of creator */
               unsigned short mode;        /* Permissions */
               unsigned short __seq;       /* Sequence number */
           };

当一个消息队列初始化完后,该消息队列对应的结构被作如下初始化:

msg_perm.cuid 和 msg_perm.uid被设置为当前进程的用户ID,msg_perm.cuid 和 msg_perm.uid被设置为当前进程的组ID;msg_perm.mode被设置为msgflg;msg_qnum, msg_lspid, msg_lrpid, msg_stime and msg_rtime
 被设置为 0;msg_ctime被设置为当前时间;msg_qbytes被设置为系统限制值MSGMNB。

  • msgsnd()函数向消息队列中添加消息。
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数msgqid为消息队列标识,为msgget()返回值;

参数msgp为指向用户定义结构体的指针,结构体定义格式模板如下:

           struct msgbuf {
               long mtype;       /* message type, must be > 0 */
               char mtext[1];    /* message data */
           };

mtex域为数组,该数组大小为函数参数msgsz,mtype的值必须大于0;该结构由用户自己定义,一般根据实际应用来定义。

参数msgsz为将要发送消息的长度,可以指定为0,一般指定为msgbuf结构体长度减去long类型长度(sizeof(msgbuf) - sizeof(long));

参数msgflg为0,或IPC_NOWAIT,IPC_NOWAIT为非阻塞,发送时如果对列数据已满,则函数马上返回错误EAGAIN,如果msgflg为0,则阻塞直到消息队列由足够空间添加新的消息,或消息队列从系统中删除,或被信号中断。

  • msgrcv()函数从消息队列中读取消息。
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,  int msgflg);

参数msgid同函数msgsnd;

参数msgp通函数函数msgsnd,为指向接收消息结构体指针;

参数msgsz为接受消息的缓存大小;

参数msgtyp为读取何种消息,如果为:

0 - 读取消息队列中最早的消息,以先进先出的方式读取消息。

> 0 - 读取消息队列类型为msgtyp最早的消息。

< 0 - 读取消息队列中,消息类型小于或等于msgtyp绝对值的消息中最小消息类型的最早消息。

参数msgflg为如下的值或者几个值得或:

IPC_NOWAIT - 如果队列中没有则立即返回;如果未指定该选项,函数将一直阻塞,直到指定的消息可读取,或消息队列从系统中删除,或被信号中断;

MSG_NOERROR - 截断消息如果接收到的消息大小大于msgsz给定的值;

MSG_EXCEPT - 使用大于0的msgtyp去读取消息类型不为msgtyp第一个消息;

msgctl()函数对一个队列做控制操作。

#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

参数cmd为对队列要做的操作,这些操作有:

IPC_STAT - 获取消息队列信息,从内核数据结构中拷贝消息队列为msqid的结构体到buf指针指向的msqid_ds 结构体中。

IPC_SET - 使用buf指向的结构体来设置msgqid对应的消息队列内核结构体;

IPC_RMID - 删除msgqid消息队列;

IPC_INFO,MSG_INFO, MSG_STAT详见linux man函数手册。

示例:

服务进程:

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>

#include "slnmq.h"

static int recv_type, msgid;

int sln_server_loop(void)
{
    int                 rcvmsg_len;
    key_t               key;
    struct sln_msgbuf   slnmsg;

    key = ftok(SLN_IPC_MQ_NAME, 0);
    if (key < 0) {
        fprintf(stderr, "ftok: %s\n", strerror(errno));
        return -1;
    }

    msgid = msgget(key, IPC_CREAT | IPC_EXCL);
    if ((msgid < 0) && (errno != EEXIST)) {
        fprintf(stderr, "msgget: %s\n", strerror(errno));
        return -1;
    }

    if (msgid < 0) {
        printf("Open existing mq!\n");
        msgid = msgget(key, IPC_CREAT);
        if (msgid < 0) {
            fprintf(stderr, "msgget: %s\n", strerror(errno));
            return -1;
        }
    }

    sleep(10); //just for test
    for (;;) {
        rcvmsg_len = msgrcv(msgid, &slnmsg, SLN_IPC_MQ_MSGSIZE, recv_type, 0);
        if (rcvmsg_len < 0) {
            fprintf(stderr, "msgrcv: %s\n", strerror(errno));
            sleep(1);
            continue;
        }

        fprintf(stdout, "receive -  recv len: %d, msg type: %d, msg: %s\n",
                rcvmsg_len, slnmsg.msgtype, slnmsg.msg);
    }

    return 0;
}

static void sigint_func(int sig)
{
    if (msgctl(msgid, IPC_RMID, NULL) < 0) {
        fprintf(stderr, "msgctl: %s\n", strerror(errno));
    }
    exit(0);
}

int main(int argc, const char *argv[])
{
    FILE                *fp = NULL;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <msg type>\n", argv[0]);//运行时,指定服务要进程接收的消息类型
        return -1;
    }

    recv_type = atoi(argv[1]);

    if (access(SLN_IPC_MQ_NAME, F_OK) < 0) {
        fp = fopen(SLN_IPC_MQ_NAME, "w+");
        if (NULL != fp) {
            fclose(fp);
        }
    }

    signal(SIGINT, sigint_func); //服务进程捕获中断信号,在中断处理函数里面删除消息队列并推出程序

    sln_server_loop();

    return 0;
}

客户进程:

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <errno.h>

#include "slnmq.h"

int sln_msgsnd(int msg_type, const void *sndmsg, int sndlen)
{
    int                 msgid, rcvmsg_len;
    key_t               key;
    struct sln_msgbuf   slnmsg;

    key = ftok(SLN_IPC_MQ_NAME, 0);
    if (key < 0) {
        fprintf(stderr, "ftok: %s\n", strerror(errno));
        return -1;
    }

    msgid = msgget(key, IPC_CREAT);
    if (msgid < 0) {
        fprintf(stderr, "msgget: %s\n", strerror(errno));
        return -1;
    }

    slnmsg.msgtype = msg_type;
    memcpy(&slnmsg.msg, sndmsg, sndlen);
    msgsnd(msgid, (void *)&slnmsg, sndlen, 0);
}

int main(int argc, const char *argv[])
{
    int     type;

    if (argc != 3) {
        fprintf(stderr, "Usage: %s <msg type> <msg content>\n", argv[0]);
        return -1;
    }

    type = atoi(argv[1]);

    sln_msgsnd(type, argv[2], strlen(argv[2]) + 1);
    return 0;
}

服务进程首先运行,并在开始读取消息处等待10秒,客户进程在服务进程还未开始读取数据时向消息队列加入多条消息,如下:

./client 1 abcd
./client 2 efghi
./client 3 jkl
./client 4 mn
./client 5 opqrst
./client 6 uv
./client 7 w
./client 8 xyz

当服务进程运行时会从消息队列中取出服务进程要取出类型(运行时参数指定)的消息,下面分别是服务器运行时指定的消息类型以及取出的消息(客户端的运行都如上):

# ./server 0(读取所有消息)
receive -  recv len: 5, msg type: 1, msg: abcd
receive -  recv len: 6, msg type: 2, msg: efghi
receive -  recv len: 4, msg type: 3, msg: jkl
receive -  recv len: 3, msg type: 4, msg: mn
receive -  recv len: 7, msg type: 5, msg: opqrst
receive -  recv len: 3, msg type: 6, msg: uv
receive -  recv len: 2, msg type: 7, msg: w
receive -  recv len: 4, msg type: 8, msg: xyz
# ./server 4 (读取消息类型为4的消息)
Open existing mq!
receive -  recv len: 3, msg type: 4, msg: mn
# ./server -5     (读取消息小于5的消息)
Open existing mq!
receive -  recv len: 5, msg type: 1, msg: abcd
receive -  recv len: 6, msg type: 2, msg: efghi
receive -  recv len: 4, msg type: 3, msg: jkl
receive -  recv len: 3, msg type: 4, msg: mn
receive -  recv len: 7, msg type: 5, msg: opqrst

在服务器未退出时,可以使用系统命令ipcs查看系统中的消息队列:

# ipcs

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x001fffde 229376     root       0          0            0          

当用户按下CTRL+C时,程序会删除掉该消息队列,按下CTRL+C后,再查看:

# ipcs

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages

可以看到程序中的删除是成功的。

posix消息队列和system V消息队列大致类似,不过通过这两节可以看到有一些差别:

posix消息队列有优先级的概念,system V没有,只能按照消息类型的先进先出来读取消息即FIFO;

system V消息队列有消息类型的概念,并且可以按照消息类型取出消息,而posix不能区分消息类型;

本节源码下载:

点击打开链接

时间: 2024-12-28 19:40:41

细说linux IPC(十):system V 消息队列的相关文章

Linux进程间通信——使用System V 消息队列

消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法. 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构.我们可以通过发送消息来避免命名管道的同步和阻塞问题.但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制. Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度. 在Linux中使用消息队列 Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信. msgget 函数 创建和访问一个

Linux IPC实践(7) --Posix消息队列

1. 创建/获取一个消息队列 #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); //专用于打开一个消息队列 mqd_t mq_open(const char *name, int oflag, mode_t mode

System V消息队列(2)

msgsnd函数 功能:把一条消息添加到消息队列中 原型 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码 msgp:是一个指针,指针指向准备发送的消息, msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型 msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情 返回值:成功返回0:失败返回-1 msgs

利用System V消息队列实现回射客户/服务器

一.介绍 在学习UNIX网络编程 卷1时,我们当时可以利用Socket套接字来实现回射客户/服务器程序,但是Socket编程是存在一些不足的,例如: 1. 服务器必须启动之时,客户端才能连上服务端,并与服务端进行通信: 2. 利用套接口描述符进行通信,必须知道对端的IP与端口. 二.相关函数介绍 下面,我们利用System V消息队列来实现进程间的通信: 首先,我们先来了解一下下面几个函数: 1. msgget: 该函数用于打开或创建消息队列,其作用相当与文件操作函数open. #include

进程间通信 System V 消息队列

1.msgget (key_t ket,int flag) ; //创建一个新的消息队列或者访问一个已存在的消息队列 2.msgsnd(int msid, const void *ptr ,size_t length ,int flag ) // 发送 3.msgrcv() //读 4.msgctl(int msid , int cmd ,struct  msqid_ds *buff )//  cmd 提供删除,设置,返回当前 tips : 1.客户端服务端例子 服务端创建两个消息队列,A,B,

Linux IPC实践(4) --System V消息队列(1)

消息队列概述 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法(仅局限于本机); 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值. 消息队列也有管道一样的不足: (1)每个消息的最长字节数的上限(MSGMAX); (2)系统中消息队列的总条数也有一个上限(MSGMNI); (3)每个消息队列所能够保存的总字节数是有上限的(MSGMNB) . 查看系统限制 cat /proc/sys/kernel/msgmax  #最大消息长度限制 cat /proc/sys

Linux IPC实践(6) --System V消息队列(3)

消息队列综合案例 消息队列实现回射客户/服务器 server进程接收时, 指定msgtyp为0, 从队首不断接收消息 server进程发送时, 将mtype指定为接收到的client进程的pid client进程发送的时候, mtype指定为自己进程的pid client进程接收时, 需要将msgtyp指定为自己进程的pid, 只接收消息类型为自己pid的消息; // client/server进程接收/发送的数据结构 const int MSGMAX = 8192; struct msgBuf

Linux IPC实践(5) --System V消息队列(2)

消息发送/接收API msgsnd函数 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码, 也可以是通过ipcs命令查询出来的一个已经存在的消息队列的ID号 msgp:是一个指针,指针指向准备发送的消息, msgsz:是msgp指向的消息长度, 注意:这个长度不含保存消息类型的那个long int长整型 msgflg:控制着当前消息队列满或到达系统上限时

LINUX学习:System V消息队列

介绍: 1.消息队列提供了一个从一个进程向另外一个进程发送数据块的方法 2.每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 3.消息队列也有管道一样的不足,就是每个消息最大的长度是有上限的(MSGMAX),每个消息队列 的总的字节数是有上限的(MSGMNB),系统上消息队列的总数也是有一个上限的.   每个IPC对象都在内核维护着一个数据结构, 消息不同于刘模式,消息是有边界的,消息都是被打包在一个一个的数据包里的. 那么我们看下消息队列结构:       消息队列有4