Linux 报文队列
- Linux 报文队列
- 一IPC 概述
- 二报文队列
- 1 报文队列简述
- 2 代码分析
- 21 msgget 创建报文队列
- 22 msgsnd 报文发送
- 221 相关数据结构
- 222 sys_msgsnd源码分析
- 3 msgrcv 报文接收
- 4 msgctl 报文控制与设置
一、IPC 概述
早期的Unix系统进程件通信机制主要有管道和信号。管道开始只能在近亲之间通信,于是将pipe推广到VFS层面,形成了FIFO。但有两个显著的缺点:信号能传递的信息太少。而管道只能传递无格式的字节流。于是,为了应对OS的日益发展,IPC新的机制出现,包括了:
- 报文传递
- 共享内存
- 进程同步
- 信号量
- 互斥信号量
- rendezvous
以上三种IPC机制,称之为“system V IPC”Linux为 system V IPC提供了一个统一的系统调用ipc,其接口为:
int ipc(unsigned int call, int first, int second, int third, void *ptr, int forth);
其中第一个参数call为操作码,定义如下:
[include/asm-i386/ipc.h]
13 /*
14 *SEM为信号量设置的
15 *MSG为报文传递设置的
16 *SHM为共享内存设置的
17 * */
18 #define SEMOP 1
19 #define SEMGET 2
20 #define SEMCTL 3
21 #define MSGSND 11
22 #define MSGRCV 12
23 #define MSGGET 13
24 #define MSGCTL 14
25 #define SHMAT 21
26 #define SHMDT 22
27 #define SHMGET 23
28 #define SHMCTL 24
C语言库函数为ipc提供了semget()、msgget()、msgsnd()等库函数,这些库函数最终都落实到统一的系统调用ipc()当中去。
二、报文队列
1.1 报文队列简述
进程可以调用库函数msgget()创建一个报文队列,实际上就是通过操作码为MSGGET的ipc系统调用建立报文队列。报文队列通过一个键值key来标示的,而非文件名。一旦报文队列建立之后,进程就可以用相同的键值通过msgget()来取得报文队列的访问,而发送报文的进程也可以通过msgsnd()发送报文到指定的队列中,接收进程则通过msgrcv()来取得报文。另外进程可以通msgctl()对报文队列进行额外的控制。
1.2 代码分析
1.2.1 msgget() 创建报文队列
库函数long sys_msgget(key_t key, int msgflag)
实际上是用MSGGET操作码调用ipc系统调用,该函数有两个作用:
+ 当msgflag的IPC_CREATE位置位时,就利用key创建一个报文队列。
+ 当msgflag的IPC_CREATE位清零时,就了用key查找一个报文队列。
sys_msgget的代码如下:
[ipc/msg.c]
306 asmlinkage long sys_msgget (key_t key, int msgflg)
307 {
308 int id, ret = -EPERM;
309 struct msg_queue *msq;
310
311 down(&msg_ids.sem);
312 /*每个进程都可以建立一个私用的报文队列,其键值为IPC_PRAVETE.
313 * 该报文队列只能用于进程自己收发*/
314 if (key == IPC_PRIVATE)
315 /*建立一个新的报文队列*/
316 ret = newque(key, msgflg);
317 else if ((id = ipc_findkey(&msg_ids, key)) == -1) { /* key not used */
318 /*如果键值对应的报文队列已经建立,则创建失败*/
319 if (!(msgflg & IPC_CREAT))
320 ret = -ENOENT;
321 else
322 /*否则创建一个报文队列*/
323 ret = newque(key, msgflg);
324 } else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {
325 ret = -EEXIST;
326 } else {
327 /*如果查找到报文队列,且msgflag表明为查找报文队列,则返回队列id*/
328 msq = msg_lock(id);
329 if(msq==NULL)
330 BUG();
331 if (ipcperms(&msq->q_perm, msgflg))
332 ret = -EACCES;
333 else
334 ret = msg_buildid(id, msq->q_perm.seq);
335 msg_unlock(id);
336 }
337 up(&msg_ids.sem);
338 return ret;
339 }
IPC_PRIVATE作为私有报文队列可以无条件创建,这意味着key可以不唯一。如果不是IPC_PRIVATE则必须保证key的唯一性。首先查找key的报文队列:
- 如果已经存在且msgflag表明要创建报文队列,则创建失败。
- 如果已经存在,但表msgflag表明是获取队列,则返回队列id.
- 如果不存在,且msgflag表明要创建报文队列,则创建一个报文队列且返回id
(注:队列id和队列键值key是不同的概念,队列id类似于打开的文件描述符fd,而队列键值则类似于文件的路径名)
报文队列的创建由newqueue函数来完成。在读该函数之前,有必要了解几个数据结构及其关系。
内核中有一个全局数据结构msg_ids用于管理报文队列。该变量为struct ipc_ids类型的变量。定义于
[ipc/util.h]
15 struct ipc_ids {
16 int size;
17 int in_use;
18 int max_id;
19 unsigned short seq;
20 unsigned short seq_max;
21 struct semaphore sem;
22 spinlock_t ary;
23 struct ipc_id* entries;
24 };
25
26 struct ipc_id {
27 struct kern_ipc_perm* p;
28 };
其中关键成员为entries,该成员指向一个struct ipc_id结构数组。struct ipc_id结构实际上是一个kern_ipc_perm *p,所以可以把entries实际上指向一个kern_ipc_perm数组。每个报文队列都具有一个报文队列头,用于管理该报文队列,也可以说每个报文队列头是一个报文队列对象。而kern_ipc_perm为报文队列头的第一个成员,用于记录该报文队列的用户id,组id,键值key等参数。报文队列头struct msg_queue定义于:
[msg/ipc.h]
68 struct msg_queue {
69 struct kern_ipc_perm q_perm;
70 time_t q_stime; /* last msgsnd time */
71 time_t q_rtime; /* last msgrcv time */
72 time_t q_ctime; /* last change time */
73 unsigned long q_cbytes; /* current number of bytes on queue */
74 unsigned long q_qnum; /* number of messages in queue */
75 unsigned long q_qbytes; /* max number of bytes on queue */
76 pid_t q_lspid; /* pid of last msgsnd */
77 pid_t q_lrpid; /* last receive pid */
78
79 struct list_head q_messages;
80 struct list_head q_receivers;
81 struct list_head q_senders;
82 };
现在梳理一下上面几个数据结构中的关系:
- msg_ids中有个entries成员指向了一个kern_ipc_perm类型的数组
- kern_ipc_perm结构记录了一个报文队列的重要参数
- kern_ipc_perm为报文队列头msg_queue的第一个成员,其起始地址与报文队列头一致。也就是说,只要找到了kern_ipc_perm就可以找到报文队列头,即报文队列。
现在看newque的代码。
[ipc/msg.c]
117 static int newque (key_t key, int msgflg)
118 {
119 int id;
120 /*报文队列头*/
121 struct msg_queue *msq;
122
123 msq = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL);
124 if (!msq)
125 return -ENOMEM;
126 /*分配一个报文队列id,实际是上将msg_ids中的entry分配一个位置
127 * 然后将该位置的指针指向msg->perm即可*/
128 id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni);
129 if(id == -1) {
130 kfree(msq);
131 return -ENOSPC;
132 }
133 /*填充msg_queue中的各个成员*/
134 msq->q_perm.mode = (msgflg & S_IRWXUGO);
135 msq->q_perm.key = key;
136
137 msq->q_stime = msq->q_rtime = 0;
138 msq->q_ctime = CURRENT_TIME;
139 msq->q_cbytes = msq->q_qnum = 0;
140 msq->q_qbytes = msg_ctlmnb;
141 msq->q_lspid = msq->q_lrpid = 0;
142 INIT_LIST_HEAD(&msq->q_messages);
143 INIT_LIST_HEAD(&msq->q_receivers);
144 INIT_LIST_HEAD(&msq->q_senders);
145 msg_unlock(id);
146
147 /*根据msg_ids.entries数组的索引值生成一个id号并且返回*/
148 return msg_buildid(id,msq->q_perm.seq);
149 }
上述的代码中,报文队列对象分配出来后,要注册到msg_ids中,注册操作由函数ipc_addid完成。该函数的主要工作是在msg_ids->entries中查找一个空闲的位置,并且将struct kern_ipc_perm q_perm插入该文职,然后返回索引值id.最后根据索引之生成一个全局的id并且返回即可。可以看出来,报文队列的创建与打开,与进程的文件创建与打开类似。之后进程就可以使用返回的id从msg_ids中索引得到报文队列头,从而能堆报文队列执行发送,接受,控制等等操作了。
1.2.2 msgsnd() 报文发送
1.2.2.1 相关数据结构
调用msgget()创建了报文队列之后,就可以通过msgsnd向队列发送消息了。在读msgsnd的源码之前,有必要了解几个用到的数据结构。
用户空间的报文,报文头为struct msgbuf,其承载报文内容。struct msgbuf定义如下:
[include/linux/msg.h]
34 /* message buffer for msgsnd and msgrcv calls */
35 struct msgbuf {
36 long mtype; /* type of message */
37 char mtext[1]; /* message text */
38 };
mtype标示了报文的类型。而mtext则为一个长度为1的数组,事实上mtext可以看作一个指针,指向了报文的起始地址。内核中使用的报文头为struct msg_msg结构,定义如下:
[ipc/msg.h]
56 struct msg_msg {
57 struct list_head m_list;
58 long m_type;
59 int m_ts; /* message text size */
60 struct msg_msgseg* next;
61 /* the actual message follows immediately */
62 };
内核中的报文头中并没有任何指针成员指向报文内容,而是直接将报文内容安排到了紧接着报文头的位置。有时候报文内容太多,体积过大,就将报文分成多个报文段存放,其中报文头中的struct msg_msgseg指针就用于指向各个报文段头。msg_msgseg定义如下:
51 struct msg_msgseg {
52 struct msg_msgseg* next;
53 /* the next part of the message follows immediately */
54 };
报文段的存放也是将报文段内同安排到紧接着msg_msgseg的位置。报文头和报文段头的关系是这样的:
如果报文体积较小,那么直接安排到紧挨着报文头的位置即可。如果报文提及大于一个物理页的大小,就将其大于物理页的部分安排到一个新的页中,并且页的起始地址为一个msg_msgseg结构,也就是报文段头,报文内同则紧挨着msg_msgseg排列,然后msg_msg中的next指针指向了msg_msgseg结构。
1.2.2.2 sys_msgsnd源码分析
msgsnd的内核实现为sys_msgsnd,该函数的定义如下:
[ipc/msg.c]
645 asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
646 {
647 struct msg_queue *msq;
648 struct msg_msg *msg;
649 long mtype;
650 int err;
651 /*参数检查*/
652 if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0)
653 return -EINVAL;
654 if (get_user(mtype, &msgp->mtype))
655 return -EFAULT;
656 if (mtype < 1)
657 return -EINVAL;
658 /*将msgbuf总的报文内容从用户空间复制过来,填充到msg_msg结构体当中去*/
659 msg = load_msg(msgp->mtext, msgsz);
660 if(IS_ERR(msg))
661 return PTR_ERR(msg);
662
663 msg->m_type = mtype;
664 msg->m_ts = msgsz;
665
666 /*根据id找到报文队列并且上锁*/
667 msq = msg_lock(msqid);
668 err=-EINVAL;
669 if(msq==NULL)
670 goto out_free;
671 /*参数检查结束*/
672 retry:
673 err= -EIDRM;
674 /*检查id的正确性*/
675 if (msg_checkid(msq,msqid))
676 goto out_unlock_free;
677
678 /*权限检查,不是每个进程都有资格发送报文的*/
679 err=-EACCES;
680 if (ipcperms(&msq->q_perm, S_IWUGO))
681 goto out_unlock_free;
682
683 /* msq->q_cbytes为当前队列大小,msq->q_qbytes为当前队列的容量
684 * msq->q_qnum为报文当前个数,最大报文个数不能超过msq->q_qbytes
685 * 这里是检查容量是否足以容下报文,并且检查报文个数是否超过最大
686 * 限制
687 * */
688 if(msgsz + msq->q_cbytes > msq->q_qbytes ||
689 1 + msq->q_qnum > msq->q_qbytes) {
690 struct msg_sender s;
691
692 /*如果报文不能送达,检查msgflag是否使用无阻塞访问
693 * 如果使用无阻塞访问,则立即返回出错码
694 * 如果没有使用无阻塞访问,则进入睡眠状态*/
695 if(msgflg&IPC_NOWAIT) {
696 err=-EAGAIN;
697 goto out_unlock_free;
698 }
699
700 /*将发送进程加入msq->sender链表中,这样就可以在接受者
701 * 接收完毕之后,报文队列有了新的空间,接受这就唤醒
702 * sender队列上的进程*/
703 ss_add(msq, &s);
704 /*解锁报文队列*/
705 msg_unlock(msqid);
706 /*主动调度,此时发送进程正式进入休眠*/
707 schedule();
708 current->state= TASK_RUNNING;
709
710 msq = msg_lock(msqid);
711 err = -EIDRM;
712 /*进程醒来之后需要进行新的检查,因为各个条件可能在
713 * 进程睡眠时期发生改变*/
714 if(msq==NULL)
715 goto out_free;
716 ss_del(&s);
717
718 /*如果睡眠期间有信号投递到进程,那就立即返回处理信号。
719 * 很可能是信号唤醒的进程*/
720 if (signal_pending(current)) {
721 err=-EINTR;
722 goto out_unlock_free;
723 }
724 /*如前文所述,需要再次检查*/
725 goto retry;
726 }
727
728 /*如果没有阻塞发生,就可以发送消息了。如果有进程正在
729 * 等待接受消息,那么就没有必要将消息挂入队列,直接调用
730 * pipelined_send将消息递交给进程即可*/
731 if(!pipelined_send(msq,msg)) {
732 /* noone is waiting for this message, enqueue it */
733 /*将消息msg_msg挂入消息队列头msq->q_messages队列中去*/
734 list_add_tail(&msg->m_list,&msq->q_messages);
735 msq->q_cbytes += msgsz;
736 msq->q_qnum++;
737 atomic_add(msgsz,&msg_bytes);
738 atomic_inc(&msg_hdrs);
739 }
740
741 err = 0;
742 msg = NULL;
743 msq->q_lspid = current->pid;
744 msq->q_stime = CURRENT_TIME;
745
746 out_unlock_free:
747 msg_unlock(msqid);
748 out_free:
749 if(msg!=NULL)
750 free_msg(msg);
751 return err;
752 }
sys_msg完成的工作有:
(1) 通过id值查找到报文队列msgqueue
(2) 将报文拷贝到内核空间并且建立报文头msg_msg。
(3) 将报文头挂接到msgqueue
具体实现细节已经详细注释,这里不再赘述。
1.3 msgrcv 报文接收
msgrcv的代码如下:
[ipc/msg.c]
773 asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz,
774 long msgtyp, int msgflg)
775 {
776 struct msg_queue *msq;
777 struct msg_receiver msr_d;
778 struct list_head* tmp;
779 struct msg_msg* msg, *found_msg;
780 int err;
781 int mode;
782 /*开始参数检查*/
783 if (msqid < 0 || (long) msgsz < 0)
784 return -EINVAL;
785 mode = convert_mode(&msgtyp,msgflg);
786
787 /*获取报文队列*/
788 msq = msg_lock(msqid);
789 if(msq==NULL)
790 return -EINVAL;
791 /*参数检查结束*/
792 retry:
793 err=-EACCES;
794 /*权限检查*/
795 if (ipcperms (&msq->q_perm, S_IRUGO))
796 goto out_unlock;
797
798 tmp = msq->q_messages.next;
799 found_msg=NULL;
800 /*遍历队列取出报文*/
801 while (tmp != &msq->q_messages) {
802 msg = list_entry(tmp,struct msg_msg,m_list);
803 /*测试条件是否满足
804 * 如果msgtyp >= 0,则查找到一个msg->m_type == msgtyp的报文将其返回
805 * 如果msgtyp < 0,则查找到一个msg->m_type < msgtype的最小值的报文将其返回
806 * */
807 if(testmsg(msg,msgtyp,mode)) {
808 found_msg = msg;
809 if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) {
810 found_msg=msg;
811 msgtyp=msg->m_type-1;
812 } else {
813 found_msg=msg;
814 break;
815 }
816 }
817 tmp = tmp->next;
818 }
819 if(found_msg) {
820 msg=found_msg;
821
822 /*如果查找到了报文,且用户提供的缓冲区大小不够大,
823 * 有设置了不能截断的标志,则返回出错码*/
824 if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
825 err=-E2BIG;
826 goto out_unlock;
827 }
828
829 /*从报文队列中将报文取出*/
830 list_del(&msg->m_list);
831 msq->q_qnum--;
832 msq->q_rtime = CURRENT_TIME;
833 msq->q_lrpid = current->pid;
834 msq->q_cbytes -= msg->m_ts;
835 atomic_sub(msg->m_ts,&msg_bytes);
836 atomic_dec(&msg_hdrs);
837
838 /*如果有进程正在等待发送报文,则唤醒进程*/
839 ss_wakeup(&msq->q_senders,0);
840 msg_unlock(msqid);
841 out_success:
842 msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
843 if (put_user (msg->m_type, &msgp->mtype) ||
844 store_msg(msgp->mtext, msg, msgsz)) {
845 msgsz = -EFAULT;
846 }
847 free_msg(msg);
848 return msgsz;
849 } else
850 {
851 struct msg_queue *t;
852 /* no message waiting. Prepare for pipelined
853 * receive.
854 */
855 /*如果没有查找到报文,且设置了非阻塞方式访问,则立即返回出错码*/
856 if (msgflg & IPC_NOWAIT) {
857 err=-ENOMSG;
858 goto out_unlock;
859 }
860
861 /*否则进程睡眠*/
862 list_add_tail(&msr_d.r_list,&msq->q_receivers);
863 msr_d.r_tsk = current;
864 msr_d.r_msgtype = msgtyp;
865 msr_d.r_mode = mode;
866 if(msgflg & MSG_NOERROR)
867 msr_d.r_maxsize = INT_MAX;
868 else
869 msr_d.r_maxsize = msgsz;
870 msr_d.r_msg = ERR_PTR(-EAGAIN);
871 current->state = TASK_INTERRUPTIBLE;
872 msg_unlock(msqid);
873
874 schedule();
875 current->state = TASK_RUNNING;
876
该函数主要作的工作就是:
+ 根据id值查找得到报文队列头。
+ 遍历报文队列头中的报文队列
+ 比对报文队列中的报文类型与传进来的参数msgtyp,判断是否查找成功
+ 如果报文查找成功,且用户提供的缓冲区足够大,则将报文复制到用户缓冲区内并且返回即可。
+ 如果报文查找成功,但用户提供的缓冲区不够大,且设置了不能截短报文,则返回出错码。
+ 如果报文查找失败,且用户设置了非阻塞访问标志,则返回出错码,否则进程睡眠
1.4 msgctl() 报文控制与设置
报文机制对比管道机制的一大优点就是报文队列可以通过msgctl获取其状态信息,和设置相关参数。内核实现为long sys_msgsctl(int msqid, int cmd, struct msqid_ds *buf)
其中msqid为报文队列的id, cmd 为具体命令码。定义为:
[include/linux/ipc.h]
34 /*
35 * Control commands used with semctl, msgctl and shmctl
36 * see also specific commands in sem.h, msg.h and shm.h
37 */
38 #define IPC_RMID 0 /* remove resource */
39 #define IPC_SET 1 /* set ipc_perm options */
40 #define IPC_STAT 2 /* get ipc_perm options */
41 #define IPC_INFO 3 /* see ipcs */
这些命令吗并不是专门为报文队列设置的。它适用于所有system V IPC.不过对于具体的机制还有其它具体的专用命令,对于报文队列而言,还有另外两个专用的命令码:
[include/linux/msg.h]
6 /* ipcs ctl commands */
7 #define MSG_STAT 11
8 #define MSG_INFO 12
最后一个参数buf为一个msqid_ds结构指针,这个结构用于IPC_STAT和IPC_SET定义如下:
[include/linux/msg.h]
15 struct msqid_ds {
16 struct ipc_perm msg_perm;
17 struct msg *msg_first; /* first message on queue,unused */
18 struct msg *msg_last; /* last message in queue,unused */
19 __kernel_time_t msg_stime; /* last msgsnd time */
20 __kernel_time_t msg_rtime; /* last msgrcv time */
21 __kernel_time_t msg_ctime; /* last change time */
22 unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
23 unsigned long msg_lqbytes; /* ditto */
24 unsigned short msg_cbytes; /* current number of bytes on queue */
25 unsigned short msg_qnum; /* number of messages in queue */
26 unsigned short msg_qbytes; /* max number of bytes on queue */
27 __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
28 __kernel_ipc_pid_t msg_lrpid; /* last receive pid */
29 };
当命令码为IPC_INFO时,buf指向一个msginfo结构
[include/linux/msg.h]
40 /* buffer for msgctl calls IPC_INFO, MSG_INFO */
41 struct msginfo {
42 int msgpool;
43 int msgmap;
44 int msgmax;
45 int msgmnb;
46 int msgmni;
47 int msgssz;
48 int msgtql;
49 unsigned short msgseg;
50 };
值得注意的是:
- 在使用SET命令设置报文队列时,如果有进程正在接收报文,则全部出错返回。
- IPC_RMID命令用来撤销一个报文队列id,相当与关闭文件。