操作系统课程设计 消息缓冲队列通信

消息缓冲队列通信机制其基本思想是根据“生产者——消费者”原理,利用内存中公用消息缓冲区实现进程间的信息交换。

在这种通信机制中,首先需要在内存中开辟若干空闲消息缓冲区,用以存放要通信的消息。每当一个进程需要向另一个进程发送消息时,便向系统申请一个空闲消息缓冲区,并把已准备好的消息复制到该缓冲区,然后把该消息缓冲区插入到接收进程的消息队列中,最后通知接收进程。接收进程接收到发送进程发来的通知后,从本进程的消息队列中摘下一消息缓冲区,取出其中的信息,然后把消息缓冲区作为空闲消息缓冲区归还给系统。系统负责管理公用消息缓冲区以及消息的传递。

  1 // 消息缓冲队列
  2 // 2016.1.7
  3
  4 #include <stdlib.h>
  5 #include <dos.h>
  6 #include <stdio.h>
  7
  8 #define GET_INDOS 0x34         /* 34H 系统功能调用    */
  9 #define GET_CRIT_ERR 0x5d06    /* 5D06H号系统功能调用 */
 10
 11 #define BLANK -1
 12 #define FINISHED 0        /*     终止    */
 13 #define RUNNING 1         /*   执行  */
 14 #define READY 2           /*   就绪  */
 15 #define BLOCKED 3         /*   阻塞  */
 16 #define NTCB 3            /* 系统线程的最大数 */
 17
 18 #define TL 10             /* 时间片大小  */
 19 #define NBUF 2            /* 消息缓冲区数目 */
 20 #define NTEXT 50          /* 文本输出大小  */
 21
 22
 23 char far* intdos_ptr=0;
 24 char far* crit_err_ptr=0;
 25 int timecount=0;
 26 int current=-1;
 27
 28 typedef unsigned int UINT16;
 29
 30 typedef struct/* 信号量 */
 31 {
 32   int value;
 33   struct TCB* wq;
 34 }semaphore;
 35
 36 semaphore mutexfb={1,NULL};        // freebuf 互斥变量     初值 1
 37 semaphore sfb={2,NULL};            // 计数信号量
 38 semaphore bufferSem1, bufferSem2;
 39
 40 // 消息缓冲区
 41 // 空闲缓冲队列 freebuf(临界资源)
 42 struct buffer
 43 {
 44   int sender;             /*消息发送者的标识数 */
 45   int size;               /* 消息长度<=NTEXT 个字节   */
 46   char text[NTEXT];       /* 消息正文   */
 47   struct buffer* next;    /* 指向下一个消息缓冲区的指针 */
 48 } *freebuf;
 49
 50 /* 线程控制块  */
 51 struct TCB
 52 {
 53   unsigned char* stack;          /* 堆栈的起始地址  */
 54   unsigned ss;
 55   unsigned sp;            /* 堆栈段址和堆栈指针 */
 56   char state;             /* 进程状态   */
 57   char name[10];          /* 线程的外部标识符  */
 58   int value;              /*优先级*/
 59   struct TCB* next;       /* 指向控制快指针  */
 60   struct buffer* mq;      /* 消息缓冲队列首指针  */
 61   semaphore mutex;        /* 互斥信号量   */
 62   semaphore sm;           /* 消息缓冲队列计数信号量*/
 63 }tcb[NTCB];
 64
 65 /* 堆栈现场保护和恢复结构体 */
 66 struct int_regs
 67 {
 68   unsigned BP,DI,SI,DS,ES,DX,CX,BX,AX,IP,CS,Flags,off,seg;
 69 };
 70
 71 typedef int(far* codeptr)(void);
 72 void interrupt(*old_int8)(void);
 73 int DosBusy(void);
 74 void InitIndos(void);
 75 void InitTcb();
 76 void interrupt new_int8(void);
 77 void interrupt swtch();
 78 void send(char *receiver,char *a,int size);
 79 int receive(char *sender,char *a);
 80 void p(semaphore *sem);
 81 void v(semaphore *sem);
 82 int Create(char* name,codeptr code,int stacklen,int prio);  /* 创建线程 */
 83 void Destroy(int i);
 84
 85
 86 // 1#线程
 87 void f1()
 88 {
 89
 90   while(1)
 91   {
 92         p(&bufferSem1);
 93
 94       send("f2","f1 send message to f2",NTEXT);
 95
 96       printf("f1 sending!\n");
 97
 98       v(&bufferSem2);
 99   }
100 }
101
102 // 2#线程
103 void f2()
104 {
105   char a[NTEXT];
106
107   while(1)
108   {
109         p(&bufferSem2);
110
111       receive("f1",a);
112
113       printf("f2 receiving!\n");
114
115       v(&bufferSem1);
116   }
117 }
118
119
120 void InitInDos()      /* 取得INDOS标志和严重错误标志地址 */
121 {
122   union REGS regs;
123   struct SREGS segregs;
124
125   regs.h.ah=GET_INDOS;      /* 使用34H号系统功能调用 */
126   intdosx(&regs,&regs,&segregs);
127
128   intdos_ptr=MK_FP(segregs.es,regs.x.bx);
129   if(_osmajor<3)
130     crit_err_ptr=intdos_ptr+1;      /* 严重错误在INDOS后一字节处 */
131   else if(_osmajor==3&&_osminor==0)
132     crit_err_ptr=intdos_ptr-1;      /* 严重错误在INDOS前一字节处 */
133   else
134     {
135      regs.x.ax=GET_CRIT_ERR;
136      intdosx(&regs,&regs,&segregs);
137      crit_err_ptr=MK_FP(segregs.ds,regs.x.si);
138    }
139 }
140
141 int DosBusy(void)            /* 判断DOS是否忙 */
142 {
143   if(intdos_ptr&&crit_err_ptr)
144     return(*intdos_ptr||*crit_err_ptr);  /* DOS忙,返回严重错误标志 */
145    else
146     return(-1);         /* DOS不忙 */
147 }
148
149 void InitTcb()           /* 初始化线程 */
150 {
151   int i;
152
153   for(i=0;i<NTCB;i++)
154    {
155     tcb[i].state=BLANK;       /* 初始状态标志   */
156     tcb[i].mq=NULL;
157     tcb[i].mutex.value=1;
158     tcb[i].mutex.wq=NULL;
159     tcb[i].sm.value=0;
160     tcb[i].sm.wq=NULL;
161    }
162 }
163
164 void Destroy(int i)
165 {
166
167  if(tcb[i].state==RUNNING)
168  {
169    disable();
170    tcb[i].state=FINISHED;
171    strcpy(tcb[i].name,NULL);
172    free(tcb[i].stack);
173    tcb[i].ss=0;
174    tcb[i].sp=0;
175    enable();
176   }
177
178 }
179
180 void over()
181 {
182    Destroy(current);
183    swtch();
184 }
185
186 int Create(char *name,codeptr code,int stacklen,int value)
187 {
188    int i;
189    char *p;
190    struct int_regs *pt;
191    unsigned int *pp;
192
193    for(i=1;i<NTCB;i++)
194    {
195      if(tcb[i].state==BLANK||tcb[i].state==FINISHED)
196          break;
197    }
198    if(i==NTCB)
199      return-1;
200
201    tcb[i].value=value;
202    strcpy(tcb[i].name,name);
203    tcb[i].stack=(p=(unsigned char*)malloc(stacklen));
204    memset(tcb[i].stack, 0xff, stacklen);
205    p=p+stacklen;
206
207 #if 0
208    pt=(struct int_regs*)p;
209    pt--;
210    pt->Flags=0x200;
211    pt->CS=FP_SEG(code);
212    pt->IP=FP_OFF(code);
213
214    pt->off=FP_OFF(over);
215    pt->seg=FP_SEG(over);
216    pt->DS=_DS;
217    pt->ES=_ES;
218    tcb[i].sp=FP_OFF(pt);
219    tcb[i].ss=FP_SEG(pt);
220 #else if
221    /*
222    pp=(UINT16 *)(p-2);
223    *(pp)=FP_SEG(over);
224    *(pp-1)=FP_OFF(over);
225    *(pp-2)=0x200;
226    *(pp-3)=FP_SEG(code);
227    *(pp-4)=FP_OFF(code);
228
229    *(pp-9)=_ES;
230    *(pp-10)=_DS;
231    tcb[i].sp=FP_OFF(pp-13);
232    tcb[i].ss=FP_SEG(pp-13);
233    */
234
235    *(p-1)=(FP_SEG(over)&0xff00)>>8;
236    *(p-2)=FP_SEG(over)&0x00ff;
237
238    *(p-3)=(FP_OFF(over)&0xff00)>>8;
239    *(p-4)=FP_OFF(over)&0x00ff;
240
241    *(p-5)=0x02;
242    *(p-6)=0x00;
243
244    *(p-7)=(FP_SEG(code)&0xff00)>>8;
245    *(p-8)=FP_SEG(code)&0x00ff;
246
247    *(p-9)=(FP_OFF(code)&0xff00)>>8;
248    *(p-10)=FP_OFF(code)&0x00ff;
249
250    *(p-19)=(_ES&0xff00)>>8;
251    *(p-20)=_ES&0x00ff;
252
253    *(p-21)=(_DS&0xff00)>>8;
254    *(p-22)=_DS&0x00ff;
255
256    tcb[i].sp=FP_OFF((UINT16 *)(p-28));
257    tcb[i].ss=FP_SEG((UINT16 *)(p-28));
258
259 #endif
260
261    tcb[i].state=READY;
262
263    return i;
264 }
265
266 void tcb_state()        /* 线程状态信息 */
267 {
268   int i;
269
270   for(i=0;i<NTCB;i++)
271     if(tcb[i].state!=BLANK)
272     {
273         switch(tcb[i].state)
274         {
275             case FINISHED:
276             printf("\ntcb[%d] is FINISHED\n",i);
277             break;
278
279             case RUNNING:
280             printf("tcb[%d] is RUNNING\n",i);
281             break;
282             case READY:
283             printf("tcb[%d] is READY\n",i);
284             break;
285             case BLOCKED:
286             printf("tcb[%d] is BLOCKED\n",i);
287
288             break;
289         }
290     }
291 }
292
293 int all_finished()
294 {
295     int i;
296
297     for(i=1;i<NTCB;i++)
298         if(tcb[i].state==RUNNING||tcb[i].state==BLOCKED||tcb[i].state==READY)
299             return 0;
300
301     return 1;
302 }
303
304 int Find()
305 {
306     int i,j;
307     i=current;
308
309     while(tcb[i=((i+1)%NTCB)].state!=READY||i==current);
310
311     return i;
312 }
313
314 void interrupt new_int8(void)       /* CPU 调度*/
315 {
316     int i;
317
318     (*old_int8)();      /* 指向原来时钟中断处理过程入口的中断处理函数指针 */
319     timecount++;
320
321     if(timecount==TL)        /* 时间片是否到? */
322     {
323         if(!DosBusy())     /* DOS是否忙? */
324         {
325             disable();
326
327             tcb[current].ss=_SS;     /* 保存现场 */
328             tcb[current].sp=_SP;
329
330             if(tcb[current].state==RUNNING)
331                 tcb[current].state=READY;
332
333             i=Find();
334             if(i<0)
335                 return;
336
337             _SS=tcb[i].ss;
338             _SP=tcb[i].sp;
339             tcb[i].state=RUNNING;
340             current=i;
341             timecount=0;      /* 重新计时 */
342
343             enable();
344         }
345         else
346             return;
347     }
348     else
349         return;
350 }
351
352 void interrupt swtch()            /* 其他原因CPU调度  */
353 {
354     int i;
355
356     if(tcb[current].state!=FINISHED
357         &&current!=0&&tcb[current].state!=BLOCKED) /* 当前线程还没结束 */
358         return;
359
360     i=Find();
361     if(i<0)
362         return;
363
364     disable();
365     tcb[current].ss=_SS;
366     tcb[current].sp=_SP;
367
368     if(tcb[current].state==RUNNING)
369         tcb[current].state=READY;      /* 放入就绪队列中 */
370
371     _SS=tcb[i].ss;
372     _SP=tcb[i].sp;        /* 保存现场 */
373
374     tcb[i].state=RUNNING;
375     current=i;
376     enable();
377 }
378
379 void block(struct TCB **p)         /* 阻塞原语 */
380 {
381     struct TCB *pp;
382
383     tcb[current].state=BLOCKED;
384
385     if((*p)==NULL)
386         *p=&tcb[current];    /* 阻塞队列空,直接放入 */
387     else
388     {
389         pp=*p;
390         while(pp->next)
391             pp=pp->next;         /* 找到阻塞队列最后一个节点 */
392
393         pp->next=&tcb[current];      /* 放入阻塞队列 */
394     }
395     tcb[current].next=NULL;
396     swtch();       /* 重新进行CPU调度 */
397 }
398
399 void wakeup_first(struct TCB **p)    /* 唤醒队首线程 */
400 {
401   struct TCB *pl;
402
403   if((*p)==NULL)
404       return;
405
406   pl=(*p);
407   (*p)=(*p)->next;     /* 得到阻塞队列队首线程 */
408   pl->state=READY;        /* 修为就绪状态 */
409   pl->next=NULL;
410 }
411
412 void p(semaphore *sem)
413 {
414     struct TCB **qp;
415
416     disable();
417     sem->value=sem->value-1;
418
419     if(sem->value<0)
420     {
421         qp=&(sem->wq);
422         block(qp);
423     }
424     enable();
425 }
426
427 void v(semaphore*sem)
428 {
429     struct TCB **qp;
430
431     disable();
432     qp=&(sem->wq);
433     sem->value=sem->value+1;
434
435     if(sem->value>=0)
436         wakeup_first(qp);
437
438     enable();
439 }
440
441 ///////////////////////////////////////////////////////////////////////////////
442 // buffer
443 struct buffer*Initbuf(void)
444 {
445   struct buffer *p,*pt,*pt2;
446   int i;
447
448   pt2=pt=(struct buffer*)malloc(sizeof(struct buffer));
449   pt->sender=-1;
450   pt->size=0;
451   strcmp(pt->text,"");
452   pt->next=NULL;
453
454   for(i=0;i<NBUF-1;i++)
455   {
456    p=(struct buffer*)malloc(sizeof(struct buffer));
457    p->sender=-1;
458    p->size=0;
459    p->text[NTEXT]=‘\0‘;
460    p->next=NULL;
461    pt2->next=p;
462    pt2=p;
463   }
464
465   return pt;
466 }
467
468 // 从空闲消息缓冲队列队头上取下一缓空闲消息冲区
469 struct buffer* getbuf(void)
470 {
471   struct buffer *buf;
472
473   buf=freebuf;        /* 取得缓冲队列的缓冲区*/
474   freebuf=freebuf->next;
475
476   return(buf);        /* 返回指向该缓冲区的指针 */
477 }
478
479 // 将buff所指的缓冲区插到*mq所指的缓冲队列末尾
480 void insert(struct buffer **mq, struct buffer *buff)
481 {
482   struct buffer *temp;
483
484   if(buff==NULL)
485       return;       /* buff为空 */
486
487   buff->next=NULL;
488   if(*mq==NULL)     /* *mq为空 则直接插入*/
489     *mq=buff;
490   else
491   {
492     temp=*mq;
493     while(temp->next)      /* 找到队尾 */
494         temp=temp->next;
495
496     temp->next=buff;
497   }
498 }
499
500 // 将地址a开始的size个字节发送给外部标识符为receiver的线程
501 void send(char *receiver,char *a, int size)
502 {
503     struct buffer *buff;
504     int i,id=-1;
505
506     disable();
507     for(i=0;i<NTCB;i++)
508     {
509         if(strcmp(receiver,tcb[i].name)==0)
510         {
511             id=i;
512             break;
513         }
514     }
515
516     if(id==-1)
517     {
518         printf("Error:Receiver not exist!\n");
519         enable();
520         return;
521     }
522
523     p(&sfb);
524
525     p(&mutexfb);
526     buff=getbuf();
527     v(&mutexfb);
528
529     buff->sender=current;
530     buff->size=size;
531     buff->next=NULL;
532
533     for(i=0;i<buff->size;i++,a++)
534         buff->text[i]=*a;
535
536 // 将要发送的消息放到接收者TCB的buffer中
537     p(&tcb[id].mutex);
538     insert(&(tcb[id].mq),buff);
539     v(&tcb[id].mutex);
540
541   // 用于同步
542     v(&tcb[id].sm);
543     enable();
544 }
545
546 //////////////////////////////////////////////////////////////////////////////////////////////
547 // 获取消息缓冲区函数
548 struct buffer *remov(struct buffer **mq, int sender)
549 {
550   struct buffer *buff, *p, *q;
551   q = NULL;
552   p = *mq;
553
554   // 在消息缓冲区队列中找到其他进程发送给自己的消息
555   while((p->next != NULL) && (p->sender != sender))
556   {
557     q = p;
558     p = p->next;
559   }
560
561   // 获取消息后从队列中删除,防止重复接收
562   if(p->sender == sender)
563   {
564     buff = p;
565     if(q == NULL)
566       *mq = buff->next;
567     else
568       q->next = buff->next;
569
570     buff->next = NULL;
571     return buff;
572   }
573   else
574       return NULL;
575 }
576
577 // 接收原语
578 int receive(char *sender, char *b)
579 {
580   int i, id = -1;
581   struct buffer *buff;
582
583   disable();
584
585   // 寻找 sender
586   for(i = 0; i < NBUF; i++)
587   {
588     if(strcmp(sender, tcb[i].name) == 0)
589     {
590       id = i;
591       break;
592     }
593   }
594
595   if(id == -1)
596   {
597     enable();
598     return -1;
599   }
600
601   p(&tcb[current].sm);
602
603   p(&tcb[current].mutex);
604   buff = remov(&(tcb[current].mq), id);
605   v(&tcb[current].mutex);
606
607   if(buff == NULL)
608   {
609     v(&tcb[current].sm);
610     enable();
611     return -1;
612   }
613   // 将消息正文复制到接收区
614   strcpy(b, buff->text);
615
616   // 释放前先把标识去掉,防止重复接收
617   buff->sender = -1;
618   // 释放相应的消息缓冲区
619   p(&mutexfb);
620   insert(&freebuf, buff);
621   v(&mutexfb);
622
623   v(&sfb);
624
625   enable();
626
627   return buff->size;
628 }
629
630 void main()
631 {
632   long i, j, k;
633
634   bufferSem1.value = 1;
635   bufferSem1.wq = NULL;
636
637   bufferSem2.value = 0;
638   bufferSem2.wq = NULL;
639
640   InitInDos();
641   InitTcb();
642
643   freebuf=Initbuf();
644   old_int8=getvect(8);
645
646   strcpy(tcb[0].name,"main");
647   tcb[0].state=RUNNING;
648   tcb[0].value=0;
649   current=0;
650
651   Create("f1",(codeptr)f1,1024,5);
652   Create("f2",(codeptr)f2,1024,6);
653
654   tcb_state();
655   setvect(8,new_int8);
656
657   while(!all_finished());
658   {
659
660       printf("running!\n");
661
662   }
663
664   tcb[0].name[0]=‘\0‘;
665   tcb[0].state=FINISHED;
666   setvect(8,old_int8);
667
668   tcb_state();
669
670   printf("\n Muli_task system teminated \n");
671 }
时间: 2024-11-03 21:40:08

