Java多线程与并发库高级应用之阻塞队列BlockingQueue

JDK1.5提供了阻塞队列接口BlockingQueue,它是一个有界阻塞队列。BlockingQueue实现是线程安全的,可以安全地与多个生产者和多个使用者一起使用。

使用时用其实现类
ArrayBlockingQueue,它一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部
是在队列中存在时间最长的元素。队列的尾部是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

使用构造器ArrayBlockingQueue(int
capacity)可以创建一个带有给定的(固定)容量和默认访问策略的ArrayBlockingQueue。

其put(E
e)方法将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。

take()方法可以获取并移除此队列的头部,在元素变得可用之前一直等待。

见下面程序,程序中两条线程向队列中写入数据,一条线程读取数据。当队列已满时,写入线程阻塞;当队列空时,读取线程阻塞。

[java] view plaincopy

  1. public class ArrayBlockingQueueDemo {
  2. public static void main(String[] args) {
  3. //阻塞队列,队列容量为3

  4. final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
  5. //两个线程向队列写入数据

  6. for(int i = 0; i < 2; i++){

  7. new Thread(){

  8. @Override

  9. public void run() {

  10. while(true){

  11. try{

  12. Thread.sleep((long)(Math.random()*1000));

  13. System.out.println(getName() + "-准备向队列写入数据");

  14. queue.put(1);

  15. System.out.println(getName() + "-已经向队列写入数据,队列一共有"

  16. + queue.size() + "个数据");

  17. }catch(InterruptedException e){

  18. e.printStackTrace();

  19. }

  20. }

  21. }

  22. }.start();

  23. }
  24. //一个线程读取数据

  25. new Thread(){

  26. @Override

  27. public void run() {

  28. while(true){

  29. try{

  30. Thread.sleep(1000);

  31. System.out.println(getName() + "-准备从队列读取数据");

  32. queue.take();

  33. System.out.println(getName() + "-已经从队列读取数据,队列一共有"

  34. + queue.size() + "个数据");

  35. }catch(InterruptedException e){

  36. e.printStackTrace();

  37. }

  38. }

  39. }

  40. }.start();

  41. }
  42. }


运行程序

看到当队列已满时,写入线程阻塞知道有一个数据被取出后才能继续写入。

Java多线程与并发库高级应用之阻塞队列BlockingQueue,码迷,mamicode.com

时间: 2024-10-10 04:17:41

Java多线程与并发库高级应用之阻塞队列BlockingQueue的相关文章

Java多线程与并发库高级应用之公共屏障点CyclicBarrier

一个小队去登山,每位队员登山的速度不同.山上有几个集合点,在每一集合点处,先到达的队员只有等后面的队员全部到达集合点后才能继续向下一个集合点出发. JDK1.5提供的CyclicBarrier模拟了这种情况.每一个线程相当于一个登山队员,CyclicBarrier相当于山上的集合点.只有等所有线程都执行到了CyclicBarrier后才可以继续向下执行. CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程

Java多线程与并发库高级应用之信号量Semaphore

JDK1.5提供了一个计数信号量Semaphore类.Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目,并提供了同步机制. Semaphore提供了两个构造器来创建对象: 1)Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的Semaphore. 2)Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的Semaphore.如果此信号量保证在争用时按先进先出的顺序授予许可,则为

Java多线程与并发库高级应用之线程数据交换Exchanger

JDK1.5提供了Exchanger用于两个线程的数据交换.两个线程先后到达交换点,先到达的线程会等待后到达的线程,然后两个线程互相交换数据,交换后双方持对方的数据. Exchanger只提供了一个构造器: Exchanger():创建一个新的Exchanger. Exchanger中也只有两个方法: V exchange(V x): 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象. V exchange(V x, long timeout,

Java多线程与并发库高级应用之倒计时计数器

CountDownLatch 类是一个倒计时计数器,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数初始化 CountDownLatch.由于调用了countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞.之后,会释放所有等待的线程,await 的所有后续调用都将立即返回. CountDownLatch 是一个通用同步工具,它有很多用途.将计数1初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 c

Java多线程与并发库高级应用 学习笔记 10-16课

Callable与Future的介绍 package Thread; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; im

Java多线程与并发库高级应用-java5线程并发库

java5 中的线程并发库 主要在java.util.concurrent包中 还有 java.util.concurrent.atomic子包和java.util.concurrent.lock子包

Java多线程与并发库高级应用 学习笔记 16-22课 +面试题

java.util.concurrent.Exchanger应用范例与原理浅析--转载 package Thread; import java.util.Random; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static

Java多线程与并发库高级应用 学习笔记 1-9课

来源XXX,免得打广告嫌疑. http://www.cnblogs.com/whgw/archive/2011/10/03/2198506.html 今天看了文章才发现创建线程最佳方式为实现Runnable接口,之前的习惯要改鲁. http://blog.csdn.net/imzoer/article/details/8500670 Java中Timer的用法 package timer; import java.util.Calendar; import java.util.Timer; im

Java多线程与并发库高级应用

1.传统线程机制的回顾 1.1创建线程的两种传统方式 在Thread子类覆盖的run方法中编写运行代码 // 1.使用子类,把代码放到子类的run()中运行 Thread thread = new Thread() { @Override public void run() { while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.p