多线程编程之生产者消费者问题

  生产者-消费者问题(Producer-consumer problem),也称作有限缓冲问题(Bounded-buffer problem),是多线程领域的一个经典问题,可以描述为:两个或者更多个线程共享同一个缓冲区,其中一个或多个作为“生产者”会不断地向缓冲区中添加数据,另外的一个或者多个作为“消费者”从缓冲区中取走数据。这个问题的关键在于:要保证缓冲区满了之后“生产者”不能再继续添加数据,而缓冲区空了之后“消费者”不能再取走数据了。

  这个问题在多个“生产者”和“消费者”的情况下肯定要麻烦一点,所以先看一下只有一个“生产者”和一个“消费者”以及一个元素缓冲区的情况。这时候情况可以简化为:

  1. 从缓冲区中取走数据和向缓冲区中添加数据需要互斥操作保持同步,所以,这里需要用临界区或者互斥量来实现;
  2. 生产者要等待缓冲区“有空间”(由于这里缓冲区只有一个元素,所以等价于要等待缓冲区为空)才能添加数据,同样消费者也不能在缓冲区为空的时候取数据。这两个过程都需要事件或者信号量来通知进行。

  考虑好了上述两点要求,就可以设计出如下思路算法:

/* 针对生产者的算法1: */{
  WaitForSingleObject(hEmpty, INFINITE);
  WaitForSingleObject(hMutex, INIFINITE);
      /* 生产者的活动 */
  ReleaseMutex(hMutex);
  ReleaseSemaphore(hFull, 1, NULL);}

/* 针对消费者的算法1: */{
  WaitForSingleObject(hFull, INFINITE);
  WaitForSingleObject(hMutex, INIFINITE);
      /* 消费者的活动 */
  ReleaseMutex(hMutex);
  ReleaseSemaphore(hEmpty, 1, NULL);}

  当然,生产者和消费者的互斥操作不用hMutex而改用EnterCriticalSection临界区也是一样的。举一个实例看看如何应用生产者-消费者算法:

typedef struct _MESSAGE_QUEUE    /* 消息队列的数据结构 */
{
    int threadId;
    int msgType[MAX_NUMBER];
    int count;
    HANDLE hFull;
    HANDLE hEmpty;
    HANDLE hMutex;
}MESSAGE_QUEUE;

/* 发送消息,类似于“生产者” */
void send_mseesge(int threadId, MESSAGE_QUEUE* pQueue, int msg)
{
    assert(NULL != pQueue);

    if(threadId != pQueue->threadId)
        return;

    WaitForSingleObject(pQueue->hEmpty, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    pQueue->msgType[pQueue->count ++] = msg;
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hFull, 1, NULL);
}

/* 接收消息,类似于“消费者” */
void get_message(MESSAGE_QUEUE* pQueue, int* msg)
{
    assert(NULL != pQueue && NULL != msg);

    WaitForSingleObject(pQueue->hFull, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    *msg = pQueue->msgType[pQueue->count --];
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hEmpty, 1, NULL);
}

  搞清楚了一个生产者和一个消费者以及一个元素的缓冲区的简单模式,下面再看看如果消费者有两个而缓冲区可以容纳四个元素的情况:

//1生产者,2消费者,4缓冲区
#include <stdio.h>
#include <process.h>
#include <windows.h>

const int END_PRODUCE_NUMBER = 8;  // 生产产品个数
const int BUFFER_SIZE = 4;          // 缓冲区个数
int g_Buffer[BUFFER_SIZE];          // 缓冲池
int g_i, g_j;
CRITICAL_SECTION g_cs;    // 信号量与关键段
HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;

// 生产者线程函数
unsigned int __stdcall ProducerThreadFun(PVOID pM)
{
    for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    {
        // 等待“缓冲区有剩余空间”的信号!
        WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE);

        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        g_Buffer[g_i] = i;
        g_i = (g_i + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        // 通知消费者“缓冲区有数据”了!
        ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    }
    printf("生产者完成任务,线程结束运行\n");
    return 0;
}

