ThreadPoolExecutor线程池深入解读(一)----原理+应用

本文档,适合于对多线程有一定基础的开发人员。对多线程的一些基础性的解读,请参考《java并发编程》的前5章。

多线程编程,在软件开发中占有十分重要的地位。本人对线程同步的本质的理解是:把对一个或者多个的共享状态的复合操作转变为原子性的操作,同时保证共享状态在内存中的可见性。

1.多线程并发时,会存在竞态条件。常见的竞态条件包括先检查后执行机制的竞争和原子性操作竞争,比如同时对一个整数++操作,这个操作可以分割为三个步骤:读取、加法操作与写入(生效)。解决先检查后执行机制的竞态条件的有效手段是采用双检索。对方法加锁,会大大滴降低吞吐量和性能,因此,不建议直接对方法加锁,常见的做法是,对多个线程同时竞争的变量加锁,或者采用ReentrantLock底层的CAS算法(free-lock).如果想深入理解ReentrantLock的原理,请查看java.util.concurrent包下的源代码。

2.任务执行策略与中断策略和饱和策略:在多线程环境中,当定义好了公共资源类,与执行任务时(比如生产者与消费者任务),接下来就要考虑任务执行策略与中断策略和饱和策略,以提升系统的吞吐量和性能,同时在运行时,要考虑吞吐量与CPU占有率的折中。在多线程中,最重要的就是以上三种策略的定制。采用默认的,不一定能满足要求。线程池底层,调用的是ThreadPoolExecutor这个类,我们可以扩展他,实现自己的需求。在这里,先讲一下,默认的任务执行策略。(任务执行策略包括:是否为每一个任务开启一个线程,还是所有任务在一个线程中执行,任务执行的顺序,比如FIFO,还是按照优先级等等),所以, 这里涉及到两个比较重要的东西:一是数量问题,包括线程池的基本容量,最大容量以及BlockingQueue<Runnable> 是采用有界的还是无界的,二是BlockingQueue的数据结构,如果执行顺序是FIFO,就采用非优先级的Queue,如果是按优先级,那就使用PriorityLinkedQueue。下面,结合一下ThreadPoolExecutor源代码讲解一下:

在使用时,我们一般会这样:

ExecutorService executor = Executors.newCachedThreadPool();

executor.execute(Runnable);

先从execute方法开始,一层一层剖析:

ThreadPoolExecutor中的几个重要变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN   =  0 << COUNT_BITS;

private static final int STOP       =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;

The workerCount is the number of workers that have been permitted to start and not permitted to stop.

ctl是一个重要的变量,主要包装两个重要的概念:一是workerCount:effective number of threads,二是runState:  indicating whether running, shutting down etc

英文解释:

The main pool control state, ctl, is an atomic integer packing

two conceptual fields

workerCount, indicating the effective number of threads

runState,    indicating whether running, shutting down etc

在以上状态变量中,RUNNING可以接受新的task,并且可以处理queue中的task,SHUTDOWN不可以接受新的task,但是可以处理queue中的task,其他的全都不可以。还是英文解释比较好,研究源代码,最好是看英文原版的,不要看汉语版的:

RUNNING:  Accept new tasks and process queued tasks

*   SHUTDOWN: Don‘t accept new tasks, but process queued tasks

*   STOP:     Don‘t accept new tasks, don‘t process queued tasks,

*             and interrupt in-progress tasks

*   TIDYING:  All tasks have terminated, workerCount is zero,

*             the thread transitioning to state TIDYING

*             will run the terminated() hook method

*   TERMINATED: terminated() has completed

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {---------------------①

if (addWorker(command, true))//如果添加失败,返回false,可能是由于创建线程时遇到意外,比如terminated,重新调用ctl.get()计算wc

return;

c = ctl.get();

}//如果当前执行的线程数量小于corePoolSize,但是添加任务时,遇到了意外,或者,当前执行的线程数量大于corePoolSize,这两种情况,都会进入②处代码

if (isRunning(c) && workQueue.offer(command)) {------------------②//如果当前线程池中的线程正处于RUNNING状态,并且阻塞队列的容量没有达到上限,重新检查ctl.get()返回的状态

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);//如果此处状态不是RUNNING,也不是SHUTDOWN,那么,拒绝任务

else if (workerCountOf(recheck) == 0)

addWorker(null, false);//由于任务放到了BlockingQueue中,此处,在Worker中,不添加task,而是运行任务时,从queue取出task

}

else if (!addWorker(command, false))-------------------------③//除了以上情况以外,比如BlockingQueue饱和了,线程池容量也饱和了,执行饱和策略,默认为AbortPolicy,拒绝任务

reject(command);

}

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))//此处判断非常重要

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get();  // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

final ReentrantLock mainLock = this.mainLock;

w = new Worker(firstTask);------------------------------①//把firstTask加到Worker中,并创建一个线程

final Thread t = w.thread;

if (t != null) {

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);------------------------------②//把worker加到Set<Worker>中

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;------------------------------③//添加成功

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();------------------------------④执行任务

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);------------------------------⑤//添加失败,从Set<Worker>中移除Worker

}

return workerStarted;

}

接下来,看看Woker:

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

protected boolean isHeldExclusively() {

return getState() != 0;

}

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

public void run() {----------------①

runWorker(this);----------------②

}

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);//重新置为0

return true;

}

public void lock()        { acquire(1); }

public boolean tryLock()  { return tryAcquire(1); }

public void unlock()      { release(1); }

public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

}

Worker的本质是Runnable,因此在addWorker()中的t.start()中,实际是调用worker的run()方法,看②处的runWorker()方法:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {---------------------①

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted.  This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);-------------------------②

Throwable thrown = null;

try {

task.run();-------------------------③

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);-------------------------④

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

最重要的地方,已经做了标识。对于①处,(task = getTask()) != null,这是在execute方法中,当workerCountof(recheck)== 0时,把task放到BlockingQueue中,所以用getTask()取出task。在execute之前和之后,可以做一些事情,自定义扩展,比如实现统计和计时功能。

以上为ThreadPoolExecutor源代码的关键地方的比较粗浅的解读,下面,来进入应用阶段:

Executors.newFixedThreadPool(x)中,默认的,BlockingQueue为无界的LinkedBlockingQueue,使用无界的queue,会因为queue的无限制扩展,而导致资源被耗尽,Executors.newCachedThreadPool()中,线程池的大小没有限制,队列采用的是SynchronousQueue,SynchronousQueue本质上并不是一个队列,而是基于线程间传递机制的一种运行策略。当向SynchronousQueue中添加task时,必须保证线程在等待接收task,可以与运行的线程直接交互。如果需要实现线程池的容量和queue的容量都有限制,并且需要自定义执行策略和饱和策略时,可以扩展ThreadPoolExecutor。ThreadPoolExecutor的构造器中结束如下参数:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

其中有:colrePoolSize,线程池的基本大小, maximumPoolSize,线程池中能够同时运行的线程数量的上限,keepAliveTime,超过此时间,空闲线程将被回收,阻塞队列Blockin共Queue,还有RejectedExecutionHandler,任务拒绝处理类。

下面, 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略

package httpClient;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.RejectedExecutionHandler;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

import java.util.logging.Logger;

/**

* 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略

* @author TongXueQiang

* @date 2016/05/19

*/

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();

private final Logger log = Logger.getLogger("MyThreadPoolExecutor");

private final AtomicLong numTasks = new AtomicLong(1);

private final AtomicLong totalTime = new AtomicLong();

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

}

/**

* 任务执行前

*/

protected void beforeExecute(Thread t,Runnable r){

super.beforeExecute(t, r);

log.fine(String.format("Thread %s: start %s",t,r));

startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));

}

/**

* 任务执行后

* @param r 任务

* @param t 执行任务的线程

*/

protected void afterExecutor(Runnable r,Throwable t){

try {

Long endTime = (long) (System.nanoTime() / Math.pow(10,9));

Long taskTime = endTime - startTime.get();

numTasks.incrementAndGet();

totalTime.addAndGet(taskTime);

log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));

} finally {

super.afterExecute(r, t);

}

}

protected void terminated () {

try {

log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));

} finally {

super.terminated();

}

}

}

//自定义简易爬虫

package httpClient;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* 网页抓取

* @author TongXueQiang

* @date 2016/05/16

*/

