Java多线程——执行器(Executor)

Markdown编辑器上线啦,终于等到你了!使用这个编辑器写了两篇,感觉还是不错的!不过还是有一些问题,慢慢熟悉吧!

执行器

构建一个新的线程是有一定的代价的,因为涉及到和操作系统的交互。如果程序中创建了大量的生命周期很短的线程,应该使用线程池(thread pool)。

另一个使用线程池的理由是减少并发线程的数目。线程数量太多会大大降低性能甚至会使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。

Executor类构建线程池的静态方法

方法 描述
newCachedThreadPool 必要时创建新线程,空闲线程会被保留60秒
newFixedThreadPool 该池包含固定数量的线程;空闲线程会一直被保留
newSingleThreadExecutor 只有一个线程的“池”,该线程顺序执行每一个提交的任务
newScheduledThreadPool 用于预定执行而构建的固定线程池,替代java.util.Timer
newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“池”

线程池

newCachedThreadPool方法构建了一个线程池,对于每个任务,如果有空闲的线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。

newFixedThreadPool方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到的服务的任务放置到队列中。当其他任务完成以后再运行它们。

newSingleThreadExecutor是一个退化了的大小为1的线程池:由一个线程执行提交的任务,一个接着一个。这三个方法返回实现了ExecutorService接口的ThreadPoolExecutor类的对象。

使用连接池:

  1. 调用Executors类中静态的方法newCachedThreadPool或newFixedThreadPool。
  2. 调用submit提交Runnable或Callable对象。
  3. 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
  4. 当不在提交任何任务时,调用shutdown。

MacthCounter类:

/**
 * @author xzzhao
 */
public class MacthCounter implements Callable<Integer> {

    private final File            directory;
    private final String          keyword;
    private final ExecutorService pool;
    private int                   count;

    public MacthCounter(File directory, String keyword, ExecutorService pool) {
        super();
        this.directory = directory;
        this.keyword = keyword;
        this.pool = pool;
    }

    @Override
    public Integer call() throws Exception {
        count = 0;
        File[] files = directory.listFiles();
        List<Future<Integer>> results = new ArrayList<>();
        for (File file : files) {
            if (file.isDirectory()) {
                MacthCounter counter = new MacthCounter(file, keyword, pool);
                FutureTask<Integer> task = new FutureTask<>(counter);
                results.add(task);
            } else {
                if (search(file)) {
                    count++;
                }
            }
        }
        for (Future<Integer> result : results) {
            count += result.get();
        }
        return count;
    }