操作系统课程设计 消息缓冲队列通信的相关文章

操作系统课程设计摸鱼全纪录

读了几年,都没留下什么东西方便学弟学妹借鉴.感觉这个大三上学期的操作系统课程设计恰好在寒假,大概是最后一次机会了,写点东西. 首先老师要求用的是: ubuntu-18.04 fuse-2.7.0 这种事情当然是使用虚拟机啦!下载并安装VMware,然后下载并安装ubuntu-18.04,拷贝老师给的fuse-2.7.0进去解压.找找看怎么安装. 看了一圈,发现有个README是看得懂的,里面有很多话,我相信来看我博客的人也是摸鱼混及格的,那当然只关心下面的: 那么打开一个Terminal,输入对

文件系统的设计与实现(操作系统课程设计)

 转发请注明:http://blog.csdn.net/tianqingdezhuanlan/article/details/51344739 源码下载地址:http://download.csdn.net/download/u013255737/9513460 目录 一.设计目的.意义 1 二. 设计分析 1 三.方案分析 2 四.功能模块实现 3 五.最终结果分析 4 六. 设计体会 5 一.设计目的.意义 1.通过模拟文件系统的实现,深入理解操作系统中文件系统的理论知识, 加深对教材中的重

操作系统课程设计--Linux平台哲学家问题

