Linux:生产者与消费者模式

生产者:生产数据

消费者:消费数据

提供场所:缓冲区,eg:超市

生产者消费者特点:三种关系,两类人,一个场所

三种关系指的是:生产者与生产者之间是互斥关系

消费者与消费者之间是互斥关系

生产者与消费者之间是同步与互斥关系

两类人:生产者,消费者

一个场所:存储数据(此处用带头单链表实现)

单生产者单消费者模式:此例取数据方式为FIFO先进先出。

利用互斥锁实现单生产者单消费者模式。

#include<stdio.h>
#include<malloc.h>
#include<pthread.h>

typedef int _dataType;
typedef int *_dataType_p;
typedef struct _node{
	_dataType data;
	struct _node* next;
}node,*nodep,**nodepp;

nodep head = NULL;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

nodep buyNode(_dataType val)
{
	nodep tmp = (nodep)malloc(sizeof(node));
	if(tmp!= NULL)
	{
		tmp->data = val;
		tmp->next = NULL;
		return tmp;
	}
	return NULL;
}

void init(nodepp head)
{
	*head = buyNode(0);
}
//存在头节点。
void push_list(nodep head,_dataType val)
{
	nodep tmp = buyNode(val);
	tmp->next = head->next;
	head->next = tmp;
}

int pop_back_list(nodep head,_dataType_p pval)
{
	//当之存在一个哨兵位的时候.
	if(NULL == head->next)
		return -1;

	nodep index = head;
	while(NULL != index->next->next)
	{
	     index = index->next;
	}
	nodep del = index->next;
	*pval = del->data;
	index->next = del->next;
	free(del);
	return 0;
}

void* product(void* arg)
{
	_dataType i = 0;
	while(1)
	{
		sleep(1);
		pthread_mutex_lock(&mutex);
		push_list(head,i++);
		pthread_mutex_unlock(&mutex);
	}
	pthread_exit((void *)1);
}

void* consumer(void *arg)
{
	_dataType val = 0;
	while(1){
		sleep(2);
		pthread_mutex_lock(&mutex);
		if(pop_back_list(head,&val) == -1)
		{
			pthread_mutex_unlock(&mutex);
			continue;
		}
		printf("data:%d\n",val);
		pthread_mutex_unlock(&mutex);
	}
	pthread_exit((void *)1);
}

int main()
{
	pthread_t tid1,tid2;
	init(&head);
	pthread_create(&tid1,NULL,product,NULL);
	pthread_create(&tid2,NULL,consumer,NULL);
	pthread_join(tid1,NULL);
	pthread_join(tid2,NULL);
	free(head);
	pthread_mutex_destroy(&mutex);
	return 0;
}

运行结果:

然后我们说一说条件变量:

线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条

件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执

行。 在pthread库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个

条件的线程。Condition Variable用pthread_cond_t类型的变量表示,可以这样初始化和销毁:

返回值:成功返回0,失败返回错误号。

和Mutex的初始化和销毁类似,pthread_cond_init函数初始化.一个Condition Variable,attr参数为

NULL则表.示缺省属性,pthread_cond_destroy函数销毁.一个Condition Variable。如果Condition

Variable是静态分配的,也可以.用宏定义PTHEAD_COND_INITIALIZER初始化,相当于 .用

pthread_cond_init函数初始化并且attr参数为NULL。Condition Variable的操作可以

用下列函数:

返回值:成功返回0,失败返回错误号。

可见,一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用

pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作:

1. 释放Mutex

2. 阻塞等待

3. 当被唤醒时,重新获得Mutex并返回

pthread_cond_timedwait函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT。一个线程可以调用

pthread_cond_signal唤醒在某个Condition Variable上等待的另一个线程,也可以调用

pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。

我们用条件变量来写一个单生产者单消费者模式:

#include<stdio.h>
#include<malloc.h>
#include<pthread.h>

typedef int _dataType;
typedef int *_dataType_p;
typedef struct _node{
	_dataType data;
	struct _node* next;
}node,*nodep,**nodepp;

nodep head = NULL;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

nodep buyNode(_dataType val)
{
	nodep tmp = (nodep)malloc(sizeof(node));
	if(tmp!= NULL)
	{
		tmp->data = val;
		tmp->next = NULL;
		return tmp;
	}
	return NULL;
}

void init(nodepp head)
{
	*head = buyNode(0);
}

void push_list(nodep head,_dataType val)
{
	nodep tmp = buyNode(val);
	tmp->next = head->next;
	head->next = tmp;
}

int pop_list(nodep head,_dataType_p pval)
{
	if(NULL == head->next)
		return -1;
	nodep del = head->next;
	*pval = del->data;
	head->next = del->next;
	free(del);
	return 0;
}

