多线程批量执行等待全部结果

来自:http://blog.csdn.net/wxwzy738/article/details/8497853

http://blog.csdn.net/cutesource/article/details/6061229

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class Test17 {
    public static void main(String[] args) throws Exception {
        Test17 t = new Test17();
        t.count1();
        t.count2();
    }
//使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理
    public void count1() throws Exception{
        ExecutorService exec = Executors.newCachedThreadPool();
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        for(int i=0; i<10; i++){
            Future<Integer> future =exec.submit(getTask());
            queue.add(future);
        }
        int sum = 0;
        int queueSize = queue.size();
        for(int i=0; i<queueSize; i++){
            sum += queue.take().get();
        }
        System.out.println("总数为:"+sum);
        exec.shutdown();
    }
//使用CompletionService(完成服务)保持Executor处理的结果
    public void count2() throws InterruptedException, ExecutionException{
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
        for(int i=0; i<10; i++){
            execcomp.submit(getTask());
        }
        int sum = 0;
        for(int i=0; i<10; i++){
//检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
            Future<Integer> future = execcomp.take();
            sum += future.get();
        }
        System.out.println("总数为:"+sum);
        exec.shutdown();
    }
    //得到一个任务
    public Callable<Integer> getTask(){
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                int i = rand.nextInt(10);
                int j = rand.nextInt(10);
                int sum = i*j;
                System.out.print(sum+"\t");
                return sum;
            }
        };
        return task;
    }
    /**
     * 执行结果:
        6    6    14    40    40    0    4    7    0    0    总数为:106
        12    6    12    54    81    18    14    35    45    35    总数为:312
     */
}

先看一下新建一个ThreadPoolExecutor的构建参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

看这个参数很容易让人以为是线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果 还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。

但实际情况不是这样,具体流程如下:

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

内部结构如下所示:

从中可以发现ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

其实可以通过Executes来学学几种特殊的ThreadPoolExecutor是如何构建的。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool就是一个固定大小的ThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool比较适合没有固定大小并且比较快速就能完成的小任务,没必要维持一个Pool,这比直接new Thread来处理的好处是能在60秒内重用已创建的线程。

其他类型的ThreadPool看看构建参数再结合上面所说的特性就大致知道它的特性

时间: 2024-07-30 18:02:36

多线程批量执行等待全部结果的相关文章

java多线程批量执行的时限问题

需求:需要并发执行三个线程,要求在指定的时间内返回结果,如果某个线程超时,则返回为空. 思路:使用ExecutorService的invokeAll(time,timeUint)方法来设置执行时限,该方法返回一个List<Future<T>>,一旦返回后,即取消尚未完成的任务,然后再从list中读取future并调用future.get()方法来获取线程返回的结果,如果future.get()抛出CancellationException 则说明该任务未完成被取消了. <T&

c# Task多线程并行任务中等待所有线程都执行完成

C#多线程中如何等待所有线程的任务都执行完成呢?在.net 4.0以4.0+中,有Task.WaitAll(params Task[] tasks)方法来等待所有Task[],而不需要更多的操作.按照微软官方的文档,我写了一个实例来调试和说明: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Thre

sh, 批量执行Linux命令

step 1:创建一个sh批处理命令文件 # vim /etc/batch_ssh/install_redis.sh step 2:给当前用户,能够执行sh脚本权限# chmod install_redis.sh 777 step 3: 编写要批量执行的命令,read表示等待前端用户输入,sleep表示等待时间单位为 秒. echo '\n begin to install 01 plugin \n'; yum install cpp -y; echo '\n yum finish instal

python 通过paramiko模块批量执行ssh命令

多台设备批量执行ssh命令,目前是串行,后期会加入多线程实现并行,直接上源码 # 多台设备批量执行ssh命令     #!/usr/bin/env python     # -*- coding:utf-8 -*-     __author__  = 'babyshen'     __version__ = '1.0.0'     import paramiko     class SSh(object):     def __init__(self,port,username,password

多线程批量探测目标IP段的服务器类型(内网也可用)

一 原理解释 这里所说的服务器类型是指像Apache,tomcat,nginx,IIS这种.其中原理用到了HTTP Header的Responses,这里面有项叫"Server"的参数就包涵我们所需要的信息.下面是Responses的部分截图: (PS:更多相关可自行百度"HTTP Header") 因此,我们想要做一个多线程批量探测的软件,思路有两种:(1)根据别人提供的接口然后我们去调用获取(比如:http://api.builtwith.com 这个我以后可能

多线程同步执行

引用命名空间using System.Threading.Tasks;1.将互补影响的多个方法同时执行 Parallel.Invoke(() => { 方法一; }, () => { 方法二; }......);2.将foreach中的循环批量执行foreach (DataRow dr in dt.Rows){}使用下面多线程IEnumerable<DataRow> rows =dt.Rows.Cast<DataRow>(); Parallel.ForEach<D

Python脚本远程批量执行命令

摘要 本文主要写用python脚本远程连接多台服务器,然后批量执行命令,最终返回命令执行结果. 这个可以说是Ansible,Puppet等工具的最简单的雏形. 做运维的同学应该都知道的. 正文 multi_task.py #_*_coding:utf-8_*_ import  multiprocessing import paramiko import getpass import ConfigParser class MultiTask(object):     '''handles all 

批量执行shell命令

虽然目前都实现了自动化如puppet saltstack在环境中的应用,但工作中不可避免的要自己写一些简单的批量执行shell命令的脚本. python paramiko模块是目前使用得较为顺手的模块,执行命令时基本无需要转换,直接将shell命令扔进去执行就OK 简单示例,10个线程同时执行ssh或scp动作,未设置timeout时间,如执行长时间无反应会导致脚本执行问题: #!/usr/bin/python # _*_ coding: utf-8 _*_ import paramiko im

selenium之批量执行测试用例

把写好的测试用例放在指定目录下,使用discover函数扫描该目录,并根据关键字自动筛选需要执行的用例.本例使用Python3.6版本. 1 # 遍历指定目录,批量执行测试用例 2 import unittest 3 4 case_dir = 'D:\\test_case' 5 6 7 def suites_run(): 8 '''运行测试套件,批量执行测试用例''' 9 # discover函数遍历指定目录,按条件过滤文件,返回测试套件列表 10 discover_suites = unitt