多线程-BlockingQueue,Array[Linked]BlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue

阻塞场景 
BlockingQueue阻塞队列,阻塞的情况主要有如下2种: 
1. 当队列满了,进行入队操作阻塞 
2. 当队列空了,进行出队操作阻塞 
阻塞队列主要用在生产者/消费者模式中,下图展示了一个线程生产,一个线程消费的场景:

BlockingQueue接口

操作 抛异常 特殊值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o,timeout,unit)
删除 remove(o) poll() take() poll(timeout,unit)

1. 抛出异常:如果操作不能马上进行,则抛出异常。 
2. 特殊值:如果操作不能马上进行,将会返回一个特殊的值,一般是true/false。 
3. 阻塞:如果操作不能马上进行,操作会阻塞。 
4. 超时:如果操作不能马上进行,操作会阻塞指定时间,如果指定时间没执行,则返回一个特殊值,一般为true/false。 
不能向BlockingQueue中插入null.否则会报NullPointerException异常。 
BlockingQueue子类 
由以上的图片可以知道BlockingQueue具有如下的几个子类: 
1. ArrayBlockingQueue 
2. DelayQueue 
3. LinkedBlockingQueue 
4. PriorityBlockingQueue 
5. SynchronousQueue

ArrayBlockingQueue 
一个有边界的(容量是有限的)阻塞队列,它的内部实现是一个数组。必须在初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;
    public ArrayBlockingQueue(int capacity)
    public ArrayBlockingQueue(int capacity, boolean fair)
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
}

LinkedBlockingQueue 
队列大小的配置是可选的,如果我们初始时指定一个大小,它就是有边界的,如果不指定它就是无边界采用默认值Integer.MAX_VALUE的容量,它的内部是一个链表。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    public LinkedBlockingQueue();
    public LinkedBlockingQueue(int capacity);
    public LinkedBlockingQueue(Collection<? extends E> c);
}

PriorityBlockingQueue 
是一个没有边界的队列,它的排序规则和ProrityQueue一样,需要注意,PriorityBlockingQueue中允许插入null对象。 
所有插入PriorityBlockingQueue队列的对象必须实现Comparable接口,队列优先级的排序规则就是按照我们队这个接口的实现来定义的。 
另外PriorityBlockingQueue可以获得一个Iterator,但是这个迭代器并不保证按照优先级顺序进行迭代。

package org.github.lujiango;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityElement implements Comparable<PriorityElement> {
    private int priority;

    public PriorityElement(int priority) {
        this.priority = priority;
    }

    @Override
    public int compareTo(PriorityElement o) {

        return priority >= o.priority ? 1 : -1;
    }

    @Override
    public String toString() {
        return "PriorityElement[priority= " + priority + "]";
    }

}

public class Test13 {

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<PriorityElement>();
        for (int i = 0; i < 5; i++) {
            Random rand = new Random();
            PriorityElement ele = new PriorityElement(rand.nextInt(10));
            queue.put(ele);
        }
        System.out.println("Iterator----------------");
        Iterator<PriorityElement> it = queue.iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        System.out.println("PriorityBlockingQueue.tak()-----------");
        while (!queue.isEmpty()) {
            System.out.println(queue.take());
        }
    }
}

SynchronousQueue 
内部仅仅容纳一个元素,当一个线程插入一个元素之后就被阻塞(放入元素的线程立刻被阻塞),除非这个元素被另一个线程消费。

package org.github.lujiango;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class MyThread1 implements Runnable {
    private SynchronousQueue<String> queue;

    public MyThread1(SynchronousQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("take a from queue...");
            queue.take();
        } catch (InterruptedException e) {
        }
    }
}

public class Test14 {

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread t = new Thread(new MyThread1(queue));
        t.start();
        System.out.println("put a  into queue...");
        queue.put("a");

    }

}

DelayQueue 
DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现Delayed接口。

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

getDelay()返回值就是队列元素被释放前的存活时间,如果返回<=0,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象,如无可释放(超期元素)元素,则take方法会阻塞。

package org.github.lujiango;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedElement implements Delayed {
    private long expired;
    private long delay;
    private String name;

    public DelayedElement(String name, long delay) {
        this.name = name;
        this.delay = delay;
        this.expired = (delay + System.currentTimeMillis());
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedElement cache = (DelayedElement) o;
        return cache.expired > expired ? 1 : -1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return (expired - System.currentTimeMillis());
    }

    @Override
    public String toString() {
        return "DelayedElement[delay=" + delay + ",name=" + name + "]";
    }

}

