阻塞队列和线程池

一、阻塞队列

1.介绍
阻塞队列会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。

2.实现
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

二、线程池

1.介绍
使用线程池的好处有:1.创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率 2.线程并发数量过多,抢占系统资源从而导致阻塞 3.线程池可以对线程进行一些简单的管理
ThreadPoolExecutor类是线程池中最核心的一个类, 在ThreadPoolExecutor类中提供了四个构造方法,其中 三个构造器都是调用的第四个构造器进行的初始化工作。

2.构造方法为:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
七个参数的含义:
* corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

* maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

* keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

* unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天

TimeUnit.HOURS;             //小时

TimeUnit.MINUTES;           //分钟

TimeUnit.SECONDS;           //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

* workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

* threadFactory:线程工厂,主要用来创建线程;

* handler:表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

3.线程池的执行策略
* 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
* 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
* 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
* 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

4..测试

public class Test {

     public static void main(String[] args) {  

         ThreadPoolExecutor executor = new ThreadPoolExecutor(510200, TimeUnit.MILLISECONDS,

                 new ArrayBlockingQueue<Runnable>(5));

          

         for(int i=0;i<15;i++){

             MyTask myTask = new MyTask(i);

             executor.execute(myTask);

             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+

             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());

         }

         executor.shutdown();

     }

}

class MyTask implements Runnable {

    private int taskNum;

     

    public MyTask(int num) {

        this.taskNum = num;

    }

     

    @Override

    public void run() {

        System.out.println("正在执行task "+taskNum);

        try {

            Thread.currentThread().sleep(4000);

        catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println("task "+taskNum+"执行完毕");

    }

}

这个程序中当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。

5.使用Executors类中提供的几个静态方法来创建线程池

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池

Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池(定长)

  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;1. 有且仅有一个工作线程执行任务2. 所有任务按照指定顺序执行,即遵循队列的入队出队规则
  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

三、Callable

创建线程在上一篇wiki中说明了两种方法,分别是继承Thread,重写run方法和实现Runnable接口,重新run方法。现在介绍第三种实现Callable接口,重写call方法。
区别:

  • Callable可以在任务结束的时候提供一个返回值Future对象,Runnable无法提供这个功能
  • Callable的call方法分可以抛出异常,而Runnable的run方法不能抛出异常。
  • Callable规定的方法是call(),而Runnable规定的方法是run().

测试:

/*

* FileName: TestCallable

* Author:   aiguo.sun

* Date:     2019/3/28 20:06

* Description: 测试callable方法

*/

package JavaTest;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

/**

* 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。

*

* 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。  FutureTask 是  Future 接口的实现类

*/

public class TestCallable {

    public static void main(String[] args) {

        ThreadDemo td = new ThreadDemo();

        //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。

        FutureTask<Integer> result = new FutureTask<>(td);

        new Thread(result).start();

        //2.接收线程运算后的结果

        try {

            //判断是否完成

            if(!result.isDone())

            {

                System.out.println("-sorry-----------------------");;

            }

            //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的

            Integer sum = result.get();

            System.out.println(sum);

            System.out.println("------------------------------------");

        catch (InterruptedException | ExecutionException e) {

            e.printStackTrace();

        }

    }

}

class ThreadDemo implements Callable<Integer> {

    @Override

    public Integer call() throws Exception {

        int sum = 0;

        for (int i = 0; i <= 100000000; i++) {

            sum += i;

        }

        return sum;

    }

}

 

原文地址:https://www.cnblogs.com/aiguona/p/10634043.html

时间: 2024-11-06 09:14:49

阻塞队列和线程池的相关文章

Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具

锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = false; private Thread lockingThread = null; public synchronized void lock() throws InterruptedExpection { while(isLocked){ wait(); } isLocked = true; loc

用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景! 生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式. 今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池). 看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的! 场景就不描述了,为简单

基于队列的线程池

import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /**  * Created 

线程进阶之线程队列、线程池和协程

本节目录: 1.线程队列 2.线程池 3.协程 一.线程队列 线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢? queue队列 :使用import queue,用法与进程Queue一样 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. class queue.Queue(maxsize=0)

面试题之使用无界队列的线程池会导致内存飙升吗?

答案:会: 分析: 创建线程池方式有如下几种: Executors.newFixedThreadPool(10);//LinkedBlockingQueue 无限加入队列 Executors.newScheduledThreadPool(10);//DelayedWorkQueue 队列如果满了,阻塞 Executors.newSingleThreadScheduledExecutor();//DelayedWorkQueue 队列如果满了,阻塞 Executors.newCachedThrea

C++11消息队列 + Qt线程池 + QRunnable执行任务简单模型

1.模板类queue,包含头文件<queue>中,是一个FIFO队列. queue.push():在队列尾巴增加数据 queue.pop():移除队列头部数据 queue.font():获取队列头部数据的引用... 2.Qt库的线程池,QThreadPool QThreadPool.setMaxThreadCount():设置线程池最大线程数 QThreadPool.start(new QRunnable(..)):开启线程池调用QRunnable 3.QRunnable执行任务 void r

09 | 队列:队列在线程池等有限资源池中的应用

当队满时,(tail+1)%n=head. public class CircularQueue { // 数组:items,数组大小:n private String[] items; private int n = 0; // head表示队头下标,tail表示队尾下标 private int head = 0; private int tail = 0; // 申请一个大小为capacity的数组 public CircularQueue(int capacity) { items = n

多线程--Executor线程池框架

Executor的介绍 在Java 5之后,并发编程引入了一堆新的启动.调度和管理线程的API.其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭,可以简化并发编程的操作.因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始

java线程池学习(一)

前言 在实际工作中,线程是一个我们经常要打交道的角色,它可以帮我们灵活利用资源,提升程序运行效率.但是我们今天不是探讨线程!我们今天来聊聊另一个与线程息息相关的角色:线程池.本篇文章的目的就是全方位的解析线程池的作用,以及jdk中的接口,实现以及原理,另外对于某些重要概念,将从源码的角度探讨. tip:本文较长,建议先码后看. 线程池介绍 首先我们看一段创建线程并且运行的常用代码: for (int i = 0; i < 100; i++) { new Thread(() -> { Syste