Java并发新构件之PriorityBlockingQueue

An unbounded blocking
queue
that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to
resource exhaustion (causing OutOfMemoryError).

PriorityBlockingQueue是一个很基础的优先级队列,它在PriorityQueue的基础上提供了可阻塞的读取操作。它是无限制的,就是说向Queue里面增加元素可能会失败(导致OurofMemoryError)。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序依次出队列的:

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private static int counter = 1;
    private final int priority;
    private Random random = new Random(47);
    private final int id = counter++;//这个id不是static的,因此
    protected static List<PrioritizedTask> sequence = new ArrayList<>();
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public int compareTo(PrioritizedTask o) {
        int val = this.priority - o.priority;
        //higher value, higher priority
        return val < 0 ? 1 : (val > 0 ? -1 : 0);
    }
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
        }
        System.out.println(this);
    }
    @Override
    public String toString() {
        return String.format("P=[%1$-3d]", priority) + ", ID=" + id;
    }
    public static class EndFlagTask extends PrioritizedTask {
        private ExecutorService exec;
        public EndFlagTask(ExecutorService executorService) {
            super(-1);//最低的优先级
            exec = executorService;
        }
        @Override
        public void run() {
            System.out.println(this + " calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable {
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {
        this.queue = queue;
        this.exec = exec;
    }
    @Override
    public void run() {
        try {
            //慢慢的添加高优先级的任务
            for (int i = 0; i < 6; i++) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(9)); //6个优先级10
            }
            //先创建一个P=0的任务
            queue.add(new PrioritizedTask(0));
            queue.add(new PrioritizedTask(0));
            //添加低优先级的任务
            for (int i = 0; i < 6; i++) {// 优先级0-9
                queue.add(new PrioritizedTask(i));
            }
            //添加一个结束标志的任务
            queue.add(new PrioritizedTask.EndFlagTask(exec));
            
        } catch (InterruptedException e) {
            // TODO: handle exception
        }
        System.out.println("Finished PrioritizedTaskProducer.");
    }
}

class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            //不停的从queue里面取任务,直到exec停止。
            while(!Thread.interrupted()) {
                //使用当前线程来跑这些任务
                queue.take().run();
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Finished PrioritizedTaskConsumer.");
    }
}

public final class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

执行结果:

P=[9  ], ID=1
P=[9  ], ID=2
P=[9  ], ID=3
P=[9  ], ID=4
P=[9  ], ID=5
Finished PrioritizedTaskProducer.
P=[9  ], ID=6
P=[5  ], ID=14
P=[4  ], ID=13
P=[3  ], ID=12
P=[2  ], ID=11
P=[1  ], ID=10
P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8
P=[-1 ], ID=15 calling shutdownNow()
Finished PrioritizedTaskConsumer.

PrioritizedTask对象的创建序列被记录在sequenceList中,用于和实际的顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndFlagTask提供了停止ExecutorService的功能,要确保它是队列中的最后一个对象,因此给它设置了最低的优先级(-1,优先级值越大,优先级越高)。

PrioritizedTaskProducer和PrioritizedTaskConsumer通过PriorityBlockingQueue彼此链接。因为这种队列的阻塞特性提供了所有必须的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。

从执行结果中可以看到,最先出队列的是Priority为9的6个Task,因为这几个任务先创建。

Finished PrioritizedTaskProducer.

这句话的打印表示生产者已经将所有的任务放到队列中了,由于将任务放到Queue中和从Queue中提取任务并执行时两个不同的任务(即Producer和Consumer),因此Producer先输出“Finished PrioritizedTaskProducer.”。输出这句话的时候,前面只有5个P=9的任务出列了,因此队列中还有1个P=9的任务没出列,同时还有后续放入各种任务。由于Queue中的任务里面,优先级P最高的是P=9的,因此第6个P=9的任务先出队列。剩下的任务按照P的大小依次出列。

任务的ID属性表示了它们的创建顺序,因为ID是自增的,每创建一个任务,ID就增加。因此从

P=[5  ], ID=14

可以很明显的看出:P=5的任务,它的ID最大,所以是最后创建的。从我们的代码中也可以看出来,P=5的任务的确是最后创建的。

还有一点可以看出,当P相同的时候,出Queue的顺序是不确定的,例如:

P=[0  ], ID=7
P=[0  ], ID=9
P=[0  ], ID=8

另外,在使用此类的时候需要注意:

This class does not permit null elements. A priority queue relying
on natural
ordering
also does not permit insertion of non-comparable objects (doing so
results in ClassCastException).

时间: 2024-11-05 09:10:05

Java并发新构件之PriorityBlockingQueue的相关文章

并发新构件之PriorityBlockingQueue:优先阻塞队列

PriorityBlockingQueue:优先阻塞队列:是带有优先级的阻塞队列,一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作.虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError).此类不允许使用 null 元素.依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException). package com.houjun.current.new

Java并发新构件之DelayQueue

DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走.这种队列是有序的,即对头的延迟到期时间最短.如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中) 下面是一个示例,其中的Delayed对象自身就是人物,而DelayedTaskConsumer将最"紧急"的任务从队列中取出来,然后运行它: import java.util.ArrayList; import j

转: 【Java并发编程】之二十一:并发新特性—阻塞队列和阻塞栈(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17511147 阻塞队列 阻塞队列是Java5并发新特性中的内容,阻塞队列的接口是Java.util.concurrent.BlockingQueue,它有多个实现类:ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue.PriorityBlockingQueue.SynchronousQueue等,用法大同小异,具体可查看JDK文档,这里简单

转: 【Java并发编程】之二十:并发新特性—Lock锁和条件变量(含代码)

简单使用Lock锁 Java5中引入了新的锁机制--Java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例化

Java并发编程知识总结

一.线程 1.线程创建: 继承Thread类创建线程类 实现Runnable接口创建线程类 使用Callable和Future创建线程 Runnable是执行工作的独立任务,但是它不返回任何值,如果希望任务完成时能够返回一个值,可以实现Callable接口 class TestThread implements Callable<Integer> { @Override public Integer call() throws Exception { return 1; } } //测试方法

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

Java 并发基础

Java 并发基础 线程简述 线程是进程的执行部分,用来完成一定的任务; 线程拥有自己的堆栈,程序计数器和自己的局部变量,但不拥有系统资源, 他与其他线程共享父进程的共享资源及部分运行时环境,因此编程时需要小心,确保线程不会妨碍同一进程中的其他线程; 多线程优势 进程之间不能共享内存,但线程之间共享内存/文件描述符/进程状态非常容易. 系统创建进程时需要为该其分配很多系统资源(如进程控制块),但创建线程的开销要小得多,因此线程实现多任务并发比进程效率高. Java语言内置多线程支持,而不是单纯采

Java并发和多线程基础(一)

1.java线程状态 Java中的线程可以处于下列状态之一: NEW: 至今尚未启动的线程处于这种状态. RUNNABLE: 正在 Java 虚拟机中执行的线程处于这种状态. BLOCKED: 受阻塞并等待某个监视器锁的线程处于这种状态. WAITING: 无限期地等待另一个线程来执行某一特定操作的线程处于这种状态. TIMED_WAITING: 等待另一个线程来执行取决于指定等待时间的操作的线程处于这种状态. TERMINATED: 已退出的线程处于这种状态. 在给定时间点上,一个线程只能处于