1 api
java.util.concurrent包下的新类。LinkedBlockingQueue就是其中之一,是一个阻塞的线程安全的队列,底层采用链表实现。
LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE,当然也可以在构造函数的参数中指定大小。LinkedBlockingQueue不接受null。
添加元素的方法有三个:add,put,offer,且这三个元素都是向队列尾部添加元素的意思。
区别:
add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
queue.add("hello");
queue.add("world");
queue.add("yes");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(Unknown Source)
at com.wjy.test.GrandPather.main(GrandPather.java:12)
put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素。
public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
queue.put("hello");
queue.put("world");
queue.put("yes");
System.out.println("yes");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
//在queue.put("yes")处发生阻塞
//下面的“yes”无法输出
offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。
public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);
boolean bol1=queue.offer("hello");
boolean bol2=queue.offer("world");
boolean bol3=queue.offer("yes");
System.out.println(queue.toString());
System.out.println(bol1);
System.out.println(bol2);
System.out.println(bol3);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
[hello, world]
true
true
false
从队列中取出并移除头元素的方法有:poll,remove,take。
poll: 若队列为空,返回null。
remove:若队列为空,抛出NoSuchElementException异常。
take:若队列为空,发生阻塞,等待有元素。
2基于LinkedBlockingQueue的生产者和消费者
package com.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest1 {
public static void main(String[] args) {
LinkedBlockingQueueTest1 test = new LinkedBlockingQueueTest1();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生产者001", basket);
Producer producer2 = test.new Producer("生产者002", basket);
Consumer consumer = test.new Consumer("消费者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行5s后,所有任务停止
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdownNow();
}
//定义篮子
public class Basket {
// 篮子,能够容纳3个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
return basket.take();
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生产苹果
System.out.println(instance + "生产苹果");
basket.produce();
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
System.out.println(instance + "消费苹果" + basket.consume());
// 休眠1000ms
Thread.sleep(150);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
}
3示例2
并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。
下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,
另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,特在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。
public class LinkedBlockingQueueTest { static long randomTime() { return (long) (Math.random() * 1000); } @Test public void testName() throws Exception { AtomicInteger rc = new AtomicInteger(); int incrementAndGet = rc.incrementAndGet(); System.out.println(incrementAndGet); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("D:\\JavaLib"); // 完成标志 final File exitFile = new File(""); // 读个数 final AtomicInteger rc = new AtomicInteger(); // 写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = "Write" + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } } |