java多线程-BlockingQueue

  • BlockingQueue简介

  ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE,每次插入后都将动态地创建链接节点。

  PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素,依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

  DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

  SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

  • BlockingQueue内容

  BlockingQueue主要方法:

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

  对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

  • BlockingQueue实现原理

  以ArrayBlockingQueue为例,查看其源代码,其中主要包含以下对象:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898426L;

    /** 数组对象,用于放置对象 */
    final Object[] items;

    /** put, offer, or add方法放入数组的索引 */
    int putIndex;

    /**  take, poll, peek or remove方法取出数据的数组索引 */
    int takeIndex;
    /** queue队列的总数 */
    int count;

    /**可重入锁,控制并发*/
    final ReentrantLock lock;
    /** 非空信号量,可以取数*/
    private final Condition notEmpty;
    /** 非满信号量,可以放数 */
    private final Condition notFull;
}

  下面主要介绍下put()和take()方法,来观察其同步的实现:

 1 public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();
 8             insert(e);
 9         } finally {
10             lock.unlock();
11         }
12 }
 1 public E take() throws InterruptedException {
 2         final ReentrantLock lock = this.lock;
 3         lock.lockInterruptibly();
 4         try {
 5             while (count == 0)
 6                 notEmpty.await();
 7             return extract();
 8         } finally {
 9             lock.unlock();
10         }
11     }

  大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。并且在前面Condition中我们也模拟实现了一个阻塞队列,实现与其大同小异。

  • BlockingQueue应用

  1:启动两个线程实现互斥等待:

 1 public class BlockingQueueTest {
 2     public static void main(String[] args) {
 3         final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
 4         for (int i = 0; i < 2; i++) {
 5             new Thread(new Runnable() {
 6                 @Override
 7                 public void run() {
 8                     while (true) {
 9                         System.out.println("Thread "+Thread.currentThread().getName()+"正在准备放入数据");
10                         try {
11                             //模拟线程的放数速度
12                             Thread.sleep(new Random().nextInt(1000));
13                         } catch (InterruptedException e) {
14                             // TODO Auto-generated catch block
15                             e.printStackTrace();
16                         }
17                         try {
18                             queue.put(1);
19                         } catch (InterruptedException e) {
20                             // TODO Auto-generated catch block
21                             e.printStackTrace();
22                         }
23                         System.out.println("Thread "+Thread.currentThread().getName()+"放入数据,此时队列中的数据为:"+queue.size());
24                     }
25                 }
26             }).start();
27             new Thread(new Runnable() {
28                 @Override
29                 public void run() {
30                     while (true) {
31                         System.out.println("Thread "+Thread.currentThread().getName()+"正在取得数据");
32                         try {
33                             //模拟线程的去数速度
34                             Thread.sleep(100);
35                         } catch (InterruptedException e) {
36                             // TODO Auto-generated catch block
37                             e.printStackTrace();
38                         }
39                         try {
40                             queue.take();
41                         } catch (InterruptedException e) {
42                             // TODO Auto-generated catch block
43                             e.printStackTrace();
44                         }
45                         System.out.println("Thread "+Thread.currentThread().getName()+"取得数据,此时队列中的数据为:"+queue.size());
46                     }
47                 }
48             }).start();
49         }
50
51     }
52 }

  2:前面介绍传统线程通信中,主线程和子线程交替运行,现在以阻塞队列来实现。

 1 public class BlockingQueueCommunication {
 2     public static void main(String[] args) {
 3         final Business business = new Business();
 4         new Thread(new Runnable() {
 5
 6             @Override
 7             public void run() {
 8                 // TODO Auto-generated method stub
 9                 for (int i = 0; i < 50; i++) {
10                     try {
11                         business.sub(i);
12                     } catch (InterruptedException e) {
13                         // TODO Auto-generated catch block
14                         e.printStackTrace();
15                     }
16                 }
17             }
18         }).start();
19         for (int i = 0; i < 50; i++) {
20             try {
21                 business.main(i);
22             } catch (InterruptedException e) {
23                 // TODO Auto-generated catch block
24                 e.printStackTrace();
25             }
26         }
27     }
28     static class Business{
29         BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
30         BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
31         {
32             try {
33                 queue2.put(1);//保证queue2阻塞
34             } catch (InterruptedException e) {
35                 // TODO Auto-generated catch block
36                 e.printStackTrace();
37             }
38         }
39
40         public void main(int i) throws InterruptedException{
41             queue1.put(1);//阻塞queue1
42             for (int j = 0; j < 100; j++) {
43                 System.out.println("main thread is looping of "+j +" in " + i);
44             }
45             queue2.take();//唤醒queue2
46         }
47         public void sub(int i) throws InterruptedException{
48             queue2.put(1);//阻塞queue2
49             for (int j = 0; j < 10; j++) {
50                 System.out.println("sub thread is looping of "+j +" in " + i);
51             }
52             queue1.take();//唤醒queue1
53         }
54     }
55 }
  BlockingQueue实现了线程同步,不可在方法中再次加入同步限制,否则会出现死锁。

  3:在API中有一个阻塞对象实现生产者和消费者的例子

 1 class Producer implements Runnable {
 2    private final BlockingQueue queue;
 3    Producer(BlockingQueue q) { queue = q; }
 4    public void run() {
 5      try {
 6        while(true) { queue.put(produce()); }
 7      } catch (InterruptedException ex) { ... handle ...}
 8    }
 9    Object produce() { ... }
10  }
11
12  class Consumer implements Runnable {
13    private final BlockingQueue queue;
14    Consumer(BlockingQueue q) { queue = q; }
15    public void run() {
16      try {
17        while(true) { consume(queue.take()); }
18      } catch (InterruptedException ex) { ... handle ...}
19    }
20    void consume(Object x) { ... }
21  }
22
23  class Setup {
24    void main() {
25      BlockingQueue q = new SomeQueueImplementation();
26      Producer p = new Producer(q);
27      Consumer c1 = new Consumer(q);
28      Consumer c2 = new Consumer(q);
29      new Thread(p).start();
30      new Thread(c1).start();
31      new Thread(c2).start();
32    }
33  }

  使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

  在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

  阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

  参考资料:http://www.cnblogs.com/dolphin0520/p/3932906.html

       javaAPI

