使用ThreadPoolExecutor 创建线程池,完成并行操作

日常工作中很多地方很多效率极低的操作,往往可以改串行为并行,执行效率往往提高数倍,废话不多说先上代码

1、用到的guava坐标

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>

2、创建一个枚举保证线程池是单例

package com.hao.service;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public enum ExecutorManager {

    INSTANCE;

    private ExecutorManager() {

    }

    private static int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();

    public static final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(AVAILABLEPROCESSORS * 50, AVAILABLEPROCESSORS * 80, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(AVAILABLEPROCESSORS * 2000),
            new ThreadFactoryBuilder().setNameFormat("ExecutorManager-pool-Thread-%d").build());

}

3、创建一个方法类

package com.hao.service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import com.google.common.base.Preconditions;

@Service
public class ExecutorContext {

    public ExecutorService executorService;
    private int DEFAULT_WAIT_SECONDS = 2;

    @PostConstruct
    public void init() {
        executorService = ExecutorManager.threadPoolExecutor;
    }

    public <T> List<T> waitAllFutures(List<Callable<T>> calls, int milliseconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latchAndCallables = wrapCallables(calls);
        List<Future<T>> futurres = new LinkedList<>();
        for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
            if (null != callable) {
                futurres.add(executorService.submit(callable));
            }
        }
        List<T> rets = new ArrayList<>();
        if (latchAndCallables.latch.await(milliseconds, TimeUnit.MILLISECONDS)) {
            for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
                rets.add(call.getResult());
            }
        } else {
            for (Future<T> future : futurres) {
                if (!future.isDone()) {
                    future.cancel(true);
                }
            }
        }
        return rets;
    }

    public <T> List<T> waitAllCallables(List<Callable<T>> calls, int seconds) throws Exception {
        Preconditions.checkArgument(null != calls && !calls.isEmpty(), "callable empty.");
        LatchedCallables<T> latchAndCallables = wrapCallables(calls);
        for (CountdownedCallable<T> callable : latchAndCallables.wrappedCallables) {
            executorService.submit(callable);
        }
        List<T> rets = new ArrayList<>();
        if (latchAndCallables.latch.await(seconds, TimeUnit.SECONDS)) {
            for (CountdownedCallable<T> call : latchAndCallables.wrappedCallables) {
                rets.add(call.getResult());
            }
        }
        return rets;
    }

    public <T> List<T> waitAllCallables(@SuppressWarnings("unchecked") Callable<T>... calls) throws Exception {
        Preconditions.checkNotNull(calls, "callable empty.");
        return waitAllCallables(Arrays.asList(calls), DEFAULT_WAIT_SECONDS);
    }

    private static <T> LatchedCallables<T> wrapCallables(List<Callable<T>> callables) {
        CountDownLatch latch = new CountDownLatch(callables.size());
        List<CountdownedCallable<T>> wrapped = new ArrayList<>(callables.size());
        for (Callable<T> callable : callables) {
            wrapped.add(new CountdownedCallable<>(callable, latch));
        }

        LatchedCallables<T> returnVal = new LatchedCallables<>();
        returnVal.latch = latch;
        returnVal.wrappedCallables = wrapped;
        return returnVal;
    }

    public static class LatchedCallables<T> {
        public CountDownLatch latch;
        public List<CountdownedCallable<T>> wrappedCallables;
    }

    public static class CountdownedCallable<T> implements Callable<T> {
        private final Callable<T> wrapped;
        private final CountDownLatch latch;
        private T result;

        public CountdownedCallable(Callable<T> wrapped, CountDownLatch latch) {
            this.wrapped = wrapped;
            this.latch = latch;
        }

        @Override
        public T call() throws Exception {
            try {
                result = wrapped.call();
                return result;
            } finally {
                latch.countDown();
            }
        }

        public T getResult() {
            return result;
        }
    }

}

4、创建一个测试类

package com.hao;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.hao.bean.Employee;
import com.hao.service.EmployeeService;
import com.hao.service.ExecutorContext;

public class ExecutorTest extends BaseTest {

    @Autowired
    ExecutorContext executorContext;

    @Autowired
    EmployeeService employeeService;

