java多线程(九)阻塞队列

转载请注明出处:http://blog.csdn.net/xingjiarong/article/details/48005091

前边的博客中我们介绍了如果用对象锁和条件锁以及更加方便的synchronized关键字来实现多线程的同步和互斥,也许你会觉得使用synchronized关键字已经非常方便了,但是使用者必须真正的理解synchronized的用法,而且要有一定的多线程的编程的经验,否则很难做到全面的考虑问题而造成意想不到的问题。其实在java中还有比synchronized关键字更加方便的途径来实现同步和互斥,并且不需要考虑复杂的问题,那就是使用阻塞队列,这一次我们就来讨论一下阻塞队列的使用。

例如我们前边经常举的例子——银行转账。我们每次写这个程序,都要考虑一大堆问题,例如两个线程之间应该怎么互斥,金额不足了怎么办等等。其实我们可以通过一个队列优雅且安全的解决这个问题。转账线程将转账指令对象插入到一个队列中,而不是直接访问银行对象。另一个线程从队列中取出指令执行转账。因为只有该线程可以访问该银行对象的内部方法,因此不需要同步和互斥。

阻塞队列是指,当试图向队列中添加元素而队列已满时,或者想从队列中移出一个元素而队列为空时,执行该操作的线程会被阻塞。在协调多个线程工作时,阻塞队列是一个有用的工具,工作者线程可以周期的将中间结果存储在队列中,其他的线程移出中间结果并进行进一步的处理。队列会进行自动的负载平衡,当队列满时,向队列中添加元素的线程会被阻塞,当队列为空时,试图从队列中取出元素的线程会被阻塞。

java中主要有以下几种阻塞队列:

1、ArrayBlockingQueue

这种队列用循环数组实现的,有两种构造方法:

1)带有指定容量的阻塞队列

ArrayBlockingQueue(int capacity)

2)带有指定容量和公平性设置的阻塞队列

ArrayBlockingQueue(int capacity,boolean fair)

公平性的阻塞队列在唤醒时会优先考虑等待时间最长的线程,但是这并不保证绝对的公平,因为公平的阻塞队列会降低效率,所以通常不采用,默认情况下是非公平的阻塞队列。

2、LinkedBlockingQueue

这种队列使用链表实现,有以下两种构造方法:

1)构造一个无上限的阻塞队列

LinkedBlockingQueue();

2)构造一个带上限的阻塞队列

LinkedBlockingQueue(int capacity);

3、LinkedBlockingDeque

这种队列的构造方法和实现原理都可第二种一样,他们两个之间的区别就是LinkedBlockingDeque是一种双向的队列。

4、DelayQueue

构造一个包含Delayed元素的无界的阻塞时间有限的阻塞队列。只有那些延时超多时间的元素可以从队列中移走。

构造方法:DelayQueue();

5、PriorityBlockingQueue

优先级阻塞队列,优先级高的先从队列中移出。这种队列使用堆实现,有以下三种构造方法:

PriorityBlockingQueue();
PriorityBlockingQueue(int initialCapacity);
PriorityBlockingQueue(int initialCapacity,Comparator<? super E> compaator);

initialCapacity:优先队列的初始容量。默认值是11.

comparator 用来对元素进行比较的比较器,如果没有指定,则元素必须实现Comparable接口。

阻塞队列的常用方法

方法 正常动作 特殊情况下的动作
add 添加一个元素 如果队列为满,则抛出IllegalStateException异常
remove 移出并返回头元素 如果队列为空,则抛出NoSuchElementException异常
put 添加一个元素 如果队列为满,则阻塞
take 移出并返回头元素 如果队列为空,则阻塞
peek 返回队列的头元素 如果队列为空,则返回null
poll 移出并返回队列的头元素 如果队列为空,则返回null
element 返回队列的头元素 如果队列为空,则抛出NoSuchElementException异常
offer 添加一个元素并返回true 如果队列满,则返回false