    /**
     * 搜索方法
     *
     * @param file
     * @return 是否找到
     */
    public boolean search(File file) {
        try {
            try (Scanner in = new Scanner(file)) {
                boolean found = false;
                while (!found && in.hasNextLine()) {
                    String line = in.nextLine();
                    if (line.contains(keyword)) {
                        found = true;
                    }
                }
                return found;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

ThreadPoolTest类:

/**
 * @author xzzhao
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.println("请输入根目录 :");
        String directory = in.nextLine();
        System.out.println("请输入关键字 :");
        String keyword = in.nextLine();

        ExecutorService pool = Executors.newCachedThreadPool();

        MacthCounter counter = new MacthCounter(new File(directory), keyword, pool);
        Future<Integer> result = pool.submit(counter);

        try {
            System.out.println("匹配到的文档数:" + result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
        int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
        System.out.println("largestPoolSize=" + largestPoolSize);
    }
}

预定执行

ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。它是一种允许使用线程机制的Java.util.Timer的泛化。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法将返回实现了ScheduledExecutorService接口的对象。

可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。

控制任务组

我们已经知道如何将一个执行器服务作为线程池使用,以提高执行任务的效率。有时候,我们要使用执行器来做更有实际意义的事,控制一组相关的任务。例如,可以在执行器中使用shutdownNow方法取消所有的任务。

invokeAll方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的结果。这个方法的缺点是如果第一个任务花去了很多时间,那么就可能不得不进行等待。将结果按可获得的顺序保存起来更有意义。可以用ExecutorComeletionService来进行排序。

具体可以查询API。用常规的方法获得一个执行器。然后,构建一个ExecutorComeletionService,提交任务给完成服务。该服务管理Future对象的阻塞队列,其中包含已经提交的任务的执行结果。

大概如下:

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks) {
     service.submit(task);
}
for (int i = 0; i < task.size(); i++) {
     processFurther(service.take().get());
}

Fork-Join框架

有的应用程序使用了大量的线程,但其中大多数都是空闲的。举例来说,一个Web服务器可能会为每个连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集的任务,如图像或视频处理。Java SE 7 中新引入了fork-join框架,专门用来支持后一类的应用。

我们来讨论一个简单的例子。假设我们想统计一个数组中有多少个元素满足摸个特定的属性。可以将这个数组一分为二,分别对着两部分进行统计,再将结果相加。

Counter 类:

/**
 * @author xzzhao
 */
public class Counter extends RecursiveTask<Integer> {

    public static final int THRESHOLD = 1000;
    private final double[]  values;
    private final int       from;
    private final int       to;
    private final Filter    filter;

    public Counter(double[] values, int from, int to, Filter filter) {
        super();
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }

    @Override
    protected Integer compute() {
        if (to - from < THRESHOLD) {
            int count = 0;
            for (int i = from; i < to; i++) {
                if (filter.accept(values[i])) {
                    count++;
                }
            }
            return count;
        } else {
            int mid = ((from + to) / 2);
            Counter first = new Counter(values, from, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
}

Counter 类:

/**
 * @author xzzhao
 */
public class ForkJoinTest {
    public static void main(String[] args) {
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for (int i = 0; i < SIZE; i++) {
            numbers[i] = Math.random();
        }
        Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {
            @Override
            public boolean accept(double x) {
                return x > 0.5;
            }
        });
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(counter);
        System.out.println(counter.join());
    }
}

采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask的类,如果计算会生成一个结果的话,或者如果不生成任何结果,就可以提供一个扩展RecursiveAction的类。再覆盖compute方法来生成并调用子任务,然后合并结果。

invokeAll方法接收很多任务并阻塞,知道所有的这些任务都已经完成。join方法将生成结果。对每个子任务都应用了join,并返回总和。

时间: 2024-11-06 09:28:09

Java多线程——执行器(Executor)的相关文章

Java多线程框架Executor详解

原文链接  http://www.imooc.com/article/14377 为什么引入Executor线程池框架new Thread()的缺点 每次new Thread()耗费性能调用new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪.不利于扩展,比如如定时执行.定期执行.线程中断 采用线程池的优点 重用存在的线程,减少对象创建.消亡的开销,性能佳可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争

Java 多线程(2)-Executor

public interface Executor{ void executor(Runnable command); } 如上所写,Executor实际上是一个接口,他提供了唯一的接口方法executor(Runnable command)

Java多线程——&lt;三&gt;简单的线程执行:Executor

一.概述 按照<Java多线程——<一><二>>中所讲,我们要使用线程,目前都是显示的声明Thread,并调用其start()方法.多线程并行,明显我们需要声明多个线程然后都调用他的start方法,这么一看,似乎有些问题:第一.线程一旦多了,声明势必是个问题:第二.多线程启动如果通过手动执行的话,那可能一个线程已经跑完了,另外一个还没起来(我推测可能会出现这个问题).所以,我们在想,如果有个管家,能够帮我们管理这么多线程,只需要把我们定义的任务交给管家,管家就能够帮我们

Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/Java_Guide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理论 可能是最漂亮的Spring事务管理详解 面试中关于Java虚拟机(jvm)的问题看这篇就够了 目录: [TOC] 本节思维导图: 思维导图源文件+思维导图软件关注微信公众号:"Java面试通关手册" 回复关键字:"Java多线程"

Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

一.Exectuor框架简介 Java从1.5版本开始,为简化多线程并发编程,引入全新的并发编程包:java.util.concurrent及其并发编程框架(Executor框架). Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.他们的关系为 在Executor框架中,使用执行器(Exectuo

java多线程学习(2)

1)Callable和Future Runnable封装一个异步运行的任务:可以当成一个没有任何参数和返回值的异步方法,Callable和 Runnable类似,但是它有返回值和参数. Callable接口是一个参数化的类型,只有一个方法call. 1 public interface Callable<V> 2 3 { 4 5 V call()throws Exception; 6 7 } 类型参数v是指返回值的类型,例如Callable<Integer>代表最终返回一个Inte

阿里内部Java多线程资料整理

目录: 1.volatile变量 2.Java并发编程学习 3.CountDownLatch用法 4.CyclicBarrier使用 5.BlockingQueue使用 6.任务执行器Executor7.CompletionService使用8.ConcurrentHashMap使用9.Lock使用 一. volatile变量 1.volatile原理:volatile的原理实际上是告诉处理器,不要把变量缓存在寄存器或者相对于其他处理器不可见的地方,而是把变量放在主存,每次读写操作都在主存上进行

week 11 ——java 多线程

1. 本周学习总结 1.1 以你喜欢的方式(思维导图或其他)归纳总结多线程相关内容. 2. 书面作业 本次PTA作业题集多线程 1. 源代码阅读:多线程程序BounceThread 1.1 BallRunnable类有什么用?为什么代码中需要调用Thread.sleep进行休眠? 答:1.作用:它实现了Runnable接口. 2.因为这个类在run()方法内使小球能够移动,调用Thread.sleep进行休眠是为了提高程序运行的效率. 1.2 Ball.java只做了两件事,这两件事分别是什么?

JAVA多线程和并发基础面试问答(转载)

原文链接:http://www.cnblogs.com/dolphin0520/p/3932934.html 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题. Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用.而线程是在进程中执行的一个任务.Java运行环境是一个包含