Callable+ThreadPoolExecutor实现多线程并发并获得返回值(转)

出处:https://blog.csdn.net/kity9420/article/details/80740466

前言
  经常会遇到一些性能问题,比如调用某个接口,可能要循环调用100次,并且需要拿到每一次调用的返回结果,通常我们都是放在for循环中一次次的串行调用,这种方式可想而知道有多慢,那怎么解决这个问题呢?

多线程
  为了解决以上问题,我使用的方式是多线程。多线程常规的有两种实现方式,即继承Tread类,实现Runnable接口,但是这两种实现方式,有一个共同的问题,就是没有返回值,对于我们来说,获得每个线程的返回值,是个很困难的问题,因此不能用Tread类或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允许有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到线程的执行结果

案例
  假设需要给100个用户发送邮件,并需要每个用户的返回结果,先看下代码结构

CallableTemplate.java

package com.gdut.thread.multiThread;

import java.util.concurrent.Callable;

/**
 * 多线程模板类
 * @author yang.han
 *
 * @param <V>
 */
public abstract class CallableTemplate<V> implements Callable<V>{

    /**
     * 前置处理,子类可以Override该方法
     */
    public void beforeProcess() {
        System.out.println("before process");
    }

    /**
     * 处理业务逻辑的方法,需要子类去Override
     * @param <V>
     * @return
     */
    public abstract V process();

    /**
     * 后置处理,子类可以Override该方法
     */
    public void afterProcess() {
        System.out.println("after process");
    }

    @Override
    public V call() throws Exception {
        beforeProcess();
        V result = process();
        afterProcess();
        return result;
    }

}

  CallableTemplate类实现了Callable接口,并实现了process方法,该类是一个抽象类,接收任意返回值的类型,beforeProcess方法为前置处理,afterProcess的后置处理,process为具体的业务逻辑抽象方法,该方法在子类中实现