public class Test15 {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Delayed> queue = new DelayQueue<Delayed>();
        DelayedElement ele = new DelayedElement("3s", 3000);
        queue.put(ele);
        System.out.println(queue.take());
    }

}

DelayQueue的应用场景很多,比如定时关闭连接,缓存对象,超时处理等各种场景。

时间: 2024-10-07 09:24:54

多线程-BlockingQueue,Array[Linked]BlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue的相关文章

JAVA多线程之间共享数据BlockingQueue介绍

在JAVA的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 一.认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:  从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

集合类源码(五)Collection之BlockingQueue(LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue)

LinkedTransferQueue 功能 全名 public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, Serializable 简述 基于链表的的无界队列.队列的头是某个生产者在队列中停留时间最长的元素.队列的尾部是某个生产者在队列中时间最短的元素. 注意,与大多数集合不同,size方法不是一个常量时间操作.由于这些队列的异步性,确定当前元素

Java多线程系列十——BlockingQueue类

参考资料:http://ifeve.com/java-synchronousqueue/http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.htmlhttp://ifeve.com/java-blocking-queue/ BlockingQueue的几个API认识 方法 说明 add(E e) 添加元素,超出队列size上限后抛异常 offer(E e) 添加元素,超出队列size上限后抛异常,相比add官方更建议使用offer方

Java集合容器简介

Java集合容器主要有以下几类: 1,内置容器:数组 2,list容器:Vetor,Stack,ArrayList,LinkedList, CopyOnWriteArrayList(1.5),AttributeList(1.5),RoleList(1.5),RoleUnresolvedList(1.5), ConcurrentLinkedQueue(1.5),ArrayBlockingQueue(1.5),LinkedBlockingQueue(1.5), PriorityQueue(1.5),

深入理解并发(二)--生产者及消费者

生产者及消费者问题,是线程操作中的一个经典案列.但由于线程运行的不确定性,生产者及消费者可能会产生一些问题: 试想,如果生产者线程向存储数据空间添加了部分信息,但没有添加全部,这时就切换到消费者线程,这时消费者线程将会把已经添加了的部分信息,后上一次的信息混淆了,导致出错. 或者,若生产者放数据,与消费者取数据的速度不匹配,也会出现问题:即可能会出现,生产者放了多条数据,消费者才取了一条,导致数据丢失:或生产者只放了一条数据,但消费者已经取了多条,这会导致重复取出数据. 举例说明: class

Chapter 3-01

Please indicate the source: http://blog.csdn.net/gaoxiangnumber1 Welcome to my github: https://github.com/gaoxiangnumber1 第3章多线程服务器的适用场合与常用编程模型 ?"两个进程不在同一台机器上"指的是逻辑上不在同一个操作系统里运行,虽然物理上可能位于同一机器虚拟出来的两台"虚拟机"上. 3.1 进程与线程 ?一个进程是"内存中正在运行

java并发容器(Map、List、BlockingQueue)

转发: 大海巨浪 Java库本身就有多种线程安全的容器和同步工具,其中同步容器包括两部分:一个是Vector和Hashtable.另外还有JDK1.2中加入的同步包装类,这些类都是由Collections.synchronizedXXX工厂方法.同步容器都是线程安全的,但是对于复合操作,缺有些缺点: ① 迭代:在查觉到容器在迭代开始以后被修改,会抛出一个未检查异常ConcurrentModificationException,为了避免这个异常,需要在迭代期间,持有一个容器锁.但是锁的缺点也很明显

java并发容器(Map、List、BlockingQueue)具体解释

Java库本身就有多种线程安全的容器和同步工具,当中同步容器包含两部分:一个是Vector和Hashtable.另外还有JDK1.2中增加的同步包装类.这些类都是由Collections.synchronizedXXX工厂方法. 同步容器都是线程安全的,可是对于复合操作.缺有些缺点: ① 迭代:在查觉到容器在迭代開始以后被改动,会抛出一个未检查异常ConcurrentModificationException,为了避免这个异常,须要在迭代期间,持有一个容器锁.可是锁的缺点也非常明显.就是对性能的