Java高并发编程(四)

一、Executor执行器

  1.Executor接口,java线程池框架中的顶层接口,提供一个execute方法来执行任务

import java.util.concurrent.Executor;

public class T01_MyExecutor implements Executor {

    public static void main(String[] args) {
        new T01_MyExecutor().execute(()->System.out.println("hello executor"));
    }

    @Override
    public void execute(Runnable command) {
        //new Thread(command).run();//另起线程方法调用
        command.run();//方法直接调用
    }

}

  2.任务接口

    1)callable接口:提供一个call方法,具有返回值可以抛出异常

    2)runnable接口:提供一个run方法,无返回值不可抛出异常

  3.ExecutorService接口(Executor的一个子接口)

    1)ExecutorService可以理解为服务,执行execute方法可以向服务器中添加runnable任务无返回值,而执行submit方法可以向服务中添加callable与runnable任务,且有返回值

    2)Executor与ExecutorService的简单理解:可以想象许多的执行器Executor在等待着执行任务,而service服务可通过调用execute或submit方法来添加不同的任务供执行器执行

    3)submit方法具有一个future类型的返回值,与callable,Executors一起使用

public class T06_Future {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1000;
        }); //new Callable () { Integer call();}

        new Thread(task).start();

        System.out.println(task.get()); //阻塞

        //*******************************
        ExecutorService service = Executors.newFixedThreadPool(5);
        Future<Integer> f = service.submit(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1;
        });
        System.out.println(f.get());
        System.out.println(f.isDone());

    }
}

运行结果:

  4.Executors(操作Executor的一个工具类、工厂类)

二、ThreadPool线程池

  1.线程池将任务分配给池中线程,当线程池中线程执行不过来时,任务会进入等待队列,队列由BlockingQueue实现,当有线程空闲时再来任务时会被分配给空闲线程,一个线程同时维护着两个队列(结束队列与等待队列)

  下面一个程序帮助理解线程池的概念

public class T05_ThreadPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);

        service.shutdown();//正常关闭,等待所有任务执行完毕,关闭线程池
        System.out.println(service.isTerminated());//判断所有任务是否都执行完了
        System.out.println(service.isShutdown());//判断线程池是否被关闭了
        System.out.println(service);

        TimeUnit.SECONDS.sleep(5);
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
    }
}

运行结果:

  2.六种线程池

    1)FixedThreadPool  固定线程数量的线程池

    2)cachedThreadPool  假定刚开始一个线程都没有,来一个任务开启一个线程,如果再来一个任务,线程池中有空闲线程就将任务分配给空闲线程,如此往复直到起到电脑能支撑限度为止,默认一个线程空闲超过一分钟就自动销毁

    3)singleThreadPool  线程池中只有一个线程

    4)scheduledThreadPool  定时器线程池

    5)workStealingPool  工作窃取线程池,当前线程池有线程空闲,会自动去寻找任务执行,默认根据CPU核数启动默认线程数目线程,是精灵(demon)线程(守护线程、后台线程),主函数不阻塞的话看不到输出,本质上是ForkJoinPool实现的

    6)ForkJoinPool  分叉合并线程,大任务可以切分成许多的小任务,如果小任务还是太大的话还可以继续分,分的可以了就将小任务合并,最后产生一个总的结果(有点像归并排序)

       ForkJoinTask从RecursiveAction和RecursiveTask继承

      a.RecursiveAction  无返回值

/**
 * 对数组中所有数求和
 * @author zhangqi
 *
 */
public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        System.out.println(Arrays.stream(nums).sum()); //stream api
    }

    static class AddTask extends RecursiveAction { 

        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected void compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {

                int middle = start + (end-start)/2;

                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }

        }

    }

    /*static class AddTask extends RecursiveTask<Long> { 

        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }

    }*/

    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        //long result = task.join();
        //System.out.println(result);

        System.in.read();

    }
}

运行结果:

      b.RescursiveTask  有返回值的递归任务

/**
 * 对数组中所有数求和
 * @author zhangqi
 *
 */
public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        System.out.println(Arrays.stream(nums).sum()); //stream api
    }

/*    static class AddTask extends RecursiveAction { 

        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected void compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {

                int middle = start + (end-start)/2;

                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }

        }

    }*/

    static class AddTask extends RecursiveTask<Long> { 

        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }

    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println(result);

        //System.in.read();

    }
}