使用阻塞队列实现生产者消费者问题:

生产者消费者问题是经典的线程同步问题,生产者和消费者共用一个缓冲区,可以看做仓库,生产者生产产品放入仓库,消费者从仓库中取出产品进行消费,当仓库为空时,消费者阻塞,当仓库为满,生产者阻塞。

import java.util.concurrent.ArrayBlockingQueue;

public class ProductThread implements Runnable {

    private ArrayBlockingQueue<Integer> queue;

    public ProductThread(ArrayBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    public void run(){
        while(true){
            try {
                int product = (int)(Math.random()*10);
                /*
                 * 队列满时会阻塞
                 */
                queue.put(product);
                System.out.println(Thread.currentThread()+"生产了产品——"+product+" 剩余元素:"+queue.size());
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
import java.util.concurrent.ArrayBlockingQueue;

public class ConsumeThread implements Runnable {

    private ArrayBlockingQueue<Integer> queue;

    public ConsumeThread(ArrayBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    public void run(){
        while(true){
            try {
                /*
                 * 队列空时会阻塞
                 */
                int product = queue.take();
                System.out.println(Thread.currentThread()+"消费了元素——"+product+" 剩余空间:"+queue.size());
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
import java.util.concurrent.ArrayBlockingQueue;

public class Main {

    public static void main(String[] args) {
        //大小为10的循环数组阻塞队列
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        /*
         * 用5个生产者线程来生产产品
         */
        for(int i=0;i<5;i++){
            new Thread(new ProductThread(queue)).start();
        }

        /*
         * 用2个消费者线程来消费产品
         */
        for(int i=0;i<2;i++){
            new Thread(new ConsumeThread(queue)).start();
        }
    }

}

运行结果:

Thread[Thread-2,5,main]生产了产品——4 剩余元素:4

Thread[Thread-5,5,main]消费了元素——1 剩余空间:3

Thread[Thread-1,5,main]生产了产品——1 剩余元素:3

Thread[Thread-6,5,main]消费了元素——1 剩余空间:3

Thread[Thread-0,5,main]生产了产品——0 剩余元素:3

Thread[Thread-4,5,main]生产了产品——1 剩余元素:3

Thread[Thread-3,5,main]生产了产品——9 剩余元素:4

Thread[Thread-0,5,main]生产了产品——2 剩余元素:5

Thread[Thread-1,5,main]生产了产品——9 剩余元素:5

Thread[Thread-4,5,main]生产了产品——6 剩余元素:7

Thread[Thread-2,5,main]生产了产品——2 剩余元素:7

Thread[Thread-3,5,main]生产了产品——1 剩余元素:7

Thread[Thread-5,5,main]消费了元素——4 剩余空间:6

Thread[Thread-6,5,main]消费了元素——0 剩余空间:7

Thread[Thread-0,5,main]生产了产品——0 剩余元素:7

Thread[Thread-4,5,main]生产了产品——7 剩余元素:8

Thread[Thread-3,5,main]生产了产品——5 剩余元素:9

Thread[Thread-1,5,main]生产了产品——4 剩余元素:10

Thread[Thread-5,5,main]消费了元素——9 剩余空间:9

Thread[Thread-2,5,main]生产了产品——9 剩余元素:10

Thread[Thread-6,5,main]消费了元素——2 剩余空间:9

Thread[Thread-1,5,main]生产了产品——4 剩余元素:10

Thread[Thread-0,5,main]生产了产品——3 剩余元素:10

Thread[Thread-6,5,main]消费了元素——2 剩余空间:9

Thread[Thread-4,5,main]生产了产品——5 剩余元素:10

Thread[Thread-5,5,main]消费了元素——9 剩余空间:10

Thread[Thread-3,5,main]生产了产品——9 剩余元素:10

Thread[Thread-5,5,main]消费了元素——6 剩余空间:10

Thread[Thread-6,5,main]消费了元素——1 剩余空间:10

Thread[Thread-2,5,main]生产了产品——2 剩余元素:10

Thread[Thread-0,5,main]生产了产品——1 剩余元素:10

Thread[Thread-5,5,main]消费了元素——0 剩余空间:10

Thread[Thread-4,5,main]生产了产品——4 剩余元素:10

Thread[Thread-6,5,main]消费了元素——7 剩余空间:10

Thread[Thread-5,5,main]消费了元素——5 剩余空间:10

从结果上我们可以看出,刚开始阻塞队列为空,多个生产线程同时生产,当有了产品之后,两个消费者开始消费,因为生产者的数量较多,所以队列逐渐填满当队列满时,生产者线程阻塞,只能等待消费者消费一个产品后再生产,所以最后出现消费一个产品,生产一个产品的结果。

源码下载:http://download.csdn.net/detail/xingjiarong/9052057

版权声明:本文为博主原创文章,转载请注明出处,查看原文章,请访问:http://blog.csdn.net/xingjiarong

时间: 2024-10-20 14:35:12

java多线程(九)阻塞队列的相关文章

多线程编程学习六(Java 中的阻塞队列).

介绍 阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满:当队列空时,队列会阻塞获得元素的线程,直到队列变非空.阻塞队列就是生产者用来存放元素.消费者用来获取元素的容器. 当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待... 方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e, timeout, unit

Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put

聊聊并发(七)——Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put

聊聊并发(七)Java中的阻塞队列

什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java多线程之阻塞I/O如何中断

阻塞的I/O线程在关闭线程时并不会被打断,需要关闭资源才能打断.1.执行socketInput.close();阻塞可中断.2.执行System.in.close();阻塞没有中断. package Thread.Interrupting; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.co

JAVA并发之阻塞队列浅析

背景 因为在工作中经常会用到阻塞队列,有的时候还要根据业务场景获取重写阻塞队列中的方法,所以学习一下阻塞队列的实现原理还是很有必要的.(PS:不深入了解的话,很容易使用出错,造成没有技术深度的样子) 阻塞队列是什么? 要想了解阻塞队列,先了解一下队列是啥,简单的说队列就是一种先进先出的数据结构.(具体的内容去数据结构里学习一下)所以阻塞队列就是一种可阻塞的队列.和普通的队列的不同就体现在 ”阻塞“两个字上.阻塞是啥意思? 百度看一下 在软件工程里阻塞一般指的是阻塞调用,即调用结果返回之前,当前线

Java多线程——线程阻塞工具类LockSupport

简述 LockSupport 是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞. 和 Thread.suspend()相比,它弥补了由于 resume()在前发生,导致线程无法继续执行的情况. 和 Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出 InterruptedException 异常. LockSupport 的静态方法 park()可以阻塞当前线程,类似的还有 parkNanos().parkUntil()等方法.它们实现了一个限时等待,如下图

【java并发】阻塞队列的使用

在前面一篇名为条件阻塞Condition的应用的博客中提到了一个拔高的例子:利用Condition来实现阻塞队列.其实在java中,有个叫ArrayBlockingQueue<E>的类提供了阻塞队列的功能,所以我们如果需要使用阻塞队列,完全没有必要自己去写. ArrayBlockingQueue<E>实现了BlockingQueue<E>,另外还有LinkedBlockingQueue<E>和PriorityBlockingQueue<E>.Ar

多线程之阻塞队列ArrayBlockingQueue,BlockingQueue

ArrayBlockingQueue是个有数组支持的有界的阻塞队列.该队列按照先进先出FIFO的原理对元素排序,插入新元素市场队列的尾部,获取新元素是操作队列的开始处.一旦见了建立了缓存区,就不能再增加其容量,试图从已满的队列中方式元素会导致操作阻塞:试图从空的队列中提取元素将导致阻塞. 提拱了四种方法,只有put(),take()才会发生阻塞. 下面是阻塞队列的例子. package andy.thread.test; import java.util.concurrent.ArrayBloc