哲学家问题是操作系统中资源分配的经典问题 linux平台下的系统api不同于Windows下的实现 要求:一个正确的哲学家程序(不会发生死锁) 一个错误的哲学家程序(会发生死锁) 系统环境:ElementaryOS wrong.c #include<stdio.h> #include<stdlib.h> #include<sys/ipc.h> #include<sys/msg.h> #include<sys/types.h> #include&l

进程间通信之消息队列通信

概念 消息队列 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 消息队列也有管道一样的不足,就是每条消息的最大长度是有上限的(MSGMAX),每个消息队列的总字节数(内核缓冲上限)是有上限的(MSGMNB),系统上消息队列的总数(消息条目数)也有一个上限(MSGMNI) 对比: 管道 消息 流管道 有边界 先进先出 可以后进入.先出来 消息大小三大限制 cat /proc/sys/kernel/msgmax最

架构设计:系统间通信(20)——MQ:消息协议(下)

(接上文<架构设计:系统间通信(19)--MQ:消息协议(上)>) 上篇文章中我们重点讨论了"协议"的重要性,并为各位读者介绍了Stomp协议和XMPP协议.这两种协议是消息队列中两种不同使用场景下的典型代表.本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议. 3-3.AMQP协议 AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议).目前AMQP协议的版本为 Version 1.0,这个协议标准