IConcurrentThreadPool.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface IConcurrentThreadPool {

    /**
     * 初始化线程池
     */
    void initConcurrentThreadPool();

    /**
     * 提交单个任务
     * @param <V>
     * @param task
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException;

    /**
     * 提交多个任务
     * @param <V>
     * @param tasks
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException;
}

  IConcurrentThreadPool是多线程接口类,声名了三个方法,initConcurrentThreadPool:初始化线程池,submit:提交单个任务的线程,并有返回值,invokeAll:提交多个任务的线程,并有返回值

ConcurrentThreadPool.java

package com.gdut.thread.multiThread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConcurrentThreadPool implements IConcurrentThreadPool{

    private ThreadPoolExecutor threadPoolExecutor;
    // 核心线程数
    private int corePoolSize = 10;
    // 最大线程数
    private int maximumPoolSize = 20;
    // 超时时间30秒
    private long keepAliveTime = 30;

    @Override
    public void initConcurrentThreadPool() {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                                                    maximumPoolSize,
                                                    keepAliveTime,
                                                    TimeUnit.SECONDS,
                                                    new LinkedBlockingDeque<Runnable>()
                                                    );
    }

    @Override
    public <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException {
        Future<V> result = threadPoolExecutor.submit(task);
        return result.get();
    }

    @Override
    public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException {
        List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
        List<V> resultList = new ArrayList<V>();

        for(Future<V> future : tasksResult) {
            resultList.add(future.get());
        }
        return resultList;
    }

}

  ConcurrentThreadPool是创建线程池的实现类,用到了ThreadPoolExecutor线程池类及这个类的invokeAll方法和submit方法,这两个方法的返回值,都可以通过Future类的get方法获得

ICallableTaskFrameWork.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface ICallableTaskFrameWork {
    <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException;
}

  ICallableTaskFrameWork是定义的线程任务框架接口,所有的多线程调用,都通过该接口发起

CallableTaskFrameWork.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public class CallableTaskFrameWork implements ICallableTaskFrameWork{

    private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();

    @Override
    public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException {
        concurrentThreadPool.initConcurrentThreadPool();
        return concurrentThreadPool.invokeAll(tasks);
    }

}

  CallableTaskFrameWork是ICallableTaskFrameWork 的实现类,在submitsAll实现方法中,通过调用线程池对象IConcurrentThreadPool接口的invokeAll方法来发起多线程的调用,这里注意一个,在submitAll实现方法中,我手动的调用了初始化线程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真实的项目上,应该在应用启动的时候就调用该方法来初始化线程池

测试类代码 
SendMessageService.java,假设这是一个发送邮件信息的服务类

package com.gdut.thread.multiThread;

public class SendMessageService {
    public void sendMessage(String email,String content){
        System.out.println("发送邮件。。。");
    }
}

SendMessageHander.java,多线程发送邮件的处理类

package com.gdut.thread.multiThread;

import java.util.HashMap;
import java.util.Map;

public class SendMessageHander extends CallableTemplate<Map<String, String>>{

    private String email;
    private String content;
    public SendMessageHander(String email,String content) {
        this.email = email;
        this.content = content;
    }

    @Override
    public Map<String, String> process() {
        SendMessageService sendMessageService = new SendMessageService();
        sendMessageService.sendMessage(email, content);
        Map<String, String> map = new HashMap<String, String>();
        map.put(email, content);
        return map;
    }

}

  这个类继承了上面的CallableTemplate,我们要的返回值是Map,因此泛型类型是Map,在类中还重写了process方法,在方法中调用发送邮件的业务逻辑接口SendMessageService.sendMessage,并将返回结果组装成Map返回,这里我就简单处理了,将邮件地址及内容放在Map中直接返回了;另外还要注意这个类有个有参构造器,通过构建器可以接收需要传递进来的参数

SendMessageTest.java,测试类

package com.gdut.thread.multiThread;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;

public class SendMessageTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();

        List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();

        SendMessageHander sendMessageHander = null;

        // 将需要发送邮件的邮件地址及内容组装好,放在一个集合中
        for (int i = 0; i < 1000; i++) {
            sendMessageHander = new SendMessageHander("email" + i, "content" + i);
            tasks.add(sendMessageHander);
        }

        //通过多线程一次性发起邮件,并拿到返回结果集
        List<Map<String, String>> results = callableTaskFrameWork.submitsAll(tasks);

        // 解析返回结果集
        for (Map<String, String> map : results) {
            for (Entry<String, String> entry : map.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }
        }
    }

}

运行结果

附录:还可以看这边文章: java并发异步编程 原来十个接口的活现在只需要一个接口就搞定!

原文地址:https://www.cnblogs.com/myseries/p/11515370.html

时间: 2024-10-14 19:59:01

Callable+ThreadPoolExecutor实现多线程并发并获得返回值(转)的相关文章

【多线程】让线程返回值

很多时候,我们使用线程去处理一些业务,并希望得到结果,这时候,我们可以使用Callable. 下面例子,模拟使用线程查询DB得到一个List. 例子 线程,返回一个List数据 package com.nicchagil.study.thread.cnblogs.No02可返回值的线程; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import

利用多线程,执行有返回值的方法

解决的问题: 1. 当需要拿到多线程执行的方法的结果 2. 如何让主线程,等在所有的子线程直接结束 class Program { static void Main(string[] args) { IList<ManualResetEvent> listManual = new List<ManualResetEvent>(); List<ThreadReturnData> testList = new List<ThreadReturnData>();

java多线程并发概览

一.操作系统中线程和进程的概念 现在的操作系统是多任务操作系统.多线程是实现多任务的一种方式. 进程是指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程中可以启动多个线程.比如在Windows系统中,一个运行的exe就是一个进程. 线程是指进程中的一个执行流程,一个进程中可以运行多个线程.比如java.exe进程中可以运行很多线程.线程总是属于某个进程,进程中的多个线程共享进程的内存. "同时"执行是人的感觉,在线程之间实际上轮换执行. 二.Java中的线程 在J

Java 多线程 并发编程 (转)

一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

多线程并发编程

前言 多线程并发编程是Java编程中重要的一块内容,也是面试重点覆盖区域,所以学好多线程并发编程对我们来说极其重要,下面跟我一起开启本次的学习之旅吧. 正文 线程与进程 1 线程:进程中负责程序执行的执行单元线程本身依靠程序进行运行线程是程序中的顺序控制流,只能使用分配给程序的资源和环境 2 进程:执行中的程序一个进程至少包含一个线程 3 单线程:程序中只存在一个线程,实际上主方法就是一个主线程 4 多线程:在一个程序中运行多个任务目的是更好地使用CPU资源 线程的实现 继承Thread类 在j

Java 多线程 并发编程

转自: http://blog.csdn.net/escaflone/article/details/10418651 一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空

Java多线程并发技术

Java多线程并发技术 参考文献: http://blog.csdn.net/aboy123/article/details/38307539 http://blog.csdn.net/ghsau/article/category/1707779 http://www.iteye.com/topic/366591 JAVA多线程实现方式主要有三种:继承Thread类.实现Runnable接口.使用ExecutorService.Callable.Future实现有返回结果的多线程.其中前两种方式

java多线程并发

一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

java多线程 并发编程

一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav