消息缓冲队列通信机制其基本思想是根据“生产者——消费者”原理,利用内存中公用消息缓冲区实现进程间的信息交换。
在这种通信机制中,首先需要在内存中开辟若干空闲消息缓冲区,用以存放要通信的消息。每当一个进程需要向另一个进程发送消息时,便向系统申请一个空闲消息缓冲区,并把已准备好的消息复制到该缓冲区,然后把该消息缓冲区插入到接收进程的消息队列中,最后通知接收进程。接收进程接收到发送进程发来的通知后,从本进程的消息队列中摘下一消息缓冲区,取出其中的信息,然后把消息缓冲区作为空闲消息缓冲区归还给系统。系统负责管理公用消息缓冲区以及消息的传递。
1 // 消息缓冲队列 2 // 2016.1.7 3 4 #include <stdlib.h> 5 #include <dos.h> 6 #include <stdio.h> 7 8 #define GET_INDOS 0x34 /* 34H 系统功能调用 */ 9 #define GET_CRIT_ERR 0x5d06 /* 5D06H号系统功能调用 */ 10 11 #define BLANK -1 12 #define FINISHED 0 /* 终止 */ 13 #define RUNNING 1 /* 执行 */ 14 #define READY 2 /* 就绪 */ 15 #define BLOCKED 3 /* 阻塞 */ 16 #define NTCB 3 /* 系统线程的最大数 */ 17 18 #define TL 10 /* 时间片大小 */ 19 #define NBUF 2 /* 消息缓冲区数目 */ 20 #define NTEXT 50 /* 文本输出大小 */ 21 22 23 char far* intdos_ptr=0; 24 char far* crit_err_ptr=0; 25 int timecount=0; 26 int current=-1; 27 28 typedef unsigned int UINT16; 29 30 typedef struct/* 信号量 */ 31 { 32 int value; 33 struct TCB* wq; 34 }semaphore; 35 36 semaphore mutexfb={1,NULL}; // freebuf 互斥变量 初值 1 37 semaphore sfb={2,NULL}; // 计数信号量 38 semaphore bufferSem1, bufferSem2; 39 40 // 消息缓冲区 41 // 空闲缓冲队列 freebuf(临界资源) 42 struct buffer 43 { 44 int sender; /*消息发送者的标识数 */ 45 int size; /* 消息长度<=NTEXT 个字节 */ 46 char text[NTEXT]; /* 消息正文 */ 47 struct buffer* next; /* 指向下一个消息缓冲区的指针 */ 48 } *freebuf; 49 50 /* 线程控制块 */ 51 struct TCB 52 { 53 unsigned char* stack; /* 堆栈的起始地址 */ 54 unsigned ss; 55 unsigned sp; /* 堆栈段址和堆栈指针 */ 56 char state; /* 进程状态 */ 57 char name[10]; /* 线程的外部标识符 */ 58 int value; /*优先级*/ 59 struct TCB* next; /* 指向控制快指针 */ 60 struct buffer* mq; /* 消息缓冲队列首指针 */ 61 semaphore mutex; /* 互斥信号量 */ 62 semaphore sm; /* 消息缓冲队列计数信号量*/ 63 }tcb[NTCB]; 64 65 /* 堆栈现场保护和恢复结构体 */ 66 struct int_regs 67 { 68 unsigned BP,DI,SI,DS,ES,DX,CX,BX,AX,IP,CS,Flags,off,seg; 69 }; 70 71 typedef int(far* codeptr)(void); 72 void interrupt(*old_int8)(void); 73 int DosBusy(void); 74 void InitIndos(void); 75 void InitTcb(); 76 void interrupt new_int8(void); 77 void interrupt swtch(); 78 void send(char *receiver,char *a,int size); 79 int receive(char *sender,char *a); 80 void p(semaphore *sem); 81 void v(semaphore *sem); 82 int Create(char* name,codeptr code,int stacklen,int prio); /* 创建线程 */ 83 void Destroy(int i); 84 85 86 // 1#线程 87 void f1() 88 { 89 90 while(1) 91 { 92 p(&bufferSem1); 93 94 send("f2","f1 send message to f2",NTEXT); 95 96 printf("f1 sending!\n"); 97 98 v(&bufferSem2); 99 } 100 } 101 102 // 2#线程 103 void f2() 104 { 105 char a[NTEXT]; 106 107 while(1) 108 { 109 p(&bufferSem2); 110 111 receive("f1",a); 112 113 printf("f2 receiving!\n"); 114 115 v(&bufferSem1); 116 } 117 } 118 119 120 void InitInDos() /* 取得INDOS标志和严重错误标志地址 */ 121 { 122 union REGS regs; 123 struct SREGS segregs; 124 125 regs.h.ah=GET_INDOS; /* 使用34H号系统功能调用 */ 126 intdosx(®s,®s,&segregs); 127 128 intdos_ptr=MK_FP(segregs.es,regs.x.bx); 129 if(_osmajor<3) 130 crit_err_ptr=intdos_ptr+1; /* 严重错误在INDOS后一字节处 */ 131 else if(_osmajor==3&&_osminor==0) 132 crit_err_ptr=intdos_ptr-1; /* 严重错误在INDOS前一字节处 */ 133 else 134 { 135 regs.x.ax=GET_CRIT_ERR; 136 intdosx(®s,®s,&segregs); 137 crit_err_ptr=MK_FP(segregs.ds,regs.x.si); 138 } 139 } 140 141 int DosBusy(void) /* 判断DOS是否忙 */ 142 { 143 if(intdos_ptr&&crit_err_ptr) 144 return(*intdos_ptr||*crit_err_ptr); /* DOS忙,返回严重错误标志 */ 145 else 146 return(-1); /* DOS不忙 */ 147 } 148 149 void InitTcb() /* 初始化线程 */ 150 { 151 int i; 152 153 for(i=0;i<NTCB;i++) 154 { 155 tcb[i].state=BLANK; /* 初始状态标志 */ 156 tcb[i].mq=NULL; 157 tcb[i].mutex.value=1; 158 tcb[i].mutex.wq=NULL; 159 tcb[i].sm.value=0; 160 tcb[i].sm.wq=NULL; 161 } 162 } 163 164 void Destroy(int i) 165 { 166 167 if(tcb[i].state==RUNNING) 168 { 169 disable(); 170 tcb[i].state=FINISHED; 171 strcpy(tcb[i].name,NULL); 172 free(tcb[i].stack); 173 tcb[i].ss=0; 174 tcb[i].sp=0; 175 enable(); 176 } 177 178 } 179 180 void over() 181 { 182 Destroy(current); 183 swtch(); 184 } 185 186 int Create(char *name,codeptr code,int stacklen,int value) 187 { 188 int i; 189 char *p; 190 struct int_regs *pt; 191 unsigned int *pp; 192 193 for(i=1;i<NTCB;i++) 194 { 195 if(tcb[i].state==BLANK||tcb[i].state==FINISHED) 196 break; 197 } 198 if(i==NTCB) 199 return-1; 200 201 tcb[i].value=value; 202 strcpy(tcb[i].name,name); 203 tcb[i].stack=(p=(unsigned char*)malloc(stacklen)); 204 memset(tcb[i].stack, 0xff, stacklen); 205 p=p+stacklen; 206 207 #if 0 208 pt=(struct int_regs*)p; 209 pt--; 210 pt->Flags=0x200; 211 pt->CS=FP_SEG(code); 212 pt->IP=FP_OFF(code); 213 214 pt->off=FP_OFF(over); 215 pt->seg=FP_SEG(over); 216 pt->DS=_DS; 217 pt->ES=_ES; 218 tcb[i].sp=FP_OFF(pt); 219 tcb[i].ss=FP_SEG(pt); 220 #else if 221 /* 222 pp=(UINT16 *)(p-2); 223 *(pp)=FP_SEG(over); 224 *(pp-1)=FP_OFF(over); 225 *(pp-2)=0x200; 226 *(pp-3)=FP_SEG(code); 227 *(pp-4)=FP_OFF(code); 228 229 *(pp-9)=_ES; 230 *(pp-10)=_DS; 231 tcb[i].sp=FP_OFF(pp-13); 232 tcb[i].ss=FP_SEG(pp-13); 233 */ 234 235 *(p-1)=(FP_SEG(over)&0xff00)>>8; 236 *(p-2)=FP_SEG(over)&0x00ff; 237 238 *(p-3)=(FP_OFF(over)&0xff00)>>8; 239 *(p-4)=FP_OFF(over)&0x00ff; 240 241 *(p-5)=0x02; 242 *(p-6)=0x00; 243 244 *(p-7)=(FP_SEG(code)&0xff00)>>8; 245 *(p-8)=FP_SEG(code)&0x00ff; 246 247 *(p-9)=(FP_OFF(code)&0xff00)>>8; 248 *(p-10)=FP_OFF(code)&0x00ff; 249 250 *(p-19)=(_ES&0xff00)>>8; 251 *(p-20)=_ES&0x00ff; 252 253 *(p-21)=(_DS&0xff00)>>8; 254 *(p-22)=_DS&0x00ff; 255 256 tcb[i].sp=FP_OFF((UINT16 *)(p-28)); 257 tcb[i].ss=FP_SEG((UINT16 *)(p-28)); 258 259 #endif 260 261 tcb[i].state=READY; 262 263 return i; 264 } 265 266 void tcb_state() /* 线程状态信息 */ 267 { 268 int i; 269 270 for(i=0;i<NTCB;i++) 271 if(tcb[i].state!=BLANK) 272 { 273 switch(tcb[i].state) 274 { 275 case FINISHED: 276 printf("\ntcb[%d] is FINISHED\n",i); 277 break; 278 279 case RUNNING: 280 printf("tcb[%d] is RUNNING\n",i); 281 break; 282 case READY: 283 printf("tcb[%d] is READY\n",i); 284 break; 285 case BLOCKED: 286 printf("tcb[%d] is BLOCKED\n",i); 287 288 break; 289 } 290 } 291 } 292 293 int all_finished() 294 { 295 int i; 296 297 for(i=1;i<NTCB;i++) 298 if(tcb[i].state==RUNNING||tcb[i].state==BLOCKED||tcb[i].state==READY) 299 return 0; 300 301 return 1; 302 } 303 304 int Find() 305 { 306 int i,j; 307 i=current; 308 309 while(tcb[i=((i+1)%NTCB)].state!=READY||i==current); 310 311 return i; 312 } 313 314 void interrupt new_int8(void) /* CPU 调度*/ 315 { 316 int i; 317 318 (*old_int8)(); /* 指向原来时钟中断处理过程入口的中断处理函数指针 */ 319 timecount++; 320 321 if(timecount==TL) /* 时间片是否到? */ 322 { 323 if(!DosBusy()) /* DOS是否忙? */ 324 { 325 disable(); 326 327 tcb[current].ss=_SS; /* 保存现场 */ 328 tcb[current].sp=_SP; 329 330 if(tcb[current].state==RUNNING) 331 tcb[current].state=READY; 332 333 i=Find(); 334 if(i<0) 335 return; 336 337 _SS=tcb[i].ss; 338 _SP=tcb[i].sp; 339 tcb[i].state=RUNNING; 340 current=i; 341 timecount=0; /* 重新计时 */ 342 343 enable(); 344 } 345 else 346 return; 347 } 348 else 349 return; 350 } 351 352 void interrupt swtch() /* 其他原因CPU调度 */ 353 { 354 int i; 355 356 if(tcb[current].state!=FINISHED 357 &¤t!=0&&tcb[current].state!=BLOCKED) /* 当前线程还没结束 */ 358 return; 359 360 i=Find(); 361 if(i<0) 362 return; 363 364 disable(); 365 tcb[current].ss=_SS; 366 tcb[current].sp=_SP; 367 368 if(tcb[current].state==RUNNING) 369 tcb[current].state=READY; /* 放入就绪队列中 */ 370 371 _SS=tcb[i].ss; 372 _SP=tcb[i].sp; /* 保存现场 */ 373 374 tcb[i].state=RUNNING; 375 current=i; 376 enable(); 377 } 378 379 void block(struct TCB **p) /* 阻塞原语 */ 380 { 381 struct TCB *pp; 382 383 tcb[current].state=BLOCKED; 384 385 if((*p)==NULL) 386 *p=&tcb[current]; /* 阻塞队列空,直接放入 */ 387 else 388 { 389 pp=*p; 390 while(pp->next) 391 pp=pp->next; /* 找到阻塞队列最后一个节点 */ 392 393 pp->next=&tcb[current]; /* 放入阻塞队列 */ 394 } 395 tcb[current].next=NULL; 396 swtch(); /* 重新进行CPU调度 */ 397 } 398 399 void wakeup_first(struct TCB **p) /* 唤醒队首线程 */ 400 { 401 struct TCB *pl; 402 403 if((*p)==NULL) 404 return; 405 406 pl=(*p); 407 (*p)=(*p)->next; /* 得到阻塞队列队首线程 */ 408 pl->state=READY; /* 修为就绪状态 */ 409 pl->next=NULL; 410 } 411 412 void p(semaphore *sem) 413 { 414 struct TCB **qp; 415 416 disable(); 417 sem->value=sem->value-1; 418 419 if(sem->value<0) 420 { 421 qp=&(sem->wq); 422 block(qp); 423 } 424 enable(); 425 } 426 427 void v(semaphore*sem) 428 { 429 struct TCB **qp; 430 431 disable(); 432 qp=&(sem->wq); 433 sem->value=sem->value+1; 434 435 if(sem->value>=0) 436 wakeup_first(qp); 437 438 enable(); 439 } 440 441 /////////////////////////////////////////////////////////////////////////////// 442 // buffer 443 struct buffer*Initbuf(void) 444 { 445 struct buffer *p,*pt,*pt2; 446 int i; 447 448 pt2=pt=(struct buffer*)malloc(sizeof(struct buffer)); 449 pt->sender=-1; 450 pt->size=0; 451 strcmp(pt->text,""); 452 pt->next=NULL; 453 454 for(i=0;i<NBUF-1;i++) 455 { 456 p=(struct buffer*)malloc(sizeof(struct buffer)); 457 p->sender=-1; 458 p->size=0; 459 p->text[NTEXT]=‘\0‘; 460 p->next=NULL; 461 pt2->next=p; 462 pt2=p; 463 } 464 465 return pt; 466 } 467 468 // 从空闲消息缓冲队列队头上取下一缓空闲消息冲区 469 struct buffer* getbuf(void) 470 { 471 struct buffer *buf; 472 473 buf=freebuf; /* 取得缓冲队列的缓冲区*/ 474 freebuf=freebuf->next; 475 476 return(buf); /* 返回指向该缓冲区的指针 */ 477 } 478 479 // 将buff所指的缓冲区插到*mq所指的缓冲队列末尾 480 void insert(struct buffer **mq, struct buffer *buff) 481 { 482 struct buffer *temp; 483 484 if(buff==NULL) 485 return; /* buff为空 */ 486 487 buff->next=NULL; 488 if(*mq==NULL) /* *mq为空 则直接插入*/ 489 *mq=buff; 490 else 491 { 492 temp=*mq; 493 while(temp->next) /* 找到队尾 */ 494 temp=temp->next; 495 496 temp->next=buff; 497 } 498 } 499 500 // 将地址a开始的size个字节发送给外部标识符为receiver的线程 501 void send(char *receiver,char *a, int size) 502 { 503 struct buffer *buff; 504 int i,id=-1; 505 506 disable(); 507 for(i=0;i<NTCB;i++) 508 { 509 if(strcmp(receiver,tcb[i].name)==0) 510 { 511 id=i; 512 break; 513 } 514 } 515 516 if(id==-1) 517 { 518 printf("Error:Receiver not exist!\n"); 519 enable(); 520 return; 521 } 522 523 p(&sfb); 524 525 p(&mutexfb); 526 buff=getbuf(); 527 v(&mutexfb); 528 529 buff->sender=current; 530 buff->size=size; 531 buff->next=NULL; 532 533 for(i=0;i<buff->size;i++,a++) 534 buff->text[i]=*a; 535 536 // 将要发送的消息放到接收者TCB的buffer中 537 p(&tcb[id].mutex); 538 insert(&(tcb[id].mq),buff); 539 v(&tcb[id].mutex); 540 541 // 用于同步 542 v(&tcb[id].sm); 543 enable(); 544 } 545 546 ////////////////////////////////////////////////////////////////////////////////////////////// 547 // 获取消息缓冲区函数 548 struct buffer *remov(struct buffer **mq, int sender) 549 { 550 struct buffer *buff, *p, *q; 551 q = NULL; 552 p = *mq; 553 554 // 在消息缓冲区队列中找到其他进程发送给自己的消息 555 while((p->next != NULL) && (p->sender != sender)) 556 { 557 q = p; 558 p = p->next; 559 } 560 561 // 获取消息后从队列中删除,防止重复接收 562 if(p->sender == sender) 563 { 564 buff = p; 565 if(q == NULL) 566 *mq = buff->next; 567 else 568 q->next = buff->next; 569 570 buff->next = NULL; 571 return buff; 572 } 573 else 574 return NULL; 575 } 576 577 // 接收原语 578 int receive(char *sender, char *b) 579 { 580 int i, id = -1; 581 struct buffer *buff; 582 583 disable(); 584 585 // 寻找 sender 586 for(i = 0; i < NBUF; i++) 587 { 588 if(strcmp(sender, tcb[i].name) == 0) 589 { 590 id = i; 591 break; 592 } 593 } 594 595 if(id == -1) 596 { 597 enable(); 598 return -1; 599 } 600 601 p(&tcb[current].sm); 602 603 p(&tcb[current].mutex); 604 buff = remov(&(tcb[current].mq), id); 605 v(&tcb[current].mutex); 606 607 if(buff == NULL) 608 { 609 v(&tcb[current].sm); 610 enable(); 611 return -1; 612 } 613 // 将消息正文复制到接收区 614 strcpy(b, buff->text); 615 616 // 释放前先把标识去掉,防止重复接收 617 buff->sender = -1; 618 // 释放相应的消息缓冲区 619 p(&mutexfb); 620 insert(&freebuf, buff); 621 v(&mutexfb); 622 623 v(&sfb); 624 625 enable(); 626 627 return buff->size; 628 } 629 630 void main() 631 { 632 long i, j, k; 633 634 bufferSem1.value = 1; 635 bufferSem1.wq = NULL; 636 637 bufferSem2.value = 0; 638 bufferSem2.wq = NULL; 639 640 InitInDos(); 641 InitTcb(); 642 643 freebuf=Initbuf(); 644 old_int8=getvect(8); 645 646 strcpy(tcb[0].name,"main"); 647 tcb[0].state=RUNNING; 648 tcb[0].value=0; 649 current=0; 650 651 Create("f1",(codeptr)f1,1024,5); 652 Create("f2",(codeptr)f2,1024,6); 653 654 tcb_state(); 655 setvect(8,new_int8); 656 657 while(!all_finished()); 658 { 659 660 printf("running!\n"); 661 662 } 663 664 tcb[0].name[0]=‘\0‘; 665 tcb[0].state=FINISHED; 666 setvect(8,old_int8); 667 668 tcb_state(); 669 670 printf("\n Muli_task system teminated \n"); 671 }
时间: 2024-11-03 21:40:08