void* product(void* arg)
{
	_dataType i = 0;
	while(1)
	{
		sleep(1);
		pthread_mutex_lock(&mutex);
		push_list(head,i++);
		pthread_cond_signal(&cond);
		pthread_mutex_unlock(&mutex);
	}
	pthread_exit((void *)1);
}

void* consumer(void *arg)
{
	_dataType val = 0;
	while(1){
		sleep(1);
		pthread_mutex_lock(&mutex);
		if(pop_list(head,&val) == -1)
			pthread_cond_wait(&cond,&mutex);
		printf("data:%d\n",val);
		pthread_mutex_unlock(&mutex);
	}
	pthread_exit((void *)1);
}

int main()
{
	pthread_t tid1,tid2;
	init(&head);
	pthread_create(&tid1,NULL,product,NULL);
	pthread_create(&tid2,NULL,consumer,NULL);
	pthread_join(tid1,NULL);
	pthread_join(tid2,NULL);
	free(head);
	pthread_mutex_destroy(&mutex);
	return 0;
}

运行结果同互斥锁一样。

信号量:

Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,    加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可 用资源。

信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于

1。即,如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同!

本节介绍的是POSIX semaphore库函数,详见sem_overview(7),这种信号量不仅可用于同一进程 的线程间同步,也可用于不同进程间的同步。

semaphore变量的类型为sem_t,sem_init()初始化一个semaphore变量,value参数表示可用资源的数量,pshared参数为0表示信号量用于同一进程的线程间同步,在用完semaphore变量之后应该调用sem_destroy()释放与semaphore相关的资源。

调用sem_wait()可以获得资源(P操作),使semaphore的值减1,如果调用sem_wait()时

semaphore的值已 经是0,则挂起等待。如果不希望挂起等待,可以调用sem_trywait() 。调用

sem_post() 可以释放资源(V操作),使semaphore 的值加1,同时唤醒挂起等待的线程。

上面生产者-消费者的例子是基于链表的,其空间可以动态分配,现在基于固定大小的环形队

列重写这个程序(POSIX信号量):

#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>

#define _SEM_PRO_ 20
#define _SEM_COM_ 0

typedef int _dataType;
_dataType blank[_SEM_PRO_];

sem_t sem_product;
sem_t sem_consumer;

void *product(void *arg)
{
	int index = 0;
	int count = 0;
	while(1)
	{
		sleep(rand()%5);
		sem_wait(&sem_product);
		blank[index++] = count ++;
		sem_post(&sem_consumer);
		index %= _SEM_PRO_;
	}
	pthread_exit((void *)1);
}

void* consumer(void * arg)
{
	int index = 0;
	while(1)
	{
		sem_wait(&sem_consumer);
		printf("data%d\n",blank[index++]);
		sem_post(&sem_product);
		index %= _SEM_PRO_;

	}
	pthread_exit((void*)1);
}

int main()
{
	pthread_t tid1,tid2;
	sem_init(&sem_product,0,20);
	sem_init(&sem_consumer,0,0);
	pthread_create(&tid1,NULL,product,NULL);
	pthread_create(&tid2,NULL,consumer,NULL);
	pthread_join(tid1,NULL);
	pthread_join(tid2,NULL);
	sem_destroy(&sem_product);
	sem_destroy(&sem_consumer);
	return 0;
}

运行结果同上。

最后我们运用信号量来实现一个多消费者多生产者模式:

#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>

#define _SEM_PRO_ 20
#define _SEM_COM_ 0

typedef int _dataType;
_dataType blank[_SEM_PRO_];

sem_t sem_product;
sem_t sem_consumer;
pthread_mutex_t mutex_product = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_consumer = PTHREAD_MUTEX_INITIALIZER;

void *product(void *arg)
{
	int index = 0;
	int count = 0;
	while(1)
	{
		sleep(rand()%5);
		sem_wait(&sem_product);
		pthread_mutex_lock(&mutex_product);
		printf("%d thread id doing\n",(int )arg);
		blank[index++] = count ++;
		index %= _SEM_PRO_;
		pthread_mutex_unlock(&mutex_product);
		sem_post(&sem_consumer);
	}
	pthread_exit((void *)1);
}

void* consumer(void * arg)
{
	int index = 0;
	while(1)
	{
		sem_wait(&sem_consumer);
		pthread_mutex_lock(&mutex_consumer);

		printf("%d thread is consumer ,data%d\n",(int)arg,blank[index++]);
		index %= _SEM_PRO_;
		pthread_mutex_unlock(&mutex_consumer);
		sem_post(&sem_product);
	}
	pthread_exit((void*)1);
}