    @Test
    public void test01() {
        long t0 = System.currentTimeMillis();
        List<Employee> employees = new ArrayList<Employee>();
        try {
            List<Callable<Integer>> calls = new ArrayList<Callable<Integer>>();
            Callable<Integer> able1 = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(5000);
                    Employee employee = employeeService.getById(1L);
                    employees.add(employee);
                    return 1;
                }

            };
            calls.add(able1);
            Callable<Integer> able2 = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(5000);
                    Employee employee = employeeService.getById(2L);
                    employees.add(employee);
                    return 2;
                }

            };
            calls.add(able2);
            Callable<Integer> able3 = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(5000);
                    Employee employee = employeeService.getById(3L);
                    employees.add(employee);
                    return 3;
                }

            };
            calls.add(able3);

            executorContext.waitAllCallables(calls, 5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (Employee employee : employees) {
            System.out.println(employee);
        }
        System.out.println(System.currentTimeMillis() - t0);
    }

}

5、执行结果如下

次工具类的好处在于能够像使用普通 service一样使用线程池完成并行操作,当然不要忘记将 ExecutorContext 置于能被sping扫描到的地方,

否则不能直接使用@Autowired 依赖注入

原文地址:https://www.cnblogs.com/zhanh247/p/12576491.html

时间: 2024-10-15 20:30:35

使用ThreadPoolExecutor 创建线程池,完成并行操作的相关文章

ThreadPoolExecutor自定义线程池

1.ThreadPoolExecutor创建线程池的构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize &

【搞定面试官】你还在用Executors来创建线程池?会有什么问题呢?

前言 上文我们介绍了JDK中的线程池框架Executor.我们知道,只要需要创建线程的情况下,即使是在单线程模式下,我们也要尽量使用Executor.即: ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1); //此处不该利用Executors工具类来初始化线程池 但是,在<阿里巴巴Java开发手册>中有一条 [强制]线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方

高并发之——从源码角度分析创建线程池究竟有哪些方式

前言 在Java的高并发领域,线程池一直是一个绕不开的话题.有些童鞋一直在使用线程池,但是,对于如何创建线程池仅仅停留在使用Executors工具类的方式,那么,创建线程池究竟存在哪几种方式呢?就让我们一起从创建线程池的源码来深入分析究竟有哪些方式可以创建线程池. 使用Executors工具类创建线程池 在创建线程池时,初学者用的最多的就是Executors 这个工具类,而使用这个工具类创建线程池时非常简单的,不需要关注太多的线程池细节,只需要传入必要的参数即可.Executors 工具类提供了

自定义线程池,如何最佳创建线程池

java有预置线程池:newSingleThreadExecutor,newFixedThreadPool,newCacheedThreadPool,newScheduledThreadPool,newWorkStealingPool.如果不适合,还可以使用ThreadPoolExecutor创建自定义线程池.主要构造方法: 1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveT

使用Callable接口创建线程池

步骤: 创建线程池对象创建 Callable 接口子类对象提交 Callable 接口子类对象关闭线程池实例: class TaskCallable implements Callable<Integer> { @Override public Integer call() throws Exception { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName()+"

线程池ThreadPoolExecutor分析: 线程池是什么时候创建线程的,队列中的任务是什么时候取出来的?

带着几个问题进入源码分析: 线程池是什么时候创建线程的? 任务runnable task是先放到core到maxThread之间的线程,还是先放到队列? 队列中的任务是什么时候取出来的? 什么时候会触发reject策略? core到maxThread之间的线程什么时候会die? task抛出异常,线程池中这个work thread还能运行其他任务吗? 至少在new ThreadPoolExecutor()时,Thread对象并没有初始化. 这里仅仅指定了几个初始参数 一段基础代码,进入分析 pu

Android线程管理之ThreadPoolExecutor自定义线程池(三)

前言: 上篇主要介绍了使用线程池的好处以及ExecutorService接口,然后学习了通过Executors工厂类生成满足不同需求的简单线程池,但是有时候我们需要相对复杂的线程池的时候就需要我们自己来自定义一个线程池,今天来学习一下ThreadPoolExecutor,然后结合使用场景定义一个按照线程优先级来执行的任务的线程池. ThreadPoolExecutor ThreadPoolExecutor线程池用于管理线程任务队列.若干个线程. 1.)ThreadPoolExecutor构造函数

ThreadPoolExecutor(线程池)源码分析

1. 常量和变量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位为线程池的运行状态,低29位为当前线程总数 private static final int COUNT_BITS = Integer.SIZE - 3; // 32 -3 = 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池容量:2^

创建线程池

#!/usr/bin/env python # -*- coding: utf-8 -*- import threading import queue import time class ThreadPool: def __init__(self, maxsize=5): self.maxsize = maxsize self._q = queue.Queue(self.maxsize) # 创建队列,队列最大容量为5 for i in range(self.maxsize): self._q.