Java线程池及其底层源码实现分析

1、相关类

  Executors  ExecutorService   Callable   ThreadPool     Future

2、相关接口

  Executor

Executor接口的使用:

  

public class TestExecutor implements Executor{
    @Override
    public void execute(Runnable command){
        //调用execute方法常常传入runnable接口对象,开启线程
    }

}

  ExecutorService接口的使用:(继承Executor接口)

/**
*submit方法(执行runnble、callable对象的线程)
*实现类:各种线程池
*/

Callable接口 && Runnable接口

callable调用call方法

runnable调用run方法

都可以被线程调用,但callable的call方法具有返回值(泛型)

Executors类(操作Executor的工具类)

ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池ThreadPool线程池类(装着线程的容器)

线程池创建的固定线程,线程任务执行完后线程不会消失,处于等待任务的状态(idel)。

线程任务大于线程池容量时,多出来的任务放在等待队列中(内部使用BlockingQueue实现)

public class TestThreadPool{
2
    public static void main(String[] args){
3
        ExecutorService service = Executors.newFixedThreadPool(5);//1.创建5个线程的线程池容器
4

5
        for(int i=0;i<6;i++){//2.放6个任务,线程池一次只能放5个,所以第6个任务需要重复使用旧的线程
6
            service.execute(() -> {
7
                System.out.println(Thread.getCurrentThread().getName());//3.打印出当前线程名
8
            });
9
        }
10
        service.shutdown();//执行完当前任务则关闭线程
11
        service.shutdownNow();//无论是否执行完都关闭线程
12
    }
13
}

  Future接口(线程未来产生的返回值)

public class TestFuture{
2
    public static void main(String[] args){
3
        //FutureTask实现类Runnable和Future接口
4
        FutureTask<Integer> task = new FutureTask<>(
5
            Thread.sleep(500);//阻塞等待500毫秒
6
            return 1000;
7
        );
8

9
        //new的方式启动线程任务
10
        new Thread(task).start();
11
        System.out.println(task.get());//阻塞等待500毫秒后得到返回值
12
        ///////////////////////////////////////////////////////////////////
13
        ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
14
        Future<Integer> future = service.submit(()->{//相当于运行类callable接口的call方法,返回1
15
            Thread.sleep(500);
16
            return 1;
17
        });
18
        System.out.println(future.get());//阻塞等待500毫秒后得到返回值
19
    }
20
}

  WorkStealingPool偷任务线程池

  底层采用ForkJoinPool实现(开启的是Deamon守护线程,主线程退出则线程退出)

  

public class WorkStealingPoolTest {
2
    public static void main(String[] args) throws IOException {
3
        //根据CPU核数启动相应个数的线程(4核cpu---4个线程)
4
        ExecutorService service = Executors.newWorkStealingPool();
5
        System.out.println(Runtime.getRuntime().availableProcessors());
6
7
        service.execute(new R(1000));//线程1执行任务1----1秒
8
        service.execute(new R(2000));//线程2执行任务2----2秒
9
        service.execute(new R(2000));//线程3执行任务3----2秒
10
        service.execute(new R(2000));//线程4执行任务4----2秒
11
        service.execute(new R(2000));//任务5阻塞,当线程1执行完后把任务5偷过来执行
12

13
        //由于产生的是守护线程,主线程不阻塞的话,看不到输出
14
        System.in.read();//将主线程阻塞
15
    }
16
17
    static class R implements Runnable {
18
19
        int time;
20
21
        R(int t) {
22
            this.time = t;
23
        }
24
25
        @Override
26
        public void run() {
27

28
            try {
29
                TimeUnit.MILLISECONDS.sleep(time);
30
            } catch (InterruptedException e) {
31
                e.printStackTrace();
32
            }
33
            //打印线程名---ForkJoinPool
34
            System.out.println(time  + " " + Thread.currentThread().getName());
35

36
        }
37
38
    }
39
}

  ForkJoinPool(分支合并线程池)

  思想:分治,把大任务拆分成小任务并行计算,计算完成后将结果合并

  守护线程

  

public class ForkJoinPoolTest{
2

3
        public static void main(String[] args) throws Exception {
4
        ForkJoinPool pool = new ForkJoinPool();
5
        MyTask task = new MyTask(inits, 0, inits.;ength-1);
6
        ForkJoinTask<int[]> taskResult = pool.submit(task);
7
        try {
8
            taskResult.get();//阻塞等待所有线程结果计算完成
9
        } catch (InterruptedException | ExecutionException e) {
10
            e.printStackTrace(System.out);
11
        }
12
    }
13
14
    /**
15
     * 单个排序的子任务
16
     */
17
    static class MyTask extends RecursiveTask<int[]> {
18
19
        private int[] source;
20
        private int start;
21
        private int end;
22
23
        public MyTask(int[] source,int start, int end ) {
24
            this.source = source;
25
            this.start = start;
26
            this.end = end;
27
        }
28
29
30
        @Override
31
        protected int[] compute() {
32
            //长度小于50,进行计算
33
            if(source.length <= 50) {
34
                long sum = 0L;
35
                for(int i=start; i<end; i++) sum += nums[i];
36
                return sum;
37
            }
38
            //长度大于50,继续划分子任务
39
            int middle = start + (end-start)/2;
40

41
            AddTask subTask1 = new MyTask(source,start,middle);
42
            AddTask subTask2 = new MyTask(source,middle,end);
43
            subTask1.fork();//递归创建子任务线程
44
            subTask2.fork();
45

46
            //计算完成后将两个子任务的结果合并
47
            return subTask1.join() + subTask2.join();
48
        }
49
    }
50
}

  各种线程池的底层实现:

  一、基本线程池:

  FixedThreadPool

