一概述
消息队列就是一段有一定格式的内存区,即一个消息的链表,位于内核中,可以把消息看成一条记录,这个记录有特定的格式和优先级。
消息队列的读和写是异步的,发送方不必等到接收方接收,接收方发现没有数据也不用等待。
新的消息总是放在队尾,接收的时候不一定要遵守先进先出的原则,可以根据优先级获取数据。
消息队列只有在内核重启或者显示的删除的时候才会被删除掉。
二 操作函数
1 创建消息队列
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, //key值,标识消息队列的唯一性 int msgflg); //设置消息队列的属性。一般为 0666 | IPC_CREAT //返回值:>0(成功,消息队列的标识符) -1(创建失败,失败信息见errno)
2 创建之后shell查看的方法
ipcs -q | grep -i key
如:ipcs -q | grep -i 4d2 即查看key值为1234(16进程为4d2)的消息队列
3 向消息队列中写入数据
#include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> int msgsnd(int msqid, //消息队列的标识符 const void *msgp, //包含数据的结构体指针 size_t msgsz, //结构体中数据的长度 int msgflg); //用于设置发送数据时的一些特性 //返回值: 0(写数据成功) -1(写数据失败,错误信息见errno)
msgp是一个结构体的指针,结构体是由用户定义的,定义的结构应该按照如下格式:
struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */ };
其中,
msgflg决定发送消息时的一些特性。
比如:默认情况下,如果消息队列中暂时没有空间的话,默认消息队列是会阻塞的。
但,如果msgflg字段赋值为IPC_NOWAIT,则这种情况下会直接返回EAGAIN错误。
4 从消息队列中读取数据
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> ssize_t msgrcv(int msqid, //消息队列的标识符 void *msgp, //接收消息存放的容器 size_t msgsz, //接收消息的大小 long msgtyp, //接收消息的消息类型 int msgflg); //设置接收消息时的一些特性 //返回值:>0 (成功读取到的数据大小) -1(失败,错误信息见errno)
其中,
msgtype就是接受的优先级,即结构体msgbuf中的mtype字段。
如果,=0 就从消息队列中的第一个消息取;
>0 就从有相同消息类型的第一个消息开始取;
<0 就从<= abs(msgtype)的第一个消息开始取。
msgflg决定接收消息时的一些特性。
如,当发送过来的消息大小超过接收容器的大小时,默认情况返回失败,errno设置为E2BIG.
但,此时如果msgflg设置为MSG_NOERROR,则会将消息按照接收容器截断并返回成功。
5 消息队列的属性操作函数
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msqid, //消息队列的标识符 int cmd, //对消息队列属性的操作 struct msqid_ds *buf); //消息队列的属性 //返回值:0(成功) -1(失败,失败信息见errno)
其中,
cmd取值如下:IPC_STAT 获取消息队列的属性信息
IPC_SET 设置消息队列的属性信息
IPC_RMID 删除消息队列,并将相关联的读写进程返回错误,errno设置为EIDRM
buf参数是消息属性的结构体指针,结构体定义如下:
struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes in queue (non-standard) */ msgqnum_t msg_qnum; /* Current number of messages in queue */ msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */ }; struct ipc_perm { key_t __key; /* Key supplied to msgget(2) */ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid; /* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode; /* Permissions */ unsigned short __seq; /* Sequence number */ };
消息队列的大小在以下文件中:
/proc/sys/kernel/msgmax 单个消息的最大值 缺省值为 8192
/proc/sys/kernel/msgmnb 单个消息体的容量的最大值 缺省值为 16384
/proc/sys/kernel/msgmni 消息体的数量 缺省值为 16
也可以通过字段 msg_qbytes 修改消息队列的总大小。
6 shell手动删除的方法
ipcrm -q msqid
其中,msqid也可以通过ipcs查看
可以知道这个环境中key值1234(4d2)对应的msqid是32768。
执行ipcrm -q 32768 就可以删除消息队列。
三 使用实例
1 向消息队列写数据
/************************************************************************* > File Name: testmsgqueueW.c > Author: qiaozp > Mail: [email protected] > Created Time: 2014-9-19 17:15:09 > Step: 1 连接消息队列 2 读数据 ************************************************************************/ #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <iostream> #include <stdio.h> #include <string.h> #include <errno.h> using namespace std; #define KEY 1234 #define MAX_BUFF_SIZE 512 typedef struct { long mtype; char mtext[MAX_BUFF_SIZE]; }_msgbuf; int main() { //1 创建消息队列 int msgid = 0; if((msgid = msgget((key_t)KEY, 0666|IPC_CREAT)) == -1) { cout << "创建key值为[" << KEY << "]的消息队列失败" << endl; return -1; } _msgbuf buff; //2 向消息队列中写数据 while(1) { cout << "请输入要放入消息内存中的数据:"; scanf("%s", buff.mtext); if(msgsnd(msgid, (void*)&buff, MAX_BUFF_SIZE, 0) == -1) { cout << "向key值为[" << KEY << "]的消息队列发送数据失败" << endl; return -1; } if(strncasecmp(buff.mtext, "end", strlen("end")) == 0) { //结束的时候删除消息队列 if(msgctl(msgid, IPC_RMID, NULL) == -1) { cout << "删除key值为[" << KEY << "]的消息队列失败" << endl; return -1; } break; } } return 0; }
2 从消息队列读取数据
/************************************************************************* > File Name: testmsgqueueR.c > Author: qiaozp > Mail: [email protected] > Created Time: 2014-9-19 17:15:28 > Step: 1 创建消息队列 2 写数据 3 end退出 ************************************************************************/ #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <iostream> #include <stdio.h> #include <string.h> #include <errno.h> using namespace std; #define KEY 1234 #define MAX_BUFF_SIZE 512 typedef struct { long mtype; char mtext[MAX_BUFF_SIZE]; }_msgbuf; int main() { //1 创建消息队列 int msgid = 0; if((msgid = msgget((key_t)KEY, 0666|IPC_CREAT)) == -1) { cout << "创建key值为[" << KEY << "]的消息队列失败" << endl; return -1; } _msgbuf buff; //2 向消息队列中写数据 cout << "开始从消息队列中获取数据,请在服务端输入数据..." << endl; while(1) { if(msgrcv(msgid, (void*)&buff, MAX_BUFF_SIZE, 0, 0) == -1) { cout << "从key值为[" << KEY << "]的消息队列获取数据失败" << endl; return -1; } cout << "获取到的数据为 : " << buff.mtext << endl; if(strncasecmp(buff.mtext, "end", strlen("end")) == 0) { cout << "退出." << endl; break; } } return 0; }