public class UrlHanding {

private final int THREADS = 10;

private final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();

BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);

private final ExecutorService consumerExecutor = new MyThreadPoolExecutor(10, 10, 1000,TimeUnit.MILLISECONDS, q, new ThreadPoolExecutor.CallerRunsPolicy());//调用者执行的饱和策略

private final CountDownLatch startLatch = new CountDownLatch(1);

private final CountDownLatch endLatch = new CountDownLatch(THREADS);

private static UrlQueue queue;

public void urlHanding(String[] seeds) throws InterruptedException {

queue = getUrlQueue();

System.out.println("处理器数量:"+Runtime.getRuntime().availableProcessors());

long start = (long) (System.nanoTime() / Math.pow(10, 9));

producerExecutor.execute(new GetSeedUrlTask(queue,seeds,startLatch));

producerExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);

producerExecutor.shutdown();

startLatch.await();

UrlDataHandingTask []url_handings = new UrlDataHandingTask[THREADS];

for (int i = 0;i < THREADS;i++) {

url_handings[i] = new UrlDataHandingTask(startLatch,endLatch,queue);

consumerExecutor.execute(url_handings[i]);

}

consumerExecutor.shutdown();

startLatch.countDown();

doSomething();

endLatch.await();

long end = (long) (System.nanoTime() / Math.pow(10,9) - start);

System.out.println("耗时: " + end + "秒");

}

private void doSomething() {

}

private UrlQueue getUrlQueue() {

if (queue == null) {

synchronized(UrlQueue.class){

if (queue == null) {

queue = new UrlQueue();

return queue;

}

}

}

return queue;

}

}

上面,是典型的生产者和消费者线程模式,把ArrayBlockingQueue当做公共资源,这里,要处理好消费者线程无限期阻塞的问题,通过在queue的最后加入“毒丸”对象,当每个线程从queue中取出的对象为“毒丸”对象时,停止迭代。

以下为消费者线程:

package httpClient;

import java.util.concurrent.CountDownLatch;