时间: 2024-11-08 19:13:24

java多线程-BlockingQueue的相关文章

Java多线程系列十——BlockingQueue类

参考资料:http://ifeve.com/java-synchronousqueue/http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.htmlhttp://ifeve.com/java-blocking-queue/ BlockingQueue的几个API认识 方法 说明 add(E e) 添加元素,超出队列size上限后抛异常 offer(E e) 添加元素,超出队列size上限后抛异常,相比add官方更建议使用offer方

Java多线程与并发库高级应用之阻塞队列BlockingQueue

JDK1.5提供了阻塞队列接口BlockingQueue,它是一个有界阻塞队列.BlockingQueue实现是线程安全的,可以安全地与多个生产者和多个使用者一起使用. 使用时用其实现类 ArrayBlockingQueue,它一个由数组支持的有界阻塞队列.此队列按 FIFO(先进先出)原则对元素进行排序.队列的头部 是在队列中存在时间最长的元素.队列的尾部是在队列中存在时间最短的元素.新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素. 这是一个典型的"有界缓存区",固定

Java多线程总结之线程安全队列Queue

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列.Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列. 注:什么叫线程安全?这个首先要明确.线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的.如果由于多线程的访问(比如修改.遍历.查看)而使这些变量结构被破坏

JAVA多线程和并发基础面试问答(转载)

原文链接:http://www.cnblogs.com/dolphin0520/p/3932934.html 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题. Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用.而线程是在进程中执行的一个任务.Java运行环境是一个包含

JAVA多线程和并发基础面试问答

原文链接:http://ifeve.com/java-multi-threading-concurrency-interview-questions-with-answers/ 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题.(校对注:非常赞同这个观点) Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环

Java多线程——阻塞队列

现在,通过前几篇的总结,我们对Java多线程已经有所了解了,但是它们都是一些Java并发程序设计基础的底层构建块.对于实际编程来说,我们应该尽可能的远离底层结构.使用那些由并发处理的专业人士实现的较高层次的结构要方便的多,安全的多. 阻塞队列 对于许多线程问题.可以通过使用一个或多个队列以优雅且安全的方式将其形式化.生产者线程向队列插入元素,消费者线程则取出他们.使用队列,可以安全地从一个线程向另一个线程传递数据. 阻塞队列的方法 方法 正常动作 特殊情况下动作 add 添加一个元素 如果队列满

java多线程编程

一.多线程的优缺点 多线程的优点: 1)资源利用率更好2)程序设计在某些情况下更简单3)程序响应更快 多线程的代价: 1)设计更复杂虽然有一些多线程应用程序比单线程的应用程序要简单,但其他的一般都更复杂.在多线程访问共享数据的时候,这部分代码需要特别的注意.线程之间的交互往往非常复杂.不正确的线程同步产生的错误非常难以被发现,并且重现以修复. 2)上下文切换的开销当CPU从执行一个线程切换到执行另外一个线程的时候,它需要先存储当前线程的本地的数据,程序指针等,然后载入另一个线程的本地数据,程序指

40个Java多线程问题总结

前言 Java多线程分类中写了21篇多线程的文章,21篇文章的内容很多,个人认为,学习,内容越多.越杂的知识,越需要进行深刻的总结,这样才能记忆深刻,将知识变成自己的.这篇文章主要是对多线程的问题进行总结的,因此罗列了40个多线程的问题. 这些多线程的问题,有些来源于各大网站.有些来源于自己的思考.可能有些问题网上有.可能有些问题对应的答案也有.也可能有些各位网友也都看过,但是本文写作的重心就是所有的问题都会按照自己的理解回答一遍,不会去看网上的答案,因此可能有些问题讲的不对,能指正的希望大家不

【转】 Java 多线程之一

转自   Java 多线程 并发编程 一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关