【进程编程】——msg进程间的消息队列通信

           [进程编程]--msg进程间的消息队列通信 消息队列就像一个链表,先进先出,就像排队打饭,先到先买!键值 用来获取消息队列的描述字,我感觉就像是一个定位标识符! 函数一     key_t ftok(char *pathname,char proj)返回的是一个键值------>创建队列之前一定要获取这个键值!proj:项目名,不为零即可 打开.创建函数二     int msgget(key_t key,int msgflg) //看见没,这里是要求键值的. key:键值

c语言多线程缓冲队列无锁设计思路

公司里开发的一个项目需要在server端添加多线程缓冲队列,来存取数据,我也是初出茅庐没有太多经验,在网上搜集了大量资料后,终于有了一套自己的设计思路,并解决了项目里的问题,因为当时搜集资料时,发现网上这个的具体文章不是太多或者要么太复杂,要么太简陋,对于新手很难能看懂,所有我就打算将我的设计思路发出来,希望能帮助和我一样的同学朋友们,如有不足请指导!谢谢 项目需求: 两个线程,线程1接收客户端数据并有序的存入缓冲队列:线程2从缓冲队列有序的取出数据并解析插入数据库: 解决方法: 1:新建一个结

服务器设计 - 消息队列

我们所能想到的最简单的消息队列可能就是使用stl的list来实现了,即消息队列内部维护一个list和一个互斥锁,putMessage时将message加入到队列尾,getMessage时从队列头取一个message返回,同时在getMessage和putMessage之前都要求先获取锁资源. 实现虽然简单,但功能是绝对满足需求的,只是性能上可能稍稍有些不尽如人意.其最大的问题在频繁的锁竞争上. 对于如何减少锁竞争次数的优化方案,Ghost Cheng提出了一种.提供一个队列容器,里面有多个队列,

架构设计:系统间通信(33)——其他消息中间件及场景应用(下3)

=================================== (接上文:<架构设计:系统间通信(32)--其他消息中间件及场景应用(下2)>) 5-7.解决方案三:非侵入式方案 以上两种方案中为了让业务系统能够集成日志采集功能,我们或多或少需要在业务系统端编写一些代码.虽然通过一些代码结构的设计,可以减少甚至完全隔离这些代码和业务代码的耦合度,但是毕竟需要业务开发团队花费精力对这些代码进行维护,业务系统部署时业务对这些代码的配置信息做相应的调整. 这里我们再为读者介绍一种非侵入式的日