// 消费者线程函数
unsigned int __stdcall ConsumerThreadFun(PVOID pM)
{
    while (true)
    {
        // 等待“缓冲区中有数据”的信号
        WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE);

        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        if (g_Buffer[g_j] == END_PRODUCE_NUMBER) // 结束标志
        {
            LeaveCriticalSection(&g_cs);
            // 通知其它消费者有新数据了(结束标志)
            ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
            break;
        }
        g_j = (g_j + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        Sleep(50);     // some other work to do
        ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);      // 给缓冲区增加一个空间
    }

    return 0;
}
int main()
{
    InitializeCriticalSection(&g_cs);
    // 初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数
    g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);    // 指定缓冲区初始状态存在四个“剩余空间”
    g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);
    g_i = 0;
    g_j = 0;
    memset(g_Buffer, 0, sizeof(g_Buffer));

    const int THREADNUM = 3;
    HANDLE hThread[THREADNUM];
    //生产者线程
    hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    //消费者线程
    hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    for (int i = 0; i < THREADNUM; i++)
        CloseHandle(hThread[i]);

    // 销毁信号量和关键段
    CloseHandle(g_hSemaphoreBufferEmpty);
    CloseHandle(g_hSemaphoreBufferFull);
    DeleteCriticalSection(&g_cs);
    return 0;
}    

  上面的代码思路很简单,一个生产者就一直等待“缓冲区有剩余空间”这个信号,而两个消费者就一直等待“缓冲区有数据”这个信号就行了!操作缓冲区的时候采取Mutex互斥操作防止冲突。注意,在创建信号量的初期要指定初始信号量的个数,这个信号量个数决定了缓冲区的大小。每一次WaitForSingleObject都会使信号量减一,而每一次ReleaseSemaphore都会使信号量加一。

  

小结:

  “生产者-消费者”问题只需考虑两个方面即可:

  1. 生产者和消费者要对缓冲区互斥操作
  2. 生产者要等待“缓冲区有空间”这个信号才能添加数据,消费者要等待“缓冲区有数据”这个信号才能取出数据。
时间: 2024-10-07 01:27:45

多线程编程之生产者消费者问题的相关文章

多线程中的生产者消费者Java源代码(带注释)

同步解决了线程中数据存取不一致的问题,而Object类中的等待与唤醒方法解决了重复存取的问题 以下的生产者消费者Java源代码例子,很好的说明了这一点. 其中包括Info类.Producter类.Consumer类.Test类. <1> Info类如下: 1 package per.producterconsumer; 2 3 public class Info { 4 5 private boolean flag = true; 6 /* 7 * flag=true 表示此时可以生产,但不能

Linux线程编程之生产者消费者问题

前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一  顺序表循环队列 1.1 顺序循环队列定义 队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队).新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素. 队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下: 1 struct Queue{ 2 ElemType elem[M

java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现

java多线程15 :wait()和notify() 的生产者/消费者模式 在这一章已经实现了  wait/notify 生产消费模型 利用await()/signal()实现生产者和消费者模型 一样,先定义一个缓冲区: public class ValueObject { public static String value = ""; } 换种写法,生产和消费方法放在一个类里面: public class ThreadDomain41 extends ReentrantLock {

5 并发编程--队列&amp;生产者消费者模型

1.队列的介绍 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的 创建队列的类(底层就是以管道和锁定的方式实现): Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: maxsize是队列中允许最大项数,省略则无大小限制. 但需要明确: 1.队列内存放的是消息而非大数据 2.队列占用的是内存空间,因而maxsize即便

Java多线程14:生产者/消费者模型

什么是生产者/消费者模型 一种重要的模型,基于等待/通知机制.生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点: 1.生产者生产的时候消费者不能消费 2.消费者消费的时候生产者不能生产 3.缓冲区空时消费者不能消费 4.缓冲区满时生产者不能生产 生产者/模型作为一种重要的模型,它的优点在于: 1.解耦.因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,

并发编程 之 生产者消费者模型

1 什么是生产者消费者模型 生产者:比喻的是程序中负责产生数据的任务 消费者:比喻的是程序中负责处理数据的任务 生产者->共享的介质(队列)<-消费者 2 为何用 实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费 从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率 什么时候用? 当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据 此时就应该考虑使用生产者消费者模型来提升程序的效率 from multiprocessing imp

多线程编程之生产者和消费者之间的问题

前段时间没事研究了一些生产者和消费者之间的问题,期间也查看了不少资料.又重新有了新的认识.特别作为一个IT农民工,必须要掌握的技能啊. 个人理解,这个应该说是一种模型吧,学会它,可以应用到多个方面的技术上去.数据流文件的读写,程序中的数据缓冲技术,播放缓冲技术等等. 废话不多说...直接上代码.下面是个C# 写的代码.比较粗糙,请谅解,有问题大家可以一起讨论研究. 1 using System; 2 using System.Threading; 3 4 namespace Consumer 5

java多线程--“朴素版”生产者消费者问题

1. 生产/消费者模型 生产/消费者问题是个非常典型的多线程问题,涉及到的对象包括"生产者"."消费者"."仓库"和"产品".他们之间的关系如下: (01) 生产者仅仅在仓储未满时候生产,仓满则停止生产. (02) 消费者仅仅在仓储有产品时候才能消费,仓空则等待. (03) 当消费者发现仓储没产品可消费时候会通知生产者生产. (04) 生产者在生产出可消费产品时候,应该通知等待的消费者去消费. 2. 生产/消费者实现 下面通过

java中的多线程的实现生产者消费者模式

public class TestAccount { public static void main(String[] args) { Account account = new Account(); account.setAccount("116854398"); account.setBalance(10); Thread t1 = new Wife(account, TestAccount.class); Thread t2 = new Husband(account, Test