Java--concurrent并发包下阻塞队列介绍

JDK提供了7中阻塞队列,这里介绍其中3中,剩余的以此类推原理相同。

1.ArrayBlockingQueue

package com.seeyon.queue;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by yangyu on 16/11/27.
 */

/**
 * ArrayBlockingQueue是数组结构组成的有界阻塞队列
 * 当队列已经满了的时候,put操作会阻塞当前线程,直到队列发生出队操作然后会唤醒put线程在入队
 * 当队列为空的时候,take操作会阻塞当前线程,直到队列发生入队操作后会唤醒take线程进行出队
 */
public class TestArrayBlockingQueue {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue(1);

        try {
            queue.put("1111");
            /**
             * 该操作会被阻塞,知道队列发生出队操作
             */
            queue.put("2222");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

2.LinkedBlockingQueue:链表实现的有界阻塞队列

3.PriorityBlockingQueue:支持优先级的无界阻塞队列

4.DelayQueue

package com.seeyon.queue;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * Created by yangyu on 16/11/27.
 */

/**
 * DelayQueue是一个支持延时获取元素的无界队列
 * DelayQueue可以用于如下场景:
 * 1.缓存系统的的设计:用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取到元素,说明该元素过期了
 * 2.定时任务调度:使用DelayQueue保存当天会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,TimerQueue就是使用DelayQueue实现的
 * DelayQueue的原理:
 * 1.当线程put元素的时候,DelayQueue会对你put的元素通过其本身的compareTo方法进行排序,延时时间越短的顺序越靠近队列头部
 * 2.当线程take元素的时候,DelayQueue会检测当前是否有Thread已经在等待队头元素了,如果有的话,那么只能阻塞当前前程,等已经取到队头
 * 的Thread完成以后再唤醒。
 * 如果没有Thread在等待队头元素的话,那么会查询一下队头元素还剩多少Delay时间,并且将当前线程设置为队头等待线程,然后让当前线程wait剩余
 * Delay时间后在来获取队头元素。
 */
public class TestDelayQueue {
    public static void main(String[] args) {
        DelayQueue<Message> delayQueue = new DelayQueue<>();
        delayQueue.put(new Message(2000,"yangyu"));

        try {
            System.out.println(delayQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end");
    }

    private static class Message implements Delayed{

        private long nanoTime;

        private String data;

        Message(long millisTime ,String data){
            this.nanoTime = now()+millisTime*(1000*1000);
            this.data = data;
        }

        private final long now(){
            return System.nanoTime();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(nanoTime - now(), NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof Message) {
                Message x = (Message) other;
                long diff = nanoTime - x.nanoTime;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    }
}

5.SynchronousQueue

package com.seeyon.queue;

import java.util.concurrent.SynchronousQueue;

/**
 * Created by yangyu on 16/11/27.
 */

/**
 * SynchronousQueue是一个不存储数据的队列,只是做数据的的传递工作
 * 同步队列,一个put操作必须等待一个take操作,否则线程被阻塞
 * 同样,一个take操作必须等待一个put操作,否则线程被阻塞
 */
public class TestSynchronousQueue {
    public static void main(String[] args) {
        SynchronousQueue<String> strings = new SynchronousQueue<>();
        Thread t =new Thread(()->{
            try {
                /**
                 * 该take操作会被阻塞,直到后面的strings.put("yangyu")操作后,当前线程才会被唤醒
                 */
                System.out.println(strings.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();

        try {
            Thread.sleep(2000);
            /**
             * 唤醒阻塞线程并且传递数据
             */
            strings.put("yangyu");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("完成");

    }
}

6.LinkedTransferQueue

7.LinkedBlockingDeqeue:是一个链表结构组成的双向阻塞队列,可以从队列的两端插入或者移出元素。

时间: 2024-11-04 01:31:00

Java--concurrent并发包下阻塞队列介绍的相关文章

9.并发包非阻塞队列ConcurrentLinkedQueue

jdk1.7.0_79  队列是一种非常常用的数据结构,一进一出,先进先出. 在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都是线程安全的,无需担心在多线程并发环境所带来的不可预知的问题.为什么会有非阻塞和阻塞之分呢?这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作.而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况

跟我学Java多线程——线程池与阻塞队列

前言 上一篇文章中我们将ThreadPoolExecutor进行了深入的学习和介绍,实际上我们在项目中应用的时候很少有直接应用ThreadPoolExecutor来创建线程池的,在jdk的api中有这么一句话"但是,强烈建议程序员使用较为方便的 Executors 工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收).Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleT

java多线程(九)阻塞队列

转载请注明出处:http://blog.csdn.net/xingjiarong/article/details/48005091 前边的博客中我们介绍了如果用对象锁和条件锁以及更加方便的synchronized关键字来实现多线程的同步和互斥,也许你会觉得使用synchronized关键字已经非常方便了,但是使用者必须真正的理解synchronized的用法,而且要有一定的多线程的编程的经验,否则很难做到全面的考虑问题而造成意想不到的问题.其实在java中还有比synchronized关键字更加

JAVA多线程提高十二:阻塞队列应用

一.类相关属性 接口BlockingQueue<E>定义: public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti

java condition 实现简单的阻塞队列

上一篇文章介绍了condition的使用方法 https://www.cnblogs.com/liumy/p/11563772.html 这一篇文章介绍如何用condition来实现一个简单的阻塞队列 消费者 生产者模式. 消费者 生产者模式就是 生产者生产某些对象,消费者来消费这些对象.其中用对象数组来保存这些对象,既然是数组,在初始化的时候需要指定数组的大小. 在生产者生产的时候需要检查数组是否已经满了,如果满了,那么生产者会被挂起,等到有消费者消费对象时,再进行生产. 当消费者消费的时候,

25、Java并发性和多线程-阻塞队列

以下内容转自http://ifeve.com/blocking-queues/: 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作: 线程1往阻塞队列中添

并发包实现阻塞队列

/** * @描述: 阻塞队列 ,先放进来先取走 * 缓冲区:隔离效果,平均每一秒钟收一个短信,放在池子里 * 可以放可以取,当满了不能放,取走了之后才能取 * 当空的时候不能取,只有放了之后才能取 * @作者: Wnj . * @创建时间: 2017年5月16日 . * @版本: 1.0 . */ public class BoundedBuffer { final Lock lock = new ReentrantLock(); //空 ,一个Condition有五个线程同时往池子里放,发现

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.以下是我的学习过程. 首先是构造函数签名如下: [java] view plain copy print ? public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<

Java并发编程(十一)-- 阻塞队列

在介绍Java的阻塞队列之前,我们简单介绍一下队列. 队列 队列是一种数据结构.它有两个基本操作:在队列尾部加人一个元素,和从队列头部移除一个元素就是说,队列以一种先进先出的方式管理数据,如果你试图向一个已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们.队列会自动平衡负载.如果第一个线程集运行得比第二个慢,则第二个 线程集