Java并发 - 线程池的使用

线程池的使用

一丶什么是线程池

  为了避免系统频繁的创建和销毁线程, 需要将创建好的线程"存储"管理起来复用, 可以形象的理解为"池子", 当需要线程时, 则向线程池申请一个线程, 用完之后,并不会销毁, 而是将线程归还给线程池, 减少线程的创建和销毁.

二丶为什么需要线程池

  1) 多线程虽然可以提高系统性能,但如果数量不加以控制, 频繁的创建和销毁线程, 会消耗系统性能

  2) 线程会占用内存, 如果过多创建线程, 会出现内存溢出的问题

  因此, 在实际生产环境中, 线程的数量必须得到控制, 盲目的大量创建线程对系统性能是有害的.

二丶JDK中的线程池

   2.1) 三种线程池接口

  a. Executor 执行器执行Runnable任务, 不可获取任务执行后的结果

   Executor

  

  b. ExecutorService 执行器服务, 继承于Executor接口, 执行Callable任务, 可以任务执行后的任务

  

  c. ScheduledExecutorService 调度执行器服务, 继承于ExecutorService接口, 可用于定时调度执行提交的任务

  

  2.2) Executos 线程池工厂类, 用于生产不同特性的线程池

  a. Executors中的newFixedThreadPool()方法

  创建固定线程数量的线程池, 提交任务到该线程池时, 若没有空闲的线程, 则会保存到队列中, 直到有空闲的线程才会执行. 由于使用了无界阻塞队列, 所以该线程池不会拒绝任务,  适合执行可预估任务数量的任务

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  b. Executors中的newSingleThreadExecutor()方法

  创建只有一个线程的线程池, 提交的任务若没有空闲线程, 会被保存到队列中, 直到线程空闲才会按陷入先出的顺序执行队列中的任务

 /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  c. Executos中的newCachedThreadPool()方法

  创建可缓存复用的线程池, 该池没有固定的线程数量, 适合执行大量执行时间短的任务. 提交的任务会优先使用空闲的线程, 否则会创建新的线程执行. 这时因为SynchronousQueue不会保存任务. 空闲线程空闲时间超过60s将会被销毁

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

  d. Executors中的newSingleThreadScheduledExecutor()方法:

  创建只有一个线程的ScheduledExecutorService,SchduledExecutorService在ExecutorService接口之上拓展了可用于在给定时间执行某任务的功能, 如在某个固定的延时之后执行某个任务, 或者周期性执行某个任务

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

  e. Executors中的newSchduledExecutorPool()方法

    返回指定线程数量的SchduledExecutorService

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

  f. Executors中的newWorkStealingPool()方法

  创建可用于分治执行任务的线程池, 大任务分解成小任务并行执行

    /**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

四丶ThreadPoolExecutor

  从上面的Executors中的多个方法来看, 许多线程池都是使用了ThreadPoolExecutor类, 这是线程池的核心类,

  4.1) ThreadPoolExecutor的构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {       // 省略赋值代码...
    }

  构造参数含义如下:

  1. corePoolSize: 指定线程池的核心线程数, 这是线程池最小的线程数

  2. maximumPoolSize: 指定线程池的最大线程数, 当线程池没有空闲线程, 且任务塞满队列时, 就会创建额外的线程来执行任务. 总线程数小于maximumPoolSize

  3. keepAliveTime: 超过核心线程数的线程空闲的最大时间, 空闲时间超过该时间后, 竟会被销毁

  4. unit: keepAliveTime对应的时间单位

  5. workQueue: 任务队列, 当没有空闲的核心线程时, 任务就会保存到该队列中, 等待执行

  6. threadFactory: 线程工厂, 用于创建线程, 可以指定线程名字

  7. handler: 拒绝执行处理器, 当没有空闲的核心线程时, 任务就会保存到任务队列中, 如果塞满队列, 且线程数未达到最大线程数, 就会创建额外线程执行任务, 如果已达到最大线程数, 就会使用拒绝执行处理器拒绝超载任务

  

  4.2 ThreadPoolExecutor的工作逻辑

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn‘t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  调度逻辑分3步走:

  a. 当工作线程数< corePoolSize时, 即存在空闲线程或者线程数未满corePoolSize时, 直接分配线程执行

  b. 当线程数已满corePoolSize, 且没有空闲线程时, 将任务提交到工作队列中

  c. 若任务队列已满, 且线程数<maximumPoolSize, 提交的任务将会由额外创建的线程执行, 若线程数已满maximumPoolSize, 将会使用拒绝处理器拒绝任务.

  

  4.3 几种可使用的阻塞队列

  a. 直接提交队列SynchronousQueue:

  SynchronousQueue没有容量,插入一个元素,需要等待移除该元素, 线程池使用该队列时, 每提交一个任务需要创建一个线程执行, 所以需要指定maximumPoolSize, 否则很容易执行拒绝策略

  b. 有界任务队列ArrayBlockingQueue:

  ArrayBlockingQueue是一个有界队列, 从指定容量创建之后, 容量就不会变化了. 线程池使用该队列时, 当队列满之后, 如果线程数小于最大线程数, 就会创建新线程执行, 否则执行拒绝策略

  c. 无界任务队列LinkedBlockingQueue:

  LinkedBlockingQueue是一个使用链表实现的无界阻塞队列, 线程池使用该队列时, 队列永远不会满, 即永远不会执行拒绝策略, 直至内存满为止. Executors默认使用的是该队列, 如果任务数量不可预估,为防止应用程序崩溃, 需重新指定阻塞队列

  d. 优先任务队列PriorityBlockingQueue:

  PriorityBlockingQueue是一个优先阻塞队列, 进入该队列的任务, 可按照指定的算法排序, 按优先级被执行. 而ArrayBlockingQueue, LinkedBlockingQueue则是先入先出队列

4.4) 拒绝策略

JDK内置的拒绝策略有4种:

a. AbortPolicy策略: 该策略会直接抛出异常, 组织系统正常工作

b. CallerRunsPolicy策略: 只要线程池未关闭, 该策略会直接在调用者线程中运行当前被丢弃任务. 这样不会真正的丢弃任务,但任务提交线程性能可能会急剧下降.

c. DiscardOledestPolicy策略: 该策略将丢弃最老的一个请求, 也就是即将被执行的一个任务, 并尝试再次提交当前任务.

d. DiscardPolicy策略: 该策略默默的丢弃无法处理的任务, 不予任何处理.

4.5) 注意被"吞掉"异常

a. 放弃submit() , 改用 execute()

b. 改造submit()

  捕获异常, 手动抛出异常 (可以参考<Java高并发程序设计>P114页)

学习资料:

  <java高并发程序设计>

原文地址:https://www.cnblogs.com/timfruit/p/10993534.html

时间: 2024-10-10 01:57:31

Java并发 - 线程池的使用的相关文章

Java并发——线程池Executor框架

线程池 无限制的创建线程 若采用"为每个任务分配一个线程"的方式会存在一些缺陷,尤其是当需要创建大量线程时: 线程生命周期的开销非常高 资源消耗 稳定性 引入线程池 任务是一组逻辑工作单元,线程则是使任务异步执行的机制.当存在大量并发任务时,创建.销毁线程需要很大的开销,运用线程池可以大大减小开销. Executor框架 说明: Executor 执行器接口,该接口定义执行Runnable任务的方式. ExecutorService 该接口定义提供对Executor的服务. Sched

Java并发——线程池原理

"池"技术对我们来说是非常熟悉的一个概念,它的引入是为了在某些场景下提高系统某些关键节点性能,最典型的例子就是数据库连接池,JDBC是一种服务供应接口(SPI),具体的数据库连接实现类由不同厂商实现,数据库连接的建立和销毁都是很耗时耗资源的操作,为了查询数据库中某条记录,最原始的一个过程是建立连接.发送查询语句.返回查询结果.销毁连接,假如仅仅是一个很简单的查询语句,那么可能建立连接与销毁连接两个步骤就已经占所有资源时间消耗的绝大部分,如此低下的效率显然让人无法接受.针对这个过程是否能

深入理解Java之线程池

原作者:海子 出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可

Java(Android)线程池 总结

一种是使用Executors工厂生产线程池:另一种是直接使用ThreadPoolExecutor自定义. Executors工厂生产线程池 Java(Android)线程池 Trinea 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异步任务你还只是如下new Thread吗? Java 1 2 3 4 5 6 7 newThread(newRunnable(){ @Ove

Java高新技术—线程池的使用

一. 问题引入 我们知道可以用1.继承Thread类 2.实现runnable接口两种方法创建一个线程,这样实现起来很方便,但是同样出现了一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率, 因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果! 二. 线程池简介 ① Jdk1.5之后加入了java.

[转]深入理解Java之线程池

出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执

Java(Android)线程池---基础篇

1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? 1 newThread(newRunnable(){ 2 3 @Override 4 publicvoidrun(){ 5 // TODO Auto-generated method stub 6 } 7 }).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差.b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致

关于Java中线程池的解读

之前的面试中多次被问到线程池的相关内容,所以在之后的时间内我仔细的学习了一下线程池的相关内容. 1.使用线程池的意义 复用:类似WEB服务器等系统,长期来看内部需要使用大量的线程处理请求,而单次请求响应时间通常比较短,此时Java基于操作系统的本地调用方式大量的创建和销毁线程本身会成为系统的一个性能瓶颈和资源浪费.若使用线程池技术可以实现工作线程的复用,即一个工作线程创建和销毁的生命周期期间内可以执行处理多个任务,从而总体上降低线程创建和销毁的频率和时间,提升了系统性能. 流控:服务器资源有限,

[并发]线程池技术小白

1  线程池技术介绍 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同样符合这一思想.