int main()
{
	pthread_t tid1,tid2,tid3,tid4;
	sem_init(&sem_product,0,20);
	sem_init(&sem_consumer,0,0);
	pthread_create(&tid1,NULL,product,NULL);
	pthread_create(&tid1,NULL,product,NULL);
	pthread_create(&tid3,NULL,consumer,NULL);
	pthread_create(&tid4,NULL,consumer,NULL);
	pthread_join(tid1,NULL);
	pthread_join(tid2,NULL);
	pthread_join(tid3,NULL);
	pthread_join(tid4,NULL);
	sem_destroy(&sem_product);
	sem_destroy(&sem_consumer);
	pthread_mutex_destroy(&mutex_product);
	pthread_mutex_destroy(&mutex_consumer);
	return 0;
}

运行结果:

总结:

其实本篇是借助生产者消费者模式来讲解线程的互斥锁,条件变量,信号量的操作。

对于生产者/消费者模型我们只需要记住三种关系,两类人,一个场所

三种关系指的是:生产者与生产者之间是互斥关系

消费者与消费者之间是互斥关系

生产者与消费者之间是同步与互斥关系

两类人:生产者,消费者

一个场所:存储数据。

然后对于生产者/消费者我以后会单独写一篇博客进行描述。

一定要熟悉运用本篇中的线程操作函数。

时间: 2024-10-11 10:22:40

Linux:生产者与消费者模式的相关文章

同步函数 生产者和消费者模式 加强版(多人生产和多人消费)

曾经搞了半天, 生产者和消费者模式  加强版(多人生产 多人消费 ).. 以前的代码格式就不再吐槽了(以后努力改进) //输出结果是一个无限循环 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 多个生产者&多个消费者模式 * 多个生产者不断生产,多个消费者不停的消费

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者.消费者模式是多线程中的经典问题.通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节. 对于比较常见的单生产者.多消费者的情况,主要有以下两种策略: 通过volatile boolean producerDone =false 来标示是否完成.生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出. 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑. 比较经典的"毒丸"策略,生产者结束后,把一个特别的对象:"毒丸&quo

生产者与消费者模式(线程的同步与互斥)

死锁产生的四个条件: 1.互斥使用(资源独占) 一个资源每次只能给一个进程使用 .2.不可强占(不可剥夺) 资源申请者不能强行的从资源占有者手中夺取资源,资源只能由占有者自愿释放 .3.请求和保持(部分分配,占有申请) 一个进程在申请新的资源的同时保持对原有资源的占有(只有这样才是动态申请,动态分配) .4.循环等待 存在一个进程等待队列 {P1 , P2 , - , Pn}, 其中P1等待P2占有的资源,P2等待P3占有的资源,-,Pn等待P1占有的资源,形成一个进程等待环路 生产者:生产数据

Java多线程设计模式(2)生产者与消费者模式

1 Producer-Consumer Pattern Producer-Consumer Pattern主要就是在生产者与消费者之间建立一个“桥梁参与者”,用来解决生产者线程与消费者线程之间速度的不匹配. 当要从某个线程Produccer参与者将数据传输给其它线程Consumer参与者的时候,此时就可以在中间加一个Channel参与者,在Channel参与者中以某种方式存放接受的数据,再以某方式来获取收到的数据,Channel就可以来缓存两个线程之间传输的数据,在Channel参与者为了保证安

python实现生产者和消费者模式

利用python的线程实现简单生产者和消费者模式,这种模式在多线程编程时还是用的比较多吧,下面是源代码: 1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 import requests,time 4 import threading,queue 5 6 7 class mythread_1(threading.Thread): 8 def __init__(self,queue): 9 super(mythread_1,self).__init__

python 生产者与消费者模式

生产者与消费者模式 1. 队列 先进先出 2. 栈 先进后出 Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用.可以使用队列来实现线程间的同步. 用FIFO队列实现上述生产者与消费者问题的代码如下: import threading import time from q

Java并发编程(4)--生产者与消费者模式介绍

一.前言 这种模式在生活是最常见的,那么它的场景是什么样的呢? 下面是我假象的,假设有一个仓库,仓库有一个生产者和一个消费者,消费者过来消费的时候会检测仓库中是否有库存,如果没有了则等待生产,如果有就先消费直至消费完成:而生产者每天的工作就是先检测仓库是否有库存,如果没有就开始生产,满仓了就停止生产等待消费,直至工作结束.下图是根据假象画的流程图: 那么在程序中怎么才能达到这样的效果呢?下面介绍三种方式实现. 二.使用notify() 和 wait()实现 相信大家这两个方法都不陌生,它是Obj

实现生产者与消费者模式

实现生产者与消费者模式 目录 生产者与消费者模式 实现 生产者与消费者模式 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力. 这个阻塞队列就是用来给生产者和消费者解耦的.纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式

多线程之生产者和消费者模式

package com.git.base.thread.productandconsumer; /** * 核心实现: * 生产者消费者模式: * 生产一个,消费一个,如果生产未被消费,那么就等待消费后再生产 * 如果消费后,没有下一个生产的,就等待生产后在消费 * <p>Title: DoMain.java</p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2016</p&g