BlockingQueue学习

引言

在Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时,BlockingQueue也用于java自带线程池的缓冲队列中,了解BlockingQueue也有助于理解线程池的工作模型。

一 BlockingQueue接口

该接口属于队列,所以继承了Queue接口,该接口最重要的五个方法分别是offer方法,poll方法,put方法,take方法和drainTo方法。

offer方法和poll方法分别有一个静态重载方法,分别是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意义是在限定时间内存入或取出对象,如果不能存入取出则返回false。

put方法会在当队列存储对象达到限定值时阻塞线程,而在队列不为空时唤醒被take方法所阻塞的线程。take方法是相反的。

drainTo方法可批量获取队列中的元素。

二 常见的BlockingQueue实现

一 LinkedBlockingQueue

LinkedBlockingQueue是比较常见的BlockingQueue的实现,他是基于链表的阻塞队列。在创建该对象时如果不指定可存储对象个数大小时,默认为Integer.MAX_VALUE。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue内部使用了独立的两把锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

put方法和offer方法:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
}

这两个方法的区别是put方法在容量达到上限时会阻塞,而offer方法则会直接返回false。

二 ArrayBlockingQueue

ArrayBlockingQueue是基于数组的阻塞队列,除了有一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue。ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。

三 SynchronousQueue

是一种没有缓冲的阻塞队列,在生产者put的同时必须要有一个消费者进行take,否则就会阻塞。声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

四 PriorityBlockingQueue和DelayQueue

PriorityBlockingQueue是基于优先级的阻塞队列,该队列不会阻塞生产者,只会阻塞消费者。

DelayQueue队列存储的对象只有指定的延迟时间到了才能被取出,该队列也不会阻塞生产者。

三 BlockingQueue的使用

在处理多线程生产者消费者问题时的演示代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Created by gavin on 15-8-30.
 */
public class BlockingQueueTest {

    public static void main(String[] args)
    {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
        Thread p1 = new Thread(new Producer(queue),"producer1");
        Thread p2 = new Thread(new Producer(queue),"producer2");
        Thread c1 = new Thread(new Consumer(queue),"consumer1");
        Thread c2 = new Thread(new Consumer(queue),"consumer2");

        p1.start();
        p2.start();
        c1.start();
        c2.start();
    }
}

class Producer implements Runnable{
    private BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                queue.put(Thread.currentThread().getName()+" product "+i);
            } catch (InterruptedException e) {
                System.err.println(Thread.currentThread().getName() + " error");
            }
            i++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
}
class Consumer implements Runnable{
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                String str = queue.take();
                System.out.println(str);
            } catch (InterruptedException e) {

            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {

            }
        }
    }
}

四 总结

BlockingQueue在并发编程中扮演着重要的角色,既可以自己用来解决生产者消费者问题,也用于java自带线程池的缓冲队列。

参考:http://wsmajunfeng.iteye.com/blog/1629354 BlockingQueue

时间: 2024-07-29 01:30:12

BlockingQueue学习的相关文章

&lt;Java&gt;&lt;学习路线图&gt;

Java是一个通用的编程语言,其实可以干很多事,怎么学Java就看怎么用了. 但有一些一般的步骤: 1.熟悉一种文本编辑器,比如Vim, Emacs, Notepad++, TextMate等.知道哪些是开源的,哪些是闭源的,哪些要收费.养成不用盗版软件的习惯.2. 安装JDK(建议用你的Linux发行版自带的软件包管理器安装openjdk,过程中可能需要读发行版特定的文档)3. 写一个Java的Hello world程序,并用命令行工具javac编译,再用java命令运行这个程序.过程中熟悉源

阿里内部Java多线程资料整理

目录: 1.volatile变量 2.Java并发编程学习 3.CountDownLatch用法 4.CyclicBarrier使用 5.BlockingQueue使用 6.任务执行器Executor7.CompletionService使用8.ConcurrentHashMap使用9.Lock使用 一. volatile变量 1.volatile原理:volatile的原理实际上是告诉处理器,不要把变量缓存在寄存器或者相对于其他处理器不可见的地方,而是把变量放在主存,每次读写操作都在主存上进行

[转载] java多线程学习-java.util.concurrent详解(四) BlockingQueue

转载自http://janeky.iteye.com/blog/770671 --------------------------------------------------------------------------------- 7.BlockingQueue     “支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用.“ 这里我们主要讨论BlockingQueue的最典型实现:LinkedBlockingQueue 和Arra

Concurrent包学习之 BlockingQueue源码学习

上一篇学习了ExecutorService和其它相关类的源码,本篇要学习的BlockingQueue中的源码,as always,先上类图 其实继承(实现)的层次比较简单,我们只要需要先学习一下BlockingQueue中的方法: public interface BlockingQueue<E> extends Queue<E> { boolean add(E e);--往队列中插入一个对象,队列满了会抛异常 boolean offer(E e);--同上,区别是队列满了会返回f

BlockingQueue队列学习

今天看了下BlockingQueue的几种实现,记录下以便以后复习. 首先来看一下BlockingQueue的家族成员: BlockingQueue除了先进先出外,还有两个操作:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一

多线程学习之BlockingQueue

前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列 类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场 景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另

BlockingQueue之DelayQueue的学习使用

DelayQueue 是一中阻塞队列,需要实现接口Delayed定义的方法.做下使用记录和心得吧, @Datapublic class DelayQueueExample implements Delayed { private String beginTime; private String name; public static DateTimeFormatter dateTimeFormatter=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:

20145326 《Java程序设计》第6周学习总结

20145326 <Java程序设计>第6周学习总结 教材学习内容总结 第十章 一.使用InputStream与OutputStream 1.串流设计的概念 想活用输入/输出API,一定要先了解Java中如何以串流抽象化输入/输出概念,以及InputStream.OutputStream继承架构.如此一来,无论标准输入/输出.文档输入/输出.网络输入/输出.数据库输入/输出都可以用一致的操作来处理.Java将输入/输出抽象化为串流,数据有来源及目的地,衔接两者的是串流对象,其实数据就好比水,串

Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法. 这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性. 2.如何实现? 主要是实现Blo