public class UrlDataHandingTask implements Runnable {

private CountDownLatch startLatch;

private CountDownLatch endLatch;

private UrlQueue queue;

public UrlDataHandingTask(CountDownLatch latch, CountDownLatch endLatch, UrlQueue queue) {

this.startLatch = latch;

this.endLatch = endLatch;

this.queue = queue;

}

/**

* 下载对应的页面并抽取出链接,放入待处理队列中

*

* @param url

* @throws InterruptedException

*/

public void dataHanding(String url) throws InterruptedException {

getHrefOfContent(DownPage.getContentFromUrl(url));

for (String url0 : VisitedUrlQueue.visitedUrlQueue) {

System.out.println(url0);

}

}

@Override

public void run() {

try {

startLatch.await();

} catch (InterruptedException e1) {

Thread.currentThread().interrupt();

}

while (!queue.isEmpty()) {

try {

String url = queue.outElem();

if ("".equals(url.trim())) {//“毒丸”对象为空

queue.addElem(url);

break;

}

dataHanding(url);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

endLatch.countDown();

}

/**

* 获取页面源代码中的超链接

*

* @param content

* @throws InterruptedException

*/

public void getHrefOfContent(String content) throws InterruptedException {

System.out.println("开始");

String[] contents = content.split("<a href=\"");

for (int i = 1; i < contents.length; i++) {

int endHref = contents[i].indexOf("\"");

String aHref = FunctionUtils.getHrefOfInOut(contents[i].substring(0, endHref));

if (aHref != null) {

String href = FunctionUtils.getHrefOfInOut(aHref);

if (queue.isContains(href) && !VisitedUrlQueue.isContains(href)

&& href.indexOf("/code/explore") != -1) {

// 放入待抓取队列中

queue.addElem(href);

}

}

}

System.out.println(queue.size() + "--抓取到的连接数");

System.out.println(VisitedUrlQueue.size() + "--已处理的页面数");

}

}

生产者线程:

package httpClient;

import java.util.concurrent.CountDownLatch;

public class GetSeedUrlTask implements Runnable {

private UrlQueue queue;

private String[] seeds;

private CountDownLatch startLatch;

public GetSeedUrlTask(UrlQueue queue, String[] seeds,CountDownLatch startLatch) {

this.queue = queue;

this.seeds = seeds;

this.startLatch = startLatch;

}

public void addUrl() {

try {

for (String url : seeds) {

queue.addElem(url);

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

@Override

public void run() {

addUrl();

try {

queue.addElem("");//加入“毒丸”对象

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

startLatch.countDown();

}

}

未完待续,中断策略还没有讲呢,嘻嘻……

时间: 2025-01-16 06:24:41

ThreadPoolExecutor线程池深入解读(一)----原理+应用的相关文章

Java并发编程与技术内幕:线程池深入理解

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要: 本文主要讲了Java当中的线程池的使用方法.注意事项及其实现源码实现原理,并辅以实例加以说明,对加深Java线程池的理解有很大的帮助. 首先,讲讲什么是线程池?照笔者的简单理解,其实就是一组线程实时处理休眠状态,等待唤醒执行.那么为什么要有线程池这个东西呢?可以从以下几个方面来考虑:其一.减少在创建和销毁线程上所花的时间以及系统资源的开销 .其二.2将当前任务与主线程隔离,能实现和主

线程池深入理解

一 使用线程池的好处 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控. 二 在何种情况下使用线程池 要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析: 1.                   任

Java线程池深入理解

最近项目中进行告警模块性能优化,不少地方使用了线程池技术,整理总结如下. package com.coshaho.threadpool; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; im

死磕 java线程系列之线程池深入解析——体系结构

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 简介 Java的线程池是块硬骨头,对线程池的源码做深入研究不仅能提高对Java整个并发编程的理解,也能提高自己在面试中的表现,增加被录取的可能性. 本系列将分成很多个章节,本章作为线程池的第一章将对整个线程池体系做一个总览. 体系结构 上图列举了线程池中非常重要的接口和类: (1)Executor,线程池顶级接口: (2)ExecutorService,线程池次级接口,对Executor做了一些扩展,增加一

死磕 java线程系列之线程池深入解析——生命周期

摘自:https://www.cnblogs.com/tong-yuan/p/11748887.html (手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类. 简介 上一章我们一起重温了下线程的生命周期(六种状态还记得不?),但是你知不知道其实线程池也是有生命周期的呢?! 问题 (1)线程池的状态有哪些? (2)各种状态下对于任务队列中的任务有何影响? 先上源码 其实,在我们讲线程池体

死磕 java线程系列之线程池深入解析——未来任务执行流程

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类. 简介 前面我们一起学习了线程池中普通任务的执行流程,但其实线程池中还有一种任务,叫作未来任务(future task),使用它您可以获取任务执行的结果,它是怎么实现的呢? 建议学习本章前先去看看彤哥之前写的<死磕 java线程系列之自己动手写一个线程池(续)>,有助于理解本章的内容,且那边的代码比较短小,学起来相对容易一些. 问题

死磕 java线程系列之线程池深入解析——构造方法

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 简介 ThreadPoolExecutor的构造方法是创建线程池的入口,虽然比较简单,但是信息量很大,由此也能引发一系列的问题,同样地,这也是面试中经常被问到的问题,下面彤哥只是列举了一部分关于ThreadPoolExecutor构造方法的问题,如果你都能回答上来,则可以不用看下面的分析了. 问题 (1)ThreadPoolExecutor有几个构造方法? (2)ThreadPoolExecutor最长的构

死磕 java线程系列之线程池深入解析——定时任务执行流程

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 注:本文基于ScheduledThreadPoolExecutor定时线程池类. 简介 前面我们一起学习了普通任务.未来任务的执行流程,今天我们再来学习一种新的任务--定时任务. 定时任务是我们经常会用到的一种任务,它表示在未来某个时刻执行,或者未来按照某种规则重复执行的任务. 问题 (1)如何保证任务是在未来某个时刻才被执行? (2)如何保证任务按照某种规则重复执行? 来个栗子 创建一个定时线程池,用它来

ThreadPoolExecutor线程池的分析和使用

1. 引言 合理利用线程池能够带来三个好处. 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行. 第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控. 但是要做到合理的利用线程池,必须对其原理了如指掌. 2. 线程池的使用 线程池的创建 我们可以通过java.util.concurrent.ThreadPo