我们这里介绍的Producer-Consumer生产者-消费者模式是多线程设计模式中很著名的一个设计模式。说到生产者消费者问题,大部分人都不会陌生,OS课的经典问题,并且其本身就是一个计算机编程中常见的问题。对于它的应用,可以举出无数的例子,小到一个多线程程序对队列的共享互斥操作,大到目前流行的中间件产品,诸如BEA的BMQ(BEA Message Queue),IBM的MQ Serious等中间件就是将生产者消费者问题应用通用化体系化的结果。
实际上,生产者消费者模式跟我们之前的多线程设计模式Guarded Suspension模式的PPL和C++实例非常相似。在计算机科学中,我们将生产者消费者问题描述为:
一群生产者在生产消息,并将此消息提供给消费者去消费。它们中间设了具有N个缓存区的缓冲池,生产者每次可将生产的消息放入一个缓存区内,消费者每次可将一个缓存区内的消息拿出来消费。但这个过程有两个条件:
(1)任何一方操作一个缓冲区时不能有其它同时对该缓冲区进行操作;
(2)只有当缓冲区还有空余,生产者才能生产,只有当缓冲区至少有一个产品,消费者才能从中取出来消费。
这里两个条件分别对应了互斥和同步。
大家可以发现,生产者消费者模式就是在Guarded Suspension模式的基础上作了更多的限制,Guarded Suspension模式不限制缓冲区的容量,随时都可以向缓冲区添加新的消息,但是生产者消费者模式对缓冲区的容量作了限制,当缓冲区满了之后,就不能了在向其中添加消息了。简单来说,Producer Consumer模式就像是加上了双重防护与等待的Guarded Suspension模式,而它的两个防护与等待的条件洽好相反,我们将Guarded Suspension模式修改一下,就可以实现生产者消费者模式。
// ProducerConsumer.cpp : 定义控制台应用程序的入口点。
using namespace Concurrency;
using namespace std;
class Request
{
public:
Request(unsigned int nID)
: m_nID(nID)
{
}
unsigned int getID()
{
return m_nID;
}
private:
unsigned int m_nID;
};
// 请求队列
// 客户端将请求添加进入这个队列
// 服务器从这个队列中获取来自客户端的请求进行处理
class RequestQueue
{
public:
// 获取请求,如果当前请求队列为空,则进行等待
Request getRequest()
{
bool bWait = true;
while( bWait )
{
// 如果缓冲区中没有请求,则进行等待,
wait(500);
cs.lock();
bWait = m_Queue.size() > 0 ? false : true;
cs.unlock();
}
cs.lock();
Request req = m_Queue.front();
m_Queue.pop();
wait(1000);
cout<<"GET:"< cs.unlock();
return req;
}
// 将请求加入请求队列中
void setRequest(Request req)
{
bool bWait = false;
do
{
// 限制缓冲区的大小,这是生产者消费者模式比Guarded Suspension模式多添加的一个保护
cs.lock();
bWait = m_Queue.size() < 5 ? false : true;
if(bWait)
{
cout<<"Wait..."< wait(500);
}
cs.unlock();
}
while(bWait);
cs.lock();
m_Queue.push(req);
cout<<"SET:"< cs.unlock();
}
private:
critical_section cs; // 为了保护队列访问的临界区
queue m_Queue;
};
int _tmain(int argc, _TCHAR* argv[])
{
srand((int)time(NULL));
RequestQueue queue;
// 模拟多个客户端的请求,也就是生产者线程
auto request = make_task( [&]()
{
int nCount = 0;
while(nCount<20)
{
wait(rand()%10);
queue.setRequest(Request(nCount));
++nCount;
}
});
// 模拟客户端的处理,也就是消费者线程
auto process = make_task( [&]()
{
while(true)
queue.getRequest();
});
// 执行请求和处理的线程
task_group tg;
tg.run(process);
tg.run_and_wait(request);
return 0;
}
在这个例子中,生产者线程不断产生请求,向RequestQueue中添加请求,当RequestQueue中的请求大于5时,就等待,不再向其中添加请求,而消费者线程不断从RequestQueue中获得请求进行处理,当queue中没有请求时,它就进行等待。生产者消费者模式简单而有效,被得到广泛应用。