CachedThreadPool

ScheduledThreadPool

SingleThreadPool

  二、底层创建线程池都是使用ThreadPoolExecutor类实现的,而放置任务、执行任务使用了生产者消费者模型(阻塞队列的方式)

  三、源码分析

  ThreadPoolExecutor的API:

  

ThreadPoolExecutor(int corePoolSize,//核心线程数(最小)

                   int maximumPoolSize,//最大线程数

                   long keepAliveTime, //线程运行时间

                   TimeUnit unit, //时间单位

                   BlockingQueue<Runnable> workQueue)//底层采用哪种阻塞队列来放线程任务

  各种线程池的底层实现:

  

//FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
     return new ThreadPoolExecutor(nThreads,//初始线程数自定义
                    nThreads,//最大线程数自定义
                    0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失
                    new LinkedBlockingQueue<Runnable>());//链表阻塞队列
    }

  

//CachedThreadPool(采用同步阻塞队列装任务,队列中有任务则启动新线程执行,没任务就阻塞)
public static ExecutorService newCachedThreadPool(){
                    return new ThreadPoolExecutor(0,//初始为0个线程
                    Integer.MAX_VALUE,//可以启动无限多线程
                    60L, TimeUnit.SECONDS,//60秒空闲则结束
                    new SynchronousQueue<Runnable>());//同步阻塞队列,有任务马上开新线程执行(容量用于为0)
}

  

//SingleThreadPool
    return new ThreadPoolExecutor(1,//初始线程数为1
                1,//最大线程数为1
                 0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失
                 new LinkedBlockingQueue<Runnable>());//链表阻塞队列
}

  

//ScheduledThreadPool
    public newScheduledThreadPool(int corePoolSize){
                    super(corePoolSize,//初始线程数自定义
                    Integer.MAX_VALUE,//无限多线程数
                    0, NANOSECONDS,//一旦启动线程池,线程永远不消失
                    new DelayedWorkQueue<Runnable>());//延时阻塞队列,隔一段时间执行一次任务
}

  

  

 

原文地址:https://www.cnblogs.com/hanxue112253/p/9626839.html

时间: 2024-10-03 07:25:24

Java线程池及其底层源码实现分析的相关文章

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行

JAVA线程池原理与源码分析

1.线程池常用接口介绍 1.1.Executor public interface Executor { void execute(Runnable command); } 执行提交的Runnable任务.其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程.池化线程或调用线程中执行,具体由Executor的实现者决定. 1.2.ExecutorService ExecutorService继承自Executor,下面挑几个方法介绍: 1.2.1.shutdown() vo

Java线程池的底层实现与使用

前言 在我们进行开发的时候,为了充分利用系统资源,我们通常会进行多线程开发,实现起来非常简单,需要使用线程的时候就去创建一个线程(继承Thread类.实现Runnable接口.使用Callable和Future),但是这样也有一点问题,就是如果并发的线程数量很多,创建线程.销毁线程都是需要消耗时间.资源,这个时候线程池就派上用场了 一.四种线程池的介绍 Java通过Executors提供了四种线程池,分别是 1.newSingleThreadExecutor() 创建一个单线程化的线程池,它只会

周期性线程池与主要源码解析

之前学习ThreadPool的使用以及源码剖析,并且从面试的角度去介绍知识点的解答.今天小强带来周期性线程池的使用和重点源码剖析. ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor:用来处理延时任务或定时任务 定时线程池类的类结构图 ScheduledThreadPoolExecutor接收ScheduleFutureTask类型的任务,是线程池调度任务的最小单位. 它采用DelayQueue存储等待的任务: 1.DelayQueue

Linux简单线程池实现(带源码)

这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用.参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接: http://pan.baidu.com/s/1i3zMHDV 一.线程池简介 为什么使用线程池? 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任

JDK线程池框架Executor源码阅读

Executor框架 Executor ExecutorService AbstractExecutorService ThreadPoolExecutor ThreadPoolExecutor继承AbstractExecutorService,是一个线程池的具体的实现 主要成员 1. ctl private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT

C#线程池操作演示源码

把开发过程中经常用到的一些代码段做个备份,下面代码内容是关于C#线程池操作演示的代码. static void Main(string[] args){ThreadPool.SetMaxThreads(1000, 1000);for (int i = 0; i < 10;i ){ThreadPool.QueueUserWorkItem(new WaitCallback(ShowMessage), string.Format("当前编号{0}",i));}Console.ReadL

线程池之ThreadPoolExecutor源码解析

1.变量 ThreadPoolExecutor先定义了这几个常量,初看时一脸懵逼,其实它就是用int的二进制高三位来表示线程池的状态, 先回顾一下位运算: <<’左移:右边空出的位置补0,其值相当于乘以2. ‘>>’右移:左边空出的位,如果是正数则补0,若为负数则补0或1,取决于所用的计算机系统OS X中补1.其值相当于除以2. 负数二进制由它的绝对值取反后加1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RU

SpringBoot学习(2):底层源码架构分析引导类注解的具体实现

pom.xml文件 spring-boot-starter- parent 是当前项目的父级依赖 <parent>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-parent</artifactId>   <version>1.5.9.RELEASE</version> </parent> spri