[email protected]异步线程池的配置及应用

示例:

1、 配置

@EnableAsync
@Configuration
public class TaskExecutorConfiguration {

    @Autowired
    private TaskExecutorProperties taskExecutorProperties;

    @Bean
    public Executor routeGen() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutorProperties.getCorePoolSize());     //50
        executor.setQueueCapacity(taskExecutorProperties.getQueueCapacity());   //200
        executor.setMaxPoolSize(taskExecutorProperties.getMaxPoolSize());       //500
        executor.setKeepAliveSeconds(taskExecutorProperties.getKeepAliveSeconds());
        executor.setThreadNamePrefix("gen-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }}

2、 运用(作用于方法上)

@Async("routeGen") public Future<Result<String>> genRouteByCategory(RouteGenDTO routeGenDTO, List<String> cityList, String category){}

3、 异常处理及日志记录

   public Result<String> genRouteByShard(RouteGenDTO routeGenDTO) throws RouteGenException {

        // 根据分片获取城市
        Result<List<String>> cityResult = cityService.queryAllCity();
        if (cityResult == null || !cityResult.isSuccess()) {
            String errorMsg = cityResult == null ? "查询城市没有返回结果" : cityResult.getMsg();
            // 记录查询城市日志
            logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg);
            return Result.ofFail(ErrorEnum.QUERY_ERROR.getCode(), errorMsg);
        }

        List<String> cityList = cityResult.getData();
        if (CollectionUtils.isEmpty(cityList)) {
            String errorMsg = "没有查询到城市,无需生成路线";
            // 记录查询城市日志
            logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg);
            return Result.ofSuccessMsg(errorMsg);
        }

        // 记录查询城市日志
        logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, JSON.toJSONString(cityList), null);

        Map<String, Future<Result<String>>> categoryFutureMap = new ConcurrentHashMap<>();

        // 禁用的类别列表
        List<String> disableCategoryList = null;
        String disableCategory = genProperties.getDisableCategory();
        if (StringUtils.isNotBlank(disableCategory)) {
            String[] disableCategoryArr = disableCategory.split(",");
            disableCategoryList = Stream.of(disableCategoryArr).collect(Collectors.toList());
        }

        // 根据类别生成路线
        // 每个类别一个线程
        for (RouteCategoryEnum routeCategory : RouteCategoryEnum.values()) {
            // 跳过禁用的类别
            if (CollectionUtils.isNotEmpty(disableCategoryList) && disableCategoryList.contains(routeCategory.getType())) {
                // 记录生成日志
                logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, routeCategory.getType(), routeCategory.getType() + "类别被禁用,无需生成");
                continue;
            }

            // 根据城市和类别生成路线
            Future<Result<String>> categoryFuture = routeGenService.genRouteByCategory(routeGenDTO, cityList, routeCategory.getType());
            categoryFutureMap.put(routeCategory.getType(), categoryFuture);
        }

        String lastErrorMsg = null;

        for (Map.Entry<String, Future<Result<String>>> entry : categoryFutureMap.entrySet()) {

            String errorMsg = null;

            String category = entry.getKey();
            Future<Result<String>> futureResult = entry.getValue();
            if (futureResult != null) {
                try {
                    if (futureResult.isCancelled()) {
                        errorMsg = "线程被取消";
                        log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}",
                                routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg);
                        continue;
                    }
                    Result<String> result = futureResult.get(CommonConfConstants.FUTURE_CATEGORY_WAIT_TIME, TimeUnit.SECONDS);

                    if (result == null || !result.isSuccess()) {
                        errorMsg = result == null ? category + "没有返回处理结果" : result.getMsg();
                        log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}",
                                routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg);
                    } else {
                        log.info("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 生成成功",
                                routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), result);
                    }
                } catch (TimeoutException e) {
                    errorMsg = category + "生成路线 线程处理超时:" + e.getMessage();
                    log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程处理超时",
                            routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e);
                } catch (InterruptedException e) {
                    errorMsg = category + "生成路线 线程中断异常:" + e.getMessage();
                    log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程中断异常",
                            routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e);
                } catch (ExecutionException e) {
                    errorMsg = category + "生成路线 线程执行异常:" + e.getMessage();
                    log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程执行异常",
                            routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e);
                }

            } else {
                errorMsg = category + "生成路线 线程返回结果为null";
                log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}",
                        routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg);
            }

            // 记录生成日志
            logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, category, errorMsg);

            if (StringUtils.isNotBlank(errorMsg)) {
                lastErrorMsg = errorMsg;
            }
        }

        if (StringUtils.isNotBlank(lastErrorMsg)) {
            return Result.ofFail(ErrorEnum.GEN_ROUTE_FAILURE.getCode(), lastErrorMsg);
        }

        return Result.ofSuccess("路线生成成功");
    }
private void logQueryCityByShard(RouteJobTypeEnum routeJobTypeEnum, RouteGenDTO routeGenDTO, String cityJson, String errorMsg) {    String logFormat;    String logMsg;    if (StringUtils.isBlank(errorMsg)) {        logFormat = "查询城市成功,城市信息:%s";        logMsg = String.format(logFormat, cityJson);    } else {        logMsg = errorMsg;    }

    String operationObject = routeJobTypeEnum.getType() + "_job_"            + routeGenDTO.getShardItem() + "_"            + TimeUtil.dateTimeToStr(routeGenDTO.getStartTime(), DateFormatConstants.DATE_TIME_COMPACT_FORMAT);

    LogInfoDTO logInfoDTO = new LogInfoDTO();    logInfoDTO.setBusinessName(routeJobTypeEnum.getDesc());    logInfoDTO.setOperationName("根据分片查询城市");    logInfoDTO.setOperationObject(operationObject);    logInfoDTO.setOperationDesc(logMsg);    LogUtil.log(logInfoDTO);}
 

