Markdown编辑器上线啦,终于等到你了!使用这个编辑器写了两篇,感觉还是不错的!不过还是有一些问题,慢慢熟悉吧!
执行器
构建一个新的线程是有一定的代价的,因为涉及到和操作系统的交互。如果程序中创建了大量的生命周期很短的线程,应该使用线程池(thread pool)。
另一个使用线程池的理由是减少并发线程的数目。线程数量太多会大大降低性能甚至会使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。
Executor类构建线程池的静态方法
方法 | 描述 |
---|---|
newCachedThreadPool | 必要时创建新线程,空闲线程会被保留60秒 |
newFixedThreadPool | 该池包含固定数量的线程;空闲线程会一直被保留 |
newSingleThreadExecutor | 只有一个线程的“池”,该线程顺序执行每一个提交的任务 |
newScheduledThreadPool | 用于预定执行而构建的固定线程池,替代java.util.Timer |
newSingleThreadScheduledExecutor | 用于预定执行而构建的单线程“池” |
线程池
newCachedThreadPool方法构建了一个线程池,对于每个任务,如果有空闲的线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。
newFixedThreadPool方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到的服务的任务放置到队列中。当其他任务完成以后再运行它们。
newSingleThreadExecutor是一个退化了的大小为1的线程池:由一个线程执行提交的任务,一个接着一个。这三个方法返回实现了ExecutorService接口的ThreadPoolExecutor类的对象。
使用连接池:
- 调用Executors类中静态的方法newCachedThreadPool或newFixedThreadPool。
- 调用submit提交Runnable或Callable对象。
- 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
- 当不在提交任何任务时,调用shutdown。
MacthCounter类:
/**
* @author xzzhao
*/
public class MacthCounter implements Callable<Integer> {
private final File directory;
private final String keyword;
private final ExecutorService pool;
private int count;
public MacthCounter(File directory, String keyword, ExecutorService pool) {
super();
this.directory = directory;
this.keyword = keyword;
this.pool = pool;
}
@Override
public Integer call() throws Exception {
count = 0;
File[] files = directory.listFiles();
List<Future<Integer>> results = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
MacthCounter counter = new MacthCounter(file, keyword, pool);
FutureTask<Integer> task = new FutureTask<>(counter);
results.add(task);
} else {
if (search(file)) {
count++;
}
}
}
for (Future<Integer> result : results) {
count += result.get();
}
return count;
}
/**
* 搜索方法
*
* @param file
* @return 是否找到
*/
public boolean search(File file) {
try {
try (Scanner in = new Scanner(file)) {
boolean found = false;
while (!found && in.hasNextLine()) {
String line = in.nextLine();
if (line.contains(keyword)) {
found = true;
}
}
return found;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
ThreadPoolTest类:
/**
* @author xzzhao
*/
public class ThreadPoolTest {
public static void main(String[] args) {
Scanner in = new Scanner(System.in);
System.out.println("请输入根目录 :");
String directory = in.nextLine();
System.out.println("请输入关键字 :");
String keyword = in.nextLine();
ExecutorService pool = Executors.newCachedThreadPool();
MacthCounter counter = new MacthCounter(new File(directory), keyword, pool);
Future<Integer> result = pool.submit(counter);
try {
System.out.println("匹配到的文档数:" + result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
pool.shutdown();
int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
System.out.println("largestPoolSize=" + largestPoolSize);
}
}
预定执行
ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。它是一种允许使用线程机制的Java.util.Timer的泛化。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法将返回实现了ScheduledExecutorService接口的对象。
可以预定Runnable或Callable在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。
控制任务组
我们已经知道如何将一个执行器服务作为线程池使用,以提高执行任务的效率。有时候,我们要使用执行器来做更有实际意义的事,控制一组相关的任务。例如,可以在执行器中使用shutdownNow方法取消所有的任务。
invokeAll方法提交所有对象到一个Callable对象的集合中,并返回一个Future对象的列表,代表所有任务的结果。这个方法的缺点是如果第一个任务花去了很多时间,那么就可能不得不进行等待。将结果按可获得的顺序保存起来更有意义。可以用ExecutorComeletionService来进行排序。
具体可以查询API。用常规的方法获得一个执行器。然后,构建一个ExecutorComeletionService,提交任务给完成服务。该服务管理Future对象的阻塞队列,其中包含已经提交的任务的执行结果。
大概如下:
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks) {
service.submit(task);
}
for (int i = 0; i < task.size(); i++) {
processFurther(service.take().get());
}
Fork-Join框架
有的应用程序使用了大量的线程,但其中大多数都是空闲的。举例来说,一个Web服务器可能会为每个连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集的任务,如图像或视频处理。Java SE 7 中新引入了fork-join框架,专门用来支持后一类的应用。
我们来讨论一个简单的例子。假设我们想统计一个数组中有多少个元素满足摸个特定的属性。可以将这个数组一分为二,分别对着两部分进行统计,再将结果相加。
Counter 类:
/**
* @author xzzhao
*/
public class Counter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 1000;
private final double[] values;
private final int from;
private final int to;
private final Filter filter;
public Counter(double[] values, int from, int to, Filter filter) {
super();
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
@Override
protected Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.accept(values[i])) {
count++;
}
}
return count;
} else {
int mid = ((from + to) / 2);
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}
Counter 类:
/**
* @author xzzhao
*/
public class ForkJoinTest {
public static void main(String[] args) {
final int SIZE = 10000000;
double[] numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) {
numbers[i] = Math.random();
}
Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {
@Override
public boolean accept(double x) {
return x > 0.5;
}
});
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}
采用框架可用的一种方式完成这种递归计算,需要提供一个扩展RecursiveTask的类,如果计算会生成一个结果的话,或者如果不生成任何结果,就可以提供一个扩展RecursiveAction的类。再覆盖compute方法来生成并调用子任务,然后合并结果。
invokeAll方法接收很多任务并阻塞,知道所有的这些任务都已经完成。join方法将生成结果。对每个子任务都应用了join,并返回总和。