使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。

首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。

BlockingQueue的核心方法:

放入数据:

  offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.

  offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。

获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入; 
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

测试代码:

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueTest {
    public static void main(String args[]) throws InterruptedException{
        BlockingQueue<String> queue = new ArrayBlockingQueue(10);

        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        ExecutorService service = Executors.newCachedThreadPool();

        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

生产者:

package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable{

    private volatile boolean      isRunning               = true;
    private BlockingQueue<String> queue;
    private static AtomicInteger  count                   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    public Producer(BlockingQueue queue){
        this.queue = queue;
    }

    public void run(){
        String data = null;
        Random r = new Random();
        System.out.println("启动生产者线程");
        try{
            while(isRunning){
                System.out.println("正在生产数据.....");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        finally{
            System.out.println("退出生产者线程!");
        }
    }

    public void stop(){
        isRunning = false;
    }

}

消费者:

package BlockingQueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{
     private BlockingQueue<String> queue;
     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    public Consumer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    public void run(){
        System.out.println("启动消费者线程:");
        Random r = new Random();
        boolean isRunning = true;
        try{
            while(isRunning){
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2,TimeUnit.SECONDS);
                if(null != data){
                     System.out.println("拿到数据:" + data);
                     System.out.println("正在消费数据:" + data);
                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                }else{
                    isRunning = false;
                }
            }
        }catch(InterruptedException e){
             e.printStackTrace();
             Thread.currentThread().interrupt();
        }finally{
            System.out.println("退出消费者线程!");
        }
    }
}

参考:http://wsmajunfeng.iteye.com/blog/1629354

时间: 2024-10-04 04:37:25

使用BlockingQueue的生产者消费者模式的相关文章

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

生产者消费者模式(吃包子例子)

生产者-消费者问题是一个经典的进程同步问 题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空 缓冲区中供消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费 者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 生产者消费者模式是并发.多线程编程中经典的设计

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

多线程的并发执行应用(生产者消费者模式)

在实际的开发中我们为了提高CPU的利用率,也提高程序的执行效率,我们经常使用多线程进行对数据进行并发处理,下面我举一个多线程并发执行的实例,大致意思就是 一个简单的生产者消费者模式,二个线程进行存数据,一个线程进行取数据. import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { /** * @param a

多线程之生产者消费者模式

最近在项目中需要使用使用多线程实现一种功能,和生产者消费者模式类似,因此,学习了下生产者消费者模式的多线程实现.在生产者消费者模式中,通常有两类线程, 即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程则负责处理生产者提交的任务.生产者和消费者之间则通过共享内存缓冲区进行通信. 在这里我们选择BlockingQueue做为共享内存缓冲区. 首先,我们构建生产者生产的,和消费者需要处理的数据PCData,即相关任务数据. public class PCData { pri

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

10 阻塞队列 &amp; 生产者-消费者模式

原文:http://www.cnblogs.com/dolphin0520/p/3932906.html 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一