原文地址:https://www.cnblogs.com/gendway/p/10695949.html

时间: 2024-10-14 17:09:42

[email protected]异步线程池的配置及应用的相关文章

记录一次线程池的在项目中的实际应用,讲解一下线程池的配置和参数理解。

前言:最近项目中与融360项目中接口对接,有反馈接口(也就是我们接收到请求,需要立即响应,并且还要有一个接口推送给他们其他计算结果),推送过程耗时.或者说两个接口不能是同时返回,有先后顺序. 这时我想到了把自己Controller立即返回接受成功,中间添加一个新的线程去做其他耗时的操作(线程池配置和参数测试讲解请阅读第5步). 1.Controller代码如下: @Autowiredprivate CallThreadDemo worker; @RequestMapping("/bandBank

异步线程池的实现(一)-------具体实现方法

本篇是这个内容的第一篇,主要是写:遇到的问题,和自己摸索实现的方法.后面还会有一篇是总结性地写线程池的相关内容(偏理论的). 一.背景介绍 朋友的项目开发到一定程度之后,又遇到了一些问题:在某些流程中的一些节点,由于是串联执行的.上一步要等下一步执行完毕:或者提交数据之后要等待后台其他系统处理完成之后,才能返回结果.这样就会导致,请求发起方不得不一直等待结果,用户体验很不好:从项目优化来说,模块与模块之间构成了强耦合,这也是不利于以后扩展的,更不用说访问量上来之后,肯定会抓瞎的问题.所以,我就着

异步线程池的使用

合理使用异步线程开发项目能提高一个项目的并发量,减少响应时间.下面就简单介绍一下异步线程池的使用,参考博客:https://blog.csdn.net/hry2015/article/details/67640534 spring 对@Async定义异步任务的方法有3种: 1.最简单的异步调用,返回值为void: 2.带参数的异步调用,异步方法可以传入参数: 3.异常调用返回Future 代码如下: package com.hry.spring.async.annotation; import

使用C++11 开发一个半同步半异步线程池

摘自:<深入应用C++11>第九章 实际中,主要有两种方法处理大量的并发任务,一种是一个请求由系统产生一个相应的处理请求的线程(一对一) 另外一种是系统预先生成一些用于处理请求的进程,当请求的任务来临时,先放入同步队列中,分配一个处理请求的进程去处理任务, 线程处理完任务后还可以重用,不会销毁,而是等待下次任务的到来.(一对多的线程池技术) 线程池技术,能避免大量线程的创建和销毁动作,节省资源,对于多核处理器,由于线程被分派配到多个cpu,会提高并行处理的效率. 线程池技术分为半同步半异步线程

c++11 实现半同步半异步线程池

感受: 随着深入学习,现代c++给我带来越来越多的惊喜- c++真的变强大了. 半同步半异步线程池: 其实很好理解,分为三层 同步层:通过IO复用或者其他多线程多进程等不断的将待处理事件添加到队列中,这个过程是同步进行的. 队列层:所有待处理事件都会放到这里.上一层事件放到这里,下一层从这里获取事件 异步层:事先创建好线程,让瞎猜呢和嗯不断的去处理队列层的任务,上层不关心这些,它只负责把任务放到队列里,所以对上层来说这里是异步的. 看张图: 如果你不熟悉c++11的内容 以下文章仅供参考 c++

(原创)C++半同步半异步线程池2

(原创)C++半同步半异步线程池 c++11 boost技术交流群:296561497,欢迎大家来交流技术. 线程池可以高效的处理任务,线程池中开启多个线程,等待同步队列中的任务到来,任务到来多个线程会抢着执行任务,当到来的任务太多,达到上限时需要等待片刻,任务上限保证内存不会溢出.线程池的效率和cpu核数相关,多核的话效率更高,线程数一般取cpu数量+2比较合适,否则线程过多,线程切换频繁反而会导致效率降低. 线程池有两个活动过程:1.外面不停的往线程池添加任务:2.线程池内部不停的取任务执行

SpringMVC整合TaskExecutor线程池的配置/使用

一.配置jdbc.properties添加: #------------ Task ------------ task.core_pool_size=5 task.max_pool_size=50 task.queue_capacity=1000 task.keep_alive_seconds=60 二.配置Spring的applicationContext.xml添加: <bean id="taskExecutor" class="org.springframewor

半同步半异步线程池的实现(C++11)

简介 处理大量并发任务时,一个请求对应一个线程来处理任务,线程的创建和销毁将消耗过多的系统资源,并增加上下文切换代价.线程池技术通过在系统中预先创建一定数量的线程(通常和cpu核数相同),当任务到达时,从线程池中分配一个线程进行处理,线程在处理完任务之后不用销毁,等待重用. 线程池包括半同步半异步和领导者追随者两种实现方式.线程池包括三部分,第一层是同步服务层,它处理来自上层的任务请求.第二层是同步队列层,同步服务层中的任务将添加到队列中.第三层是异步服务层,多个线程同时处理队列中的任务. 先贴

异步线程池

1.创建类实现AsyncTaskExecutor, InitializingBean, DisposableBean接口,重写方法. import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.core.task