消息队列是在两个进程之间传递二进制块数据的一种简单有效的方式。每个数据块都有一个特定的类型,接收方可以根据类型来有选择的接收数据,而不一定像管道和匿名管道那样必须以先进先出的方式接收数据。
Linux消息队列的4个API包括四个系统调用:msgget、msgsnd、msgcrv和msgctl
#include <sys/msg.h> int msgget( key_t key, int msgflg ); int msgsnd( int sigid, const void* msg_ptr, size_t msg_sz, int msgflg ); int msgrcv( int msqid, void* msg_ptr, size_t msg_sz, long int msgtypes, int msgflg ); int msgctl( int msqid, int command, struct msqid_ds* buf);
l msgget系统调用创建一个消息队列,或者获取一个已有的消息队列。key参数是一个键值,用来标识一个全局唯一的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgget成功时返回一个正整数值,它是消息队列的标识符。msgget失败时返回-1,并设置errno。
l msgsnd函数用来把消息添加到消息队列中,msqid是由msgget调用返回的消息队列标识符。msg_ptr参数指向一个准备发送的消息,消息必须被定义如下类型:
struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ };
其中,mtype何曾元指定消息的类型,它必须是一个正整数。mtext是消息数据。msg_sz参数是消息的数据部分(mtext)的长度。这个长度可以为0,表示没有消息数据。
msgflg参数控制msgsnd的行为。它通常仅支持IPC_NOWAIT标志,即以非阻塞的方式发送消息。默认情况下,发送消息如果消息队列满了,则msgsnd将阻塞。若IPC_NOWAIT标志被指定,则msgsnd调用可能立即返回并设置errno为EAGAIN。
l msgrcv函数从消息队列中获取消息,msqid是消息队列标识符,msgp用于存储接收的消息,msgsz指的是消息数据部分的长度。msgtype参数指定接收何种类型的消息,我们可以使用如下几种方式指定消息类型:
msgtype等于0:读取消息队列中的第一个消息。
msgtype大于0:读取消息队列中的第一个类型为msgtype的消息。
msggtype小于0:读取消息队列中第一个类型值比msgtype绝对值小的消息。
msgflag控制msgrcv函数的行为。它可以是如下标志的按位或:
IPC_NOWAIT:如果消息队列中没有消息,则msgrcv调用立即返回并设置errno为ENOMSG。
MSG_CEXEPT:如果msgtype大于0,则接受消息队列中第一个非msgtype类型的消息。
MSG_NOERROR:如果消息数据部分的长度超过了msgsz就将它截断。,
l msgctl控制消息队列的属性,msgqid是消息队列的标识符。command是将要采取的动作,它可以取3个值
IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。
IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值
IPC_RMID:删除消息队列
buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:
struct msgid_ds { uid_t shm_perm.uid; uid_t shm_perm.gid; mode_t shm_perm.mode; };
成功时返回0,失败时返回-1.
消息队列简单程序示例:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/msg.h> #include <time.h> #define err_sys(msg) do { perror(msg); exit(-1); } while(0) void msg_stat(int msgid, struct msqid_ds msg_info) { sleep(1); if(msgctl(msgid, IPC_STAT, &msg_info) < 0) err_sys("msgctl"); printf("\n"); printf("msg_cbytes: %ld\n", msg_info.msg_cbytes); printf("msg_qnum: %ld\n", msg_info.msg_qnum); printf("msg_qbytes: %ld\n", msg_info.msg_qbytes); printf("msg_lspid: %d\n", msg_info.msg_lspid); printf("msg_lrpid: %d\n", msg_info.msg_lrpid); printf("msg_stime: %s\n", ctime(&(msg_info.msg_stime))); printf("msg_rtime: %s\n", ctime(&(msg_info.msg_rtime))); printf("msg_ctime: %s\n", ctime(&(msg_info.msg_ctime))); printf("msg_uid: %d\n", msg_info.msg_perm.uid); printf("msg_gid: %d\n", msg_info.msg_perm.gid); } struct msgsbuf { int mtype; char mtext[1]; }msg_sbuf; struct msgmbuf { int mtype; char mtext[10]; }msg_rbuf; int main(void) { int msgid; struct msqid_ds msg_ginfo; pid_t pid; msgid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT | IPC_EXCL); if(msgid < 0) err_sys("msgget"); //msg_stat(msgid, msg_ginfo); if((pid = fork()) < 0) err_sys("fork"); else if(pid == 0) { sleep(1); msg_sbuf.mtype = 10; msg_sbuf.mtext[0] = 'a'; if(msgsnd(msgid, &msg_sbuf, sizeof(msg_sbuf.mtext), IPC_NOWAIT) < 0) printf("msgsnd error\n"); printf("Child: send to msg queue 'a'\n"); exit(0); //msg_stat(msgid, msg_ginfo); } else { //sleep(1); if(msgrcv(msgid, &msg_rbuf, 4, 10, 0) < 0) /* msgrcv默认是会阻塞的 */ perror("msgrcv\n"); else printf("Parent: read from msg queue. it is %c.\n", msg_rbuf.mtext[0]); printf("Recevied\n"); //msg_stat(msgid, msg_ginfo); } waitpid(pid, NULL, 0); msgctl(msgid, IPC_RMID, NULL); return 0; }
参考资料:
1、《Linux高性能服务器编程》 第13章 多进程编程/共享内存
版权声明:本文为博主原创文章,未经博主允许不得转载。