线程同步之生产者消费者

前言:

  前面因时间关系,未将“生产者消费者问题”实例的介绍发布在博客随笔中,故本文作为对之前“多线程”一文的补充。
概念:
  生产者消费者问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。这个案例中主要实现的是两个角色协同对同一资源进行访问。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

设计:本博客前面关于多线程的文章中已讲述过一个实例。本文的思路跟之前差不多,不过这次引入了一个存储资源的队列,并且需要设定2个用于信号传递的条件(Condition)。其实现过程见如下简图:

代码实现

package com.gdufe.thread.consumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConsumerProducer {
  private static Buffer buffer = new Buffer();

  public static void main(String[] args) {
    // Create a thread pool with two threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
  }

  // A task for adding an int to the buffer
  private static class ProducerTask implements Runnable {
    public void run() {
      try {
        int i = 1;
        while (true) {
          System.out.println("Producer writes " + i);
          buffer.write(i++); // Add a value to the buffer
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 10000));
        }
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // A task for reading and deleting an int from the buffer
  private static class ConsumerTask implements Runnable {
    public void run() {
      try {
        while (true) {
          System.out.println("\t\t\tConsumer reads " + buffer.read());
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 10000));
        }
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // An inner class for buffer
  private static class Buffer {
    private static final int CAPACITY = 1; // buffer size
    private java.util.LinkedList<Integer> queue =
      new java.util.LinkedList<Integer>();	//queue for storing data

    // Create a new lock
    private static Lock lock = new ReentrantLock();

    // Create two conditions
    private static Condition notEmpty = lock.newCondition();
    private static Condition notFull = lock.newCondition();

    public void write(int value) {
      lock.lock(); // Acquire the lock
      try {
        while (queue.size() == CAPACITY) {
          System.out.println("Wait for notFull condition");
          notFull.await();
        }

        queue.offer(value);
        notEmpty.signal(); // Signal notEmpty condition
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      } finally {
        lock.unlock(); // Release the lock
      }
    }

    public int read() {
      int value = 0;
      lock.lock(); // Acquire the lock
      try {
        while (queue.isEmpty()) {
          System.out.println("\t\t\tWait for notEmpty condition");
          notEmpty.await();
        }

        value = queue.remove();
        notFull.signal(); // Signal notFull condition
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      } finally {
        lock.unlock(); // Release the lock
        return value;
      }
    }
  }
}

测试结果

补充

在Java集合框架中,其实有“阻塞队列”这一概念。其特点具有同步方法,也就是说套用阻塞队列,我们可以通过简化上面的代码同样实现生产者消费者的线程同步问题。

参考代码如下:

 1 package com.gdufe.thread.consumer;
 2
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6
 7 public class ConsumerProducerUsingBlockingQueue {
 8   private static ArrayBlockingQueue<Integer> buffer =
 9     new ArrayBlockingQueue<Integer>(2);
10
11   public static void main(String[] args) {
12     // Create a thread pool with two threads
13     ExecutorService executor = Executors.newFixedThreadPool(2);
14     executor.execute(new ProducerTask());
15     executor.execute(new ConsumerTask());
16     executor.shutdown();
17   }
18
19   // A task for adding an int to the buffer
20   private static class ProducerTask implements Runnable {
21     public void run() {
22       try {
23         int i = 1;
24         while (true) {
25           System.out.println("Producer writes " + i);
26           buffer.put(i++); // Add any value to the buffer, say, 1
27           // Put the thread into sleep
28           Thread.sleep((int)(Math.random() * 10000));
29         }
30       } catch (InterruptedException ex) {
31         ex.printStackTrace();
32       }
33     }
34   }
35
36   // A task for reading and deleting an int from the buffer
37   private static class ConsumerTask implements Runnable {
38     public void run() {
39       try {
40         while (true) {
41           System.out.println("\t\t\tConsumer reads " + buffer.take());
42           // Put the thread into sleep
43           Thread.sleep((int)(Math.random() * 10000));
44         }
45       } catch (InterruptedException ex) {
46         ex.printStackTrace();
47       }
48     }
49   }
50 }
时间: 2024-08-24 23:14:12

线程同步之生产者消费者的相关文章

JAVA_线程同步_生产者消费者问题

1 public class ProducerConsumer { 2 public static void main(String[] args) { 3 SyncStack ss = new SyncStack(); 4 Producer p = new Producer(ss); 5 Consumer c = new Consumer(ss); 6 new Thread(p).start(); 7 new Thread(p).start(); 8 new Thread(p).start()

linux中的线程同步:生产者、消费者问题

#include <stdio.h> #include <semaphore.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #define BUFFER_COUNT 5 int Buffer[BUFFER_COUNT]; //指针数组 int front = 0; int tail = 0; sem_t SemProd; sem_t SemCon; void* pr

Linux线程编程之生产者消费者问题

前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一  顺序表循环队列 1.1 顺序循环队列定义 队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队).新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素. 队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下: 1 struct Queue{ 2 ElemType elem[M

Java线程同步模型-生产者与消费者

Java生产者与消费者模型是经典Java线程同步模型,涉及使用同步锁控制生产者线程和消费者线程同步运行问题.同步对象是仓库资源,生产者线程生产向仓库中生产商品,消费者线程从仓库中消费商品,当生产者线程生产的商品达到仓库的90%时,生产者线程停止生产并通知消费者线程开始消费,当消费者线程消耗到仓库的10%时,消费者线程停止消费并通知生产者线程恢复生产,如此循环往复过程. 如下图解: T1时刻分析 T2时刻 T3时刻 T4时刻 上图的分析,T3同T1时刻相同场景,T2同T2时刻相同场景,程序如此循环

Linux多线程之同步2 &mdash;&mdash; 生产者消费者模型

思路 生产者和消费者(互斥与同步).资源用队列模拟(要上锁,一个时间只能有一个线程操作队列). m个生产者.拿到锁,且产品不满,才能生产.当产品满,则等待,等待消费者唤醒.当产品由空到不空,通知消费者.n个消费者.拿到锁,且有产品,才能消费.当产品空,则等待,等待生产者唤醒.当产品由满到不满,通知生产者.    生产者条件:队列不满消费者条件:队列不空因此有两个条件变量. 代码 /**********************************************************

基于线程实现的生产者消费者模型(Object.wait(),Object.notify()方法)

需求背景 利用线程来模拟生产者和消费者模型 系统建模 这个系统涉及到三个角色,生产者,消费者,任务队列,三个角色之间的关系非常简单,生产者和消费者拥有一个任务队列的引用,生产者负责往队列中放置对象(id),消费者负责从队列中获取对象(id),其关联关系如下 方案1 因为是多线程操作,所以对任务的存取都要使用线程同步加锁机制,看一下我们的TaskQueue类,两个主方法都加了synchronized修饰,这就意味着,一个时间点只可能有一个线程对这个方法进行操作 TaskQueue类代码 [java

python 线程同步:生产/消费者模式

Python中的Queue对象提供了对线程同步的支持,使用queue对象可以实现多生产者和多消费者形成的先进先出的队列. 每个生产者将数据放入队列,而每个消费者依次从队列中取出数据. # coding:utf-8 import threading,time,Queue class Producer(threading.Thread):     def __init__(self,threadname):         threading.Thread.__init__(self,name=th

进程_线程 之(五) --- 生产者消费者

同步锁 acquire([timeout])/release():  调用关联的锁的相应方法. wait([timeout]):   调用这个方法将使线程进入Condition的等待池等待通知,并释放锁.使用前线程必须已获得锁定,否则将抛出异常. notify():   调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池):其他线程仍然在等待池中.调用这个方法不会释放锁定.使用前线程必须已获得锁定,否则将抛出异常. notifyAll()

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个