运行结果:

补充:

  ThreadPoolExecutor线程池执行器(自定义线程池,许多线程池的底层实现,ForkJoinPool不是由它实现)

时间: 2024-10-06 02:19:30

Java高并发编程(四)的相关文章

Java高并发编程(一)

1.原子量级操作(读.++操作.写分为最小的操作量单位,在多线程中进行原子量级编程保证程序可见性(有序性人为规定)) 由于某些问题在多线程条件下:产生了竞争的问题,(例如:在多线程中一个简单的计数器增加)如果在程序中不采用同步的机制,那么在程序的运行结果中,多个线程在访问此资源时候,产生Racing.解决这个问题,采用某种方式阻止其他线程在该线程使用该变量的时候使用该变量 采用原子级操作:1.采用加锁的机制(最好的操作)2.Java.concurrent.atomic包包含一些原子量操作:Ato

java高并发编程(二)

马士兵java并发编程的代码,照抄过来,做个记录. 一.分析下面面试题 /** * 曾经的面试题:(淘宝?) * 实现一个容器,提供两个方法,add,size * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束 * * 分析下面这个程序,能完成这个功能吗? * @author mashibing */ package yxxy.c_019; import java.util.ArrayList; import java.util.List

java高并发编程(五)线程池

摘自马士兵java并发编程 一.认识Executor.ExecutorService.Callable.Executors /** * 认识Executor */ package yxxy.c_026; import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor { public static void main(String[] args) { new T01_MyExecutor(

Java高并发编程(三)

目录: 1.线程安全单例模式的几种实现方式 2.同步容器 3.并发容器 一.线程安全单例模式的几种实现方式 1.饿汉式(不使用同步锁,典型的用空间换时间) public class Singleton1 { private static Singleton1 mySingleton = new Singleton1(); private Singleton1(){ System.out.println("single"); } public static Singleton1 getS

java高并发编程--03--线程间通信

1.同步阻塞与异步非阻塞 1.1同步阻塞消息处理 服务端监听端口,客户端提交Event,服务端创建线程接收Event,处理Event,返回结果 缺陷: 同步Event提交,客户端等待时间过长(提交Event时间+接收Event时间+处理Event时间+返回结果时间)会陷入阻塞,导致二次提交Event耗时过长 由于客户端提交Event数量有限,导致系统受理业务数量有限,系统吞吐量不高 一个线程处理一个Event,线程频繁创建销毁,从而增加系统额外开销 业务达到峰值时,大量业务处理线程会导致CPU频

[高并发]Java高并发编程系列开山篇--线程实现

ava是最早开始有并发的语言之一,再过去传统多任务的模式下,人们发现很难解决一些更为复杂的问题,这个时候我们就有了并发. 引用 多线程比多任务更加有挑战.多线程是在同一个程序内部并行执行,因此会对相同的内存空间进行并发读写操作.这可能是在单线程程序中从来不会遇到的问题.其中的一些错误也未必会在单CPU机器上出现,因为两个线程从来不会得到真正的并行执行.然而,更现代的计算机伴随着多核CPU的出现,也就意味着不同的线程能被不同的CPU核得到真正意义的并行执行. 那么,要开始Java并发之路,就要开始

java高并发编程--04--Hook线程以及捕获线程执行异常

1.获取线程运行时异常Thread类处理运行时异常的四个API:public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh):为某个线程UncaughtExceptionHandlerpublic static setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh):设置全局UncaughtExceptionHandlerpublic Uncaugh

马士兵java高并发编程三

1.使用静态内部类实现线程安全的单例模式 package com.weiyuan.test; /** * 采用内部类的形式实现单例模式 * 是同步安全的,并且实现了懒加载 * */ public class Sigleton { private Sigleton(){ } private static class Inner{ private static Sigleton s = new Sigleton(); } public static Sigleton getInstance(){ r

Java 面试知识点解析(二)——高并发编程篇

前言: 在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Java 知识点进行复习和学习一番,大部分内容参照自这一篇文章,有一些自己补充的,也算是重新学习一下 Java 吧. 前序文章链接: Java 面试知识点解析(一)--基础知识篇 (一)高并发编程基础知识 这里涉及到一些基础的概念,我重新捧起了一下<实战 Java 高并发程序设计>这一本书,感觉到心潮澎湃,这或许就是笔者叙述功底扎实的