linux生产者消费者问题

(多进程+共享内存+信号量)

一.分析

生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:

1.  对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。

2.  一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。

为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。

二. 共享存储

共享存储是进程间通信的一种手段,通常,使用信号量同步或互斥访问共享存储。共享存储的原理是将进程的地址空间映射到一个共享存储段。在LINUX下,通过使用 shmget 函数创建或者获取共享内存。

1. 创建

1)不指定 KEY

// IPC_PRIVATE指出需要创建内存;

//SHM_SIZE 指出字节大小;

//SHM_MODE 指出访问权限字如 0600表示,用户可以读写该内存

int shmget(key_t IPC_PRIVATE,size_t SHM_SIZE,int SHM_MODE);

2)指定KEY

//如果SHM_KEY指向的共享存储已经存在,则返回共享存储的ID;

//否则,创建共享存储并返回其ID

int  shmget(key_t SHM_KEY,size_t SHM_SIZE,int SHM_MODE);

2. 访问

方法一

只需要共享存储的 ID 就可以通过  shmat  函数获得共享存储所占用的实际地址。因此,可以在父进程的栈中用变量存放指向共享存储的指针,那么 fork 之后,子进程就可以很方便地通过这个指针访问共享存储了。

方法二

如果进程之间并没有父子关系,但是协商好了共享存储的 KEY , 那么在每个进程中,就可以通过 KEY 以及 shmget 函数获得共享存储的 I D , 进而通过 shmat 函数获得共享存储的实际地址,最后访问。

在我的实现中,我把生产者实现为父进程,消费者实现为子进程,并通过方法一实现进程之间共享内存。

三. 信号量集

信号量有两种原语 P 和 V ,P 锁定资源,V 释放资源。LINUX 下的使用信号量集合的接口特别复杂。我所用到的函数如下:

1. 创建或者获取信号量集合

// IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究

semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );

// SEM_KEY 是 key_t 类型

//如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID

//如果不存在,则创建信号量集并返回ID

semget(SEM_KEY, NUM_OF_SEM,FLAGS);

2. 初始化信号量

创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。

semctl(int semSetId , int semIdx , int cmd, union semun su);

其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。

我在 UBUNTU 下使用 union semun 编译时总报错:

invalid use of undefined type ‘union semun’

据说是 Linux 下删除了 semun 的定义。可以通过自定义 semun 解决:

  1. #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
  2. #else
  3. union semun{
  4. int val;
  5. struct semid_ds *buf;
  6. unsigned short *array;
  7. };
  8. #endif

四.代码分解

1. 头文件

  1. #include "stdio.h"   //支持 printf
  2. #include <sys/shm.h> //支持 shmget shmat 等
  3. #include <sys/sem.h> //支持 semget
  4. #include <stdlib.h>  //支持 exit

2. 信号量

共需要三个信号量:

第一个信号量用于限制生产者必须在缓冲区不满时才能生产,是同步信号量

第二个信号量用于限制消费者必须在缓冲区有产品时才消费,是同步信号量

第三个信号量用于限制生产者和消费者在访问缓冲区时必须互斥,是互斥信号量

创建信号量集合,semget

  1. if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
  2. {
  3. perror("create semaphore failed");
  4. exit(1);
  5. }

初始化三个信号量,semctl,需要用到 union semun

  1. union semun su;
  2. su.val = N_BUFFER;
  3. if(semctl(semSetId,0,SETVAL, su) < 0){
  4. perror("semctl failed");
  5. exit(1);
  6. }
  7. su.val = 0;
  8. if(semctl(semSetId,1,SETVAL,su) < 0){
  9. perror("semctl failed");
  10. exit(1);
  11. }
  12. su.val = 1;
  13. if(semctl(semSetId,2,SETVAL,su) < 0){
  14. perror("semctl failed");
  15. exit(1);
  16. }

封装对信号量集中的某个信号量的值的+1或者-1操作


  1. void waitSem(int semSetId,int semNum)
  2. {
  3. struct sembuf sb;
  4. sb.sem_num = semNum;
  5. sb.sem_op = -1;
  6. sb.sem_flg = SEM_UNDO;
  7. if(semop(semSetId,&sb,1) < 0){
  8. perror("waitSem failed");
  9. exit(1);
  10. }
  11. }
  12. void sigSem(int semSetId,int semNum)
  13. {
  14. struct sembuf sb;
  15. sb.sem_num = semNum;
  16. sb.sem_op = 1;
  17. sb.sem_flg = SEM_UNDO;
  18. if(semop(semSetId,&sb,1) < 0){
  19. perror("waitSem failed");
  20. exit(1);
  21. }
  22. }

3. 使用共享内存


  1. struct ShM{
  2. int start;
  3. int end;
  4. }* pSM;
  5. if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
  6. {
  7. perror("create shared memory failed");
  8. exit(1);
  9. }
  10. pSM = (struct ShM *)shmat(shmId,0,0);
  11. pSM->start = 0;
  12. pSM->end = 0;

4. 生产过程

  1. while(1)
  2. {
  3. waitSem(semSetId,0);
  4. waitSem(semSetId,2);
  5. produce();
  6. sigSem(semSetId,2);
  7. sleep(1);
  8. sigSem(semSetId,1);
  9. }

5. 消费过程

  1. while(1)
  2. {
  3. waitSem(semSetId,1);
  4. waitSem(semSetId,2);
  5. consume();
  6. sigSem(semSetId,2);
  7. sigSem(semSetId,0);
  8. sleep(2);
  9. }

五. 代码全文

  1. #include "stdio.h"
  2. #include <sys/shm.h>
  3. #include <sys/sem.h>
  4. #include <stdlib.h>
  5. #define SHM_SIZE (1024*1024)
  6. #define SHM_MODE 0600
  7. #define SEM_MODE 0600
  8. #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
  9. #else
  10. union semun{
  11. int val;
  12. struct semid_ds *buf;
  13. unsigned short *array;
  14. };
  15. #endif
  16. struct ShM{
  17. int start;
  18. int end;
  19. }* pSM;
  20. const int N_CONSUMER = 3;
  21. const int N_BUFFER = 5;
  22. int shmId = -1,semSetId=-1;
  23. union semun su;
  24. void waitSem(int semSetId,int semNum)
  25. {
  26. struct sembuf sb;
  27. sb.sem_num = semNum;
  28. sb.sem_op = -1;
  29. sb.sem_flg = SEM_UNDO;
  30. if(semop(semSetId,&sb,1) < 0){
  31. perror("waitSem failed");
  32. exit(1);
  33. }
  34. }
  35. void sigSem(int semSetId,int semNum)
  36. {
  37. struct sembuf sb;
  38. sb.sem_num = semNum;
  39. sb.sem_op = 1;
  40. sb.sem_flg = SEM_UNDO;
  41. if(semop(semSetId,&sb,1) < 0){
  42. perror("waitSem failed");
  43. exit(1);
  44. }
  45. }
  46. void produce()
  47. {
  48. int last = pSM->end;
  49. pSM->end = (pSM->end+1) % N_BUFFER;
  50. printf("生产 %d\n",last);
  51. }
  52. void consume()
  53. {
  54. int last = pSM->start;
  55. pSM->start = (pSM->start + 1)%N_BUFFER;
  56. printf("消耗 %d\n",last);
  57. }
  58. void init()
  59. {
  60. if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
  61. {
  62. perror("create shared memory failed");
  63. exit(1);
  64. }
  65. pSM = (struct ShM *)shmat(shmId,0,0);
  66. pSM->start = 0;
  67. pSM->end = 0;
  68. if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
  69. {
  70. perror("create semaphore failed");
  71. exit(1);
  72. }
  73. su.val = N_BUFFER;
  74. if(semctl(semSetId,0,SETVAL, su) < 0){
  75. perror("semctl failed");
  76. exit(1);
  77. }
  78. su.val = 0;
  79. if(semctl(semSetId,1,SETVAL,su) < 0){
  80. perror("semctl failed");
  81. exit(1);
  82. }
  83. su.val = 1;
  84. if(semctl(semSetId,2,SETVAL,su) < 0){
  85. perror("semctl failed");
  86. exit(1);
  87. }
  88. }
  89. int main()
  90. {
  91. int i = 0,child = -1;
  92. init();
  93. for(i = 0; i < N_CONSUMER; i++)
  94. {
  95. if((child = fork()) < 0)
  96. {
  97. perror("the fork failed");
  98. exit(1);
  99. }
  100. else if(child == 0)
  101. {
  102. printf("我是第 %d 个消费者子进程,PID = %d\n",i,getpid());
  103. while(1)
  104. {
  105. waitSem(semSetId,1);
  106. waitSem(semSetId,2);
  107. consume();
  108. sigSem(semSetId,2);
  109. sigSem(semSetId,0);
  110. sleep(2);
  111. }
  112. break;
  113. }
  114. }
  115. if(child > 0)
  116. {
  117. while(1)
  118. {
  119. waitSem(semSetId,0);
  120. waitSem(semSetId,2);
  121. produce();
  122. sigSem(semSetId,2);
  123. sleep(1);
  124. sigSem(semSetId,1);
  125. }
  126. }
  127. return 0;
  128. }

原文地址:https://www.cnblogs.com/chenyibin1995/p/8972547.html

时间: 2024-08-09 11:07:25

linux生产者消费者问题的相关文章

Linux线程编程之生产者消费者问题

前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一  顺序表循环队列 1.1 顺序循环队列定义 队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队).新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素. 队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下: 1 struct Queue{ 2 ElemType elem[M

Linux组件封装(五)一个生产者消费者问题示例

生产者消费者问题是计算机中一类重要的模型,主要描述的是:生产者往缓冲区中放入产品.消费者取走产品.生产者和消费者指的可以是线程也可以是进程. 生产者消费者问题的难点在于: 为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区,它就是所谓的临界资源. 生产者往缓冲区放物品时,如果缓冲区已满,那么需要等待,一直到消费者取走产品为止. 消费者取走产品时,如果没有物品,需要等待,一直到有生产者放入为止. 第一个问题属于互斥问题,我们需要使用一把互斥锁,来实现对缓冲区的安全访问. 后两个属于同步问题,两类

Linux 信号量 生产者消费者小例题 (嘘,我是菜鸟~)

菜鸟偶遇信号量,擦出火花(只有不熟才会有火花).于是上网搜资料和看<Unix环境高级编程>实现了几个小例题,高手请勿喷!这几位写得非常好啊: 题目来源: http://www.it165.net/os/html/201312/7039.html 信号量及其用法:http://www.cnblogs.com/hjslovewcl/archive/2011/03/03/2314341.html Mutex与Semaphore区别著名的厕所理论:http://koti.mbnet.fi/niclas

Linux多线程实践(8) --Posix条件变量解决生产者消费者问题

Posix条件变量 int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr); int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_timedwait(pthread_cond_t *cond

Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题

Posix信号量 Posix 信号量 有名信号量 无名信号量 sem_open sem_init sem_close sem_destroy sem_unlink sem_wait sem_post 有名信号量 #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <semaphore.h> sem_t *sem_open(co

Linux下用环形buf以及POSIX版本信号量解决生产者消费者问题

一.Semaphore(信号量) Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源, 加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源. 信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1. 即,如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同! 本次使用的是POSIX semaphore库函数,这种信号量不仅可以

Linux多线程模拟生产者/消费者问题

描述: 生产者-消费者问题是一个经典的进程同步问题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的N个线程生产者线程生产物品,然后将物品放置在一个空缓冲区中供N个消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 输入: 生产者个数.消费者个数.还是缓冲

LINUX学习:生产者&amp;消费者模型复习

回顾一下生产者消费者模型. #include <stdio.h> #include <stdlib.h> #include <string.h> #include <semaphore.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #define ERR_EXIT(m) do { perror(m); exit(EXIT_FAILURE

Linux平台下线程同步,实现“生产者消费者问题”

(1)线程同步,实现"生产者消费者问题" 要求:缓冲区大小为20,生产者每次放一个产品,消费者每次取走一个产品:生产者和消费者至少2个. (2)代码如下: #include <stdio.h> #include <pthread.h> #include <unistd.h> #include <sched.h> void *producter_f (void *arg); /*生产者*/ void *consumer_f (void *a