异步线程池

1.创建类实现AsyncTaskExecutor, InitializingBean, DisposableBean接口,重写方法。

import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.core.task.AsyncTaskExecutor;

import java.util.concurrent.Callable;import java.util.concurrent.Future;

/** * The class Exception handling async task executor. * */@Slf4jpublic class ExceptionHandlingAsyncTaskExecutor implements AsyncTaskExecutor, InitializingBean, DisposableBean {

private final AsyncTaskExecutor executor;

/**    * Instantiates a new Exception handling async task executor.    *    * @param executor the executor    */   ExceptionHandlingAsyncTaskExecutor(AsyncTaskExecutor executor) {      this.executor = executor;   }

/**    * Execute.    *    * @param task the task    */   @Override   public void execute(Runnable task) {      executor.execute(createWrappedRunnable(task));   }

/**    * Execute.    *    * @param task         the task    * @param startTimeout the start timeout    */   @Override   public void execute(Runnable task, long startTimeout) {      executor.execute(createWrappedRunnable(task), startTimeout);   }

private <T> Callable<T> createCallable(final Callable<T> task) {      return () -> {         try {            return task.call();         } catch (Exception e) {            handle(e);            throw e;         }      };   }

private Runnable createWrappedRunnable(final Runnable task) {      return () -> {         try {            task.run();         } catch (Exception e) {            handle(e);         }      };   }

/**    * Handle.    *    * @param e the e    */   private void handle(Exception e) {      log.error("Caught async exception", e);   }

/**    * Submit future.    *    * @param task the task    *    * @return the future    */   @Override   public Future<?> submit(Runnable task) {      return executor.submit(createWrappedRunnable(task));   }

/**    * Submit future.    *    * @param <T>  the type parameter    * @param task the task    *    * @return the future    */   @Override   public <T> Future<T> submit(Callable<T> task) {      return executor.submit(createCallable(task));   }

/**    * Destroy.    *    * @throws Exception the exception    */   @Override   public void destroy() throws Exception {      if (executor instanceof DisposableBean) {         DisposableBean bean = (DisposableBean) executor;         bean.destroy();      }   }

/**    * After properties set.    *    * @throws Exception the exception    */   @Override   public void afterPropertiesSet() throws Exception {      if (executor instanceof InitializingBean) {         InitializingBean bean = (InitializingBean) executor;         bean.afterPropertiesSet();      }   }}2.创建AsyncTaskExecutorConfiguration类,实现AsyncConfigurer接口,重写方法
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.AsyncConfigurer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration@EnableAsync@EnableSchedulingpublic class AsyncTaskExecutorConfiguration implements AsyncConfigurer {

@Bean(name = "taskExecutor")    public Executor getAsyncExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(5);        executor.setMaxPoolSize(20);        executor.setQueueCapacity(1000);        executor.setKeepAliveSeconds(60000);        executor.setThreadNamePrefix("");        return new ExceptionHandlingAsyncTaskExecutor(executor);    }

@Override    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return new SimpleAsyncUncaughtExceptionHandler();    }}3.使用。
@Resourceprivate TaskExecutor taskExecutor;
taskExecutor.execute(new Runnable() {     @Override     public void run() {         try {             sout("异步使用线程池");         } catch (IOException e) {             e.printStackTrace();         }     } });

原文地址:https://www.cnblogs.com/chunming-an/p/10255721.html

时间: 2024-11-10 11:10:49

异步线程池的相关文章

使用C++11 开发一个半同步半异步线程池

摘自:<深入应用C++11>第九章 实际中,主要有两种方法处理大量的并发任务,一种是一个请求由系统产生一个相应的处理请求的线程(一对一) 另外一种是系统预先生成一些用于处理请求的进程,当请求的任务来临时,先放入同步队列中,分配一个处理请求的进程去处理任务, 线程处理完任务后还可以重用,不会销毁,而是等待下次任务的到来.(一对多的线程池技术) 线程池技术,能避免大量线程的创建和销毁动作,节省资源,对于多核处理器,由于线程被分派配到多个cpu,会提高并行处理的效率. 线程池技术分为半同步半异步线程

c++11 实现半同步半异步线程池

感受: 随着深入学习,现代c++给我带来越来越多的惊喜- c++真的变强大了. 半同步半异步线程池: 其实很好理解,分为三层 同步层:通过IO复用或者其他多线程多进程等不断的将待处理事件添加到队列中,这个过程是同步进行的. 队列层:所有待处理事件都会放到这里.上一层事件放到这里,下一层从这里获取事件 异步层:事先创建好线程,让瞎猜呢和嗯不断的去处理队列层的任务,上层不关心这些,它只负责把任务放到队列里,所以对上层来说这里是异步的. 看张图: 如果你不熟悉c++11的内容 以下文章仅供参考 c++

异步线程池的实现(一)-------具体实现方法

本篇是这个内容的第一篇,主要是写:遇到的问题,和自己摸索实现的方法.后面还会有一篇是总结性地写线程池的相关内容(偏理论的). 一.背景介绍 朋友的项目开发到一定程度之后,又遇到了一些问题:在某些流程中的一些节点,由于是串联执行的.上一步要等下一步执行完毕:或者提交数据之后要等待后台其他系统处理完成之后,才能返回结果.这样就会导致,请求发起方不得不一直等待结果,用户体验很不好:从项目优化来说,模块与模块之间构成了强耦合,这也是不利于以后扩展的,更不用说访问量上来之后,肯定会抓瞎的问题.所以,我就着

异步线程池的使用

合理使用异步线程开发项目能提高一个项目的并发量,减少响应时间.下面就简单介绍一下异步线程池的使用,参考博客:https://blog.csdn.net/hry2015/article/details/67640534 spring 对@Async定义异步任务的方法有3种: 1.最简单的异步调用,返回值为void: 2.带参数的异步调用,异步方法可以传入参数: 3.异常调用返回Future 代码如下: package com.hry.spring.async.annotation; import

(原创)C++半同步半异步线程池2

(原创)C++半同步半异步线程池 c++11 boost技术交流群:296561497,欢迎大家来交流技术. 线程池可以高效的处理任务,线程池中开启多个线程,等待同步队列中的任务到来,任务到来多个线程会抢着执行任务,当到来的任务太多,达到上限时需要等待片刻,任务上限保证内存不会溢出.线程池的效率和cpu核数相关,多核的话效率更高,线程数一般取cpu数量+2比较合适,否则线程过多,线程切换频繁反而会导致效率降低. 线程池有两个活动过程:1.外面不停的往线程池添加任务:2.线程池内部不停的取任务执行

半同步半异步线程池的实现(C++11)

简介 处理大量并发任务时,一个请求对应一个线程来处理任务,线程的创建和销毁将消耗过多的系统资源,并增加上下文切换代价.线程池技术通过在系统中预先创建一定数量的线程(通常和cpu核数相同),当任务到达时,从线程池中分配一个线程进行处理,线程在处理完任务之后不用销毁,等待重用. 线程池包括半同步半异步和领导者追随者两种实现方式.线程池包括三部分,第一层是同步服务层,它处理来自上层的任务请求.第二层是同步队列层,同步服务层中的任务将添加到队列中.第三层是异步服务层,多个线程同时处理队列中的任务. 先贴

Springboot的异步线程池图鉴

1:定义线程池 @EnableAsync @Configuration class TaskPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor

[email&#160;protected]异步线程池的配置及应用

示例: 1. 配置 @EnableAsync @Configuration public class TaskExecutorConfiguration { @Autowired private TaskExecutorProperties taskExecutorProperties; @Bean public Executor routeGen() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); execut

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe