Unix IPC之Posix信号量实现生产者消费者

采用多生产者,多消费者模型。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

/**

 * 生产者

 */

P(nempty);

P(mutex);

// 写入一个空闲位置

V(mutex);

V(nstored);

/**

 * 消费者

 */

P(nstored);

P(mutex):

// 清空一个非空闲位置

V(mutex);

V(nempty);

全局性说明:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

#include    "unpipc.h"

#define NBUFF        10

#define MAXNTHREADS 100

int     nitems, nproducers, nconsumers;     /* read-only */

struct      /* data shared by producers and consumers */

{

    int   buff[NBUFF];

    int   nput;           /* item number: 0, 1, 2, ... */

    int   nputval;        /* value to store in buff[] */

    int   nget;           /* item number: 0, 1, 2, ... */

    int   ngetval;        /* value fetched from buff[] */

    sem_t mutex, nempty, nstored;     /* semaphores, not pointers */

} shared;

void    *produce(void *);

void    *consume(void *);

/* end globals */

主函数:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

/* include main */

int

main(int argc, char **argv)

{

    int     i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];

    pthread_t   tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS]; 

    if (argc != 4)

        err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");

    nitems = atoi(argv[1]);

    nproducers = min(atoi(argv[2]), MAXNTHREADS);

    nconsumers = min(atoi(argv[3]), MAXNTHREADS);

    /* 4initialize three semaphores */

    Sem_init(&shared.mutex, 0, 1);

    Sem_init(&shared.nempty, 0, NBUFF);

    Sem_init(&shared.nstored, 0, 0);

    /* 4create all producers and all consumers */

    Set_concurrency(nproducers + nconsumers);

    for (i = 0; i < nproducers; i++)

    {

        prodcount[i] = 0;

        Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);

    }

    for (i = 0; i < nconsumers; i++)

    {

        conscount[i] = 0;

        Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);

    }

    /* 4wait for all producers and all consumers */

    for (i = 0; i < nproducers; i++)

    {

        Pthread_join(tid_produce[i], NULL);

        printf("producer count[%d] = %d\n", i, prodcount[i]);

    }

    for (i = 0; i < nconsumers; i++)

    {

        Pthread_join(tid_consume[i], NULL);

        printf("consumer count[%d] = %d\n", i, conscount[i]);

    }

    Sem_destroy(&shared.mutex);

    Sem_destroy(&shared.nempty);

    Sem_destroy(&shared.nstored);

    exit(0);

}

/* end main */

生产者线程:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

/* include produce */

void *

produce(void *arg)

{

    for ( ; ; )

    {

        Sem_wait(&shared.nempty);   /* wait for at least 1 empty slot */

        Sem_wait(&shared.mutex);

        if (shared.nput >= nitems)

        {

            Sem_post(&shared.nstored);  /* let consumers terminate */

            Sem_post(&shared.nempty);

            Sem_post(&shared.mutex);

            return(NULL);           /* all done */

        }

        shared.buff[shared.nput % NBUFF] = shared.nputval;

        shared.nput++;

        shared.nputval++;

        Sem_post(&shared.mutex);

        Sem_post(&shared.nstored);  /* 1 more stored item */

        *((int *) arg) += 1;

    }

}

/* end produce */

消费者线程:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

/* include consume */

void *

consume(void *arg)

{

    int     i;

    for ( ; ; )

    {

        Sem_wait(&shared.nstored);  /* wait for at least 1 stored item */

        Sem_wait(&shared.mutex);

        if (shared.nget >= nitems)

        {

            Sem_post(&shared.nstored);

            Sem_post(&shared.mutex);

            return(NULL);           /* all done */

        }

        i = shared.nget % NBUFF;

        if (shared.buff[i] != shared.ngetval)

            printf("error: buff[%d] = %d\n", i, shared.buff[i]);

        shared.nget++;

        shared.ngetval++;

        Sem_post(&shared.mutex);

        Sem_post(&shared.nempty);   /* 1 more empty slot */

        *((int *) arg) += 1;

    }

}

/* end consume */

来自为知笔记(Wiz)

时间: 2024-09-29 16:05:16

Unix IPC之Posix信号量实现生产者消费者的相关文章

基于POSIX的信号量的生产者消费者模型

信号量和Mutex类似,表示可用资源的数量,和Mutex不同的是,这个数量可以大于1,即如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同. 下面我们看看POSIX semaphore库函数,它既可以用于同一进程的线程间同步,也可以用于不同进程间的同步. 1. int sem_init(sem_t *sem,int pshared,unsigned int value) 我们可以用此函数来创建一个未命名的信号量,pshared参数表明是否在多个进程中使用信号量,如果是,将其设置为非0 值,

用信号量解决生产者消费者问题

用信号量解决生产者消费者问题: ipc.h #ifndef _IPC_H_ #define _IPC_H_ #include <sys/types.h> #include <unistd.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/shm.h> #include <errno.h> #include <stdio.h> #include <st

Linux下用环形buf以及POSIX版本信号量解决生产者消费者问题

一.Semaphore(信号量) Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源, 加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源. 信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1. 即,如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同! 本次使用的是POSIX semaphore库函数,这种信号量不仅可以

信号量实现生产者消费者问题

生产消费问题是一个经典的数学问题,要求生产者---消费者在固定的仓库空间条件下,生产者每生产一个 产品将占用一个仓库空间,生产者生产的产品库存不能越过仓库的存储量,消费者每消费一个产品将增加 一个仓库空间,消费者在仓库产品为0时不能再消费. 以下使用了两个信号量,一个用来管理消费者即sem_produce,另一个用来管理生产者即sem_custom, sem_produce表示当前仓库可用空间的数量,sem_custom用来表示当前仓库中产品的数量. 对于生产者来说,其需要申请的资源为仓库中的剩

【Windows】用信号量实现生产者-消费者模型

线程并发的生产者-消费者模型: 1.两个进程对同一个内存资源进行操作,一个是生产者,一个是消费者. 2.生产者往共享内存资源填充数据,如果区域满,则等待消费者消费数据. 3.消费者从共享内存资源取数据,如果区域空,则等待生产者填充数据. 4.生产者的填充数据行为和消费者的消费数据行为不可在同一时间发生. 下面用Windows的信号量以及线程等API模拟生产者-消费者模型 #include <Windows.h> #include <stdio.h> #define N 100 #d

35 守护进程 互斥锁 IPC 共享内存 的方式 生产者消费者模型

守护进程 进程:一个正在运行的程序. 主进程创建守护进程: 1.守护进程会在主进程代码执行结束后就终止, 2.守护进程内无法再开启子进程,否则抛出异常. 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止. 例子:from multiprocessing import Processimport time def task(): print('老了....') time.sleep(2) print('睡了一会..') if __name__ == '__main__': prin

基于POSIX信号量实现生产者消费模型

一.基础知识 1.1 system V版本的信号量和POSIX下信号量的区别 我们前面讲过进程间通信有三种形式:管道.消息队列.信号量.这个信号量是system V版本下,以信号量集形式申请存在的,它标识着一个临界资源的有无从而控制不同的进程能否访问到该临界资源.但,现在,我们要讲的信号量是基于POSIX下的信号量,它用来标识资源的个数. 1.2 互斥锁和信号量 上篇所述,互斥锁(Mutex)可看作是某种资源的可用数,Mutex变量是非0即1的,初始化时Mutex为1,表示有一个可用资源:加锁时

IPC之Posix信号量详解

基本概念: 信号量(semaphore)是一种用于提供不同进程间或一个给定进程的不用线程间同步手段的原语. 共有三种类型的信号量: 1)Posix有名信号量:使用Posix IPC名字标识,可用于进程或线程间的同步. 2)Posix基于内存的信号量:存放在共享内存区中,可用于进程或线程间的同步. 3)System V信号量:在内核中维护,可用于进程或者线程间同步.(本文不讨论System V信号量) 一个进程可以在某个信号量上执行三种操作: (1)创建(create)一个信号量.这还要求调用者指

Unix IPC之Posix消息队列(1)

部分参考:http://www.cnblogs.com/Anker/archive/2013/01/04/2843832.html IPC对象的持续性:http://book.51cto.com/art/201006/207275.htm 消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反.Posix消息队列与System V消息队列的区别如下: 1. 对Posix消息队列的读总是返回最高优先级的最早消息,