spring @Async异步方法使用及原理说明

异步类:

package com.example.spring.async;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.example.spring.MyLog;
/**
 * 将一个类声明为异步类,那么这个类对外暴露的方法全部成为异步方法。
 * 与异步方法的区别是这里的注解是加到类上,异步方法的注解是加到方法上。仅此而已
 * @DESC
 * @author guchuang
 *
 */
@Async
@Service
public class AsyncClass {
    public AsyncClass() {
        MyLog.info("-------------------------init AsyncClass--------------------");
    }
    volatile int index = 0;
    public void foo() {
        MyLog.info("asyncclass foo, index:" + index);
    }
    public void foo(int i) {
        this.index = i;
        MyLog.info("asyncclass foo, index:" + i);
    }
    public void bar(int i) {
        this.index = i;
        MyLog.info("asyncclass bar, index:" + i);
    }
}

异步方法:

package com.example.spring.async;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.web.context.WebApplicationContext;

import com.example.spring.MyLog;
/**
 *异步方法示例,关键点有三步:
 *  1.启动类增加注解 @EnableAsync
 *  2.当前类声明为服务 @Service
 *  3.方法上面添加注解 @Async
 *限制:
 *   默认类内的方法调用不会被aop拦截,也就是说同一个类内的方法调用,@Async不生效
 *解决办法:
 *  如果要使同一个类中的方法之间调用也被拦截,需要使用spring容器中的实例对象,而不是使用默认的this,因为通过bean实例的调用才会被spring的aop拦截
 *  本例使用方法: AsyncMethod asyncMethod = context.getBean(AsyncMethod.class);    然后使用这个引用调用本地的方法即可达到被拦截的目的
 *备注:
 *  这种方法只能拦截protected,default,public方法,private方法无法拦截。这个是spring aop的一个机制。
 *
 * 默认情况下异步方法的调用使用的是SimpleAsyncTaskExecutor来执行异步方法调用,实际是每个方法都会起一个新的线程。
 * 大致运行过程:(以asyncMethod.bar1();为例)
 *  1.调用bar1()方法被aop拦截
 *  2.使用cglib获取要执行的方法和入参、当前实例(后续用于反射调用方法)。这些是运行一个方法的必要条件,可以封装成独立的方法来运行
 *  3.启动新的线程,调用上面封装的实际要调用的方法
 *  4.返回方法调用的结果
 *  前提是启动的时候被spring提前处理,将方法进行封装,加载流程:
 *    AsyncAnnotationBeanPostProcessor ->
 * 如果要修改@Async异步方法底层调用:
 *  可以实现AsyncConfigurer接口,或者提供TaskExecutor实例(然后在@Async中指定这个实例),详见本例代码
 *
 * 异步方法返回类型只能有两种:void和java.util.concurrent.Future
 *  当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面,可以通过注册AsyncUncaughtExceptionHandler来捕获此类异常
 *  当返回类型为Future的时候,方法调用过程差生的异常会抛到调用者层面
 *
 * @DESC
 * @author guchuang
 *
 */
@Service
public class AsyncMethod {
    //@Autowired
    AsyncMethod asyncMethod;

    @Autowired
    WebApplicationContext context;

    /*@PostConstruct
    public void init() {
        this.asyncMethod = context.getBean(AsyncMethod.class);
    }*/
    @Async
    public void bar() {
        MyLog.info("sleep bar");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Async
    private void bar1() {
        MyLog.info("private bar");
    }
    @Async
    public void bar2() {
        MyLog.info("public bar");
    }
    @Async
    protected void bar3() {
        MyLog.info("protected bar");
    }
    @Async
    void bar4() {
        MyLog.info("default bar");
    }

    @Async
    public void foo1() {
        MyLog.info("foo1");
        this.bar1();
        this.bar2();
        asyncMethod = context.getBean(AsyncMethod.class);
        asyncMethod.bar();      //异步
        asyncMethod.bar1();     //同步
        asyncMethod.bar2();     //异步
        asyncMethod.bar3();     //异步
        asyncMethod.bar4();     //异步
    }

    /**
     * 指定这个异步方法使用的底层执行器TaskExecutor
     * @param index
     */
    @Async("async1")
    public void foo2(int index) {
        MyLog.info("foo2 with index:" + index);
       }

    @Async
    public void foo3(int index, String threadName) {
        Thread.currentThread().setName(threadName);
        MyLog.info("foo3 with index:" + index);
    }

    @Async
    public void fooE() {
        throw new RuntimeException("无返回值异步方法抛出异常");
    }
    @Async
    public Future<String> futureE() {
        throw new RuntimeException("有返回值异步方法抛出异常");
    }

    /**
     * 带返回值的异步调用
     * @return
     */
    @Async
    public Future<String> futureTask1() {
        MyLog.info("start run future task1");
        MyLog.sleep(1000);
        return new AsyncResult<String>("future task1");
    }
    @Async
    public CompletableFuture<String> futureTask2 () {
        MyLog.info("Running task  thread: " + Thread.currentThread().getName());

        CompletableFuture<String> future = new CompletableFuture<String>() {
            @Override
            public String get () throws InterruptedException, ExecutionException {
                return " task result";
            }
        };
        return future;
    }
    /**
     * 指定使用的TaskExecutor,这个bean在config中已经配置
     * @param index
     * @param time
     */
    @Async("async2")
    public void asyncSleep(int index, int time) {
        try {
            Thread.sleep(time * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        MyLog.info("task:" + index + " end");
    }

    @Async("async3")
    public void asyncSleep3(int index, int time) {
        try {
            Thread.sleep(time * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        MyLog.info("task:" + index + " end");
    }
}

配置类:

package com.example.spring.async.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;

import com.example.spring.MyLog;
import com.example.spring.MyThreadFactory;
import com.example.spring.async.RejectedPolicy;
/**
 * @Async异步方法线程池配置,默认不使用线程池,使用SimpleAsyncTaskExecutor(一个线程执行器,每个任务都会新建线程去执行)
 * 这里实现了接口AsyncConfigurer,并覆写了其内的方法,这样@Async默认的运行机制发生变化(使用了线程池,设置了线程运行过程的异常处理函数)
 * 备注:
 *   这里只是展示写法,要达到这个目的,可以不实现这个接口,具体见下面的方法
 * @DESC
 * @author guchuang
 *
 */
@Configuration@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    private static ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
            60L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(3), new MyThreadFactory("common1"));

    private static ExecutorService threadPoolWithRejectDeal = new ThreadPoolExecutor(5, 5,
            60L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(3), new MyThreadFactory("common2"), new RejectedPolicy());

    /**
     * 这个实例声明的TaskExecutor会成为@Async方法运行的默认线程执行器
     * @Bean 使这个实例完全被spring接管
     */
    @Bean
    @Override
    public TaskExecutor getAsyncExecutor() {
        return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5,new MyThreadFactory("async")));
    }
    /**
     * 定义@Async方法默认的异常处理机制(只对void型异步返回方法有效,Future返回值类型的异常会抛给调用者)
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (e, method, objects) -> MyLog.error("Method:" + method + ", exception:"+e.getMessage());
    }
    /**
     * 如果不覆写AsyncConfigurer的话,这个方法暴露bean会被当做@Async的默认线程池。
     * 注意必须是这个方法名(也就是bean name, 或者显示指定bean name @Qualifier("taskExecutor")),返回类型可以是Executor或者TaskExecutor
     * 如果没有配置的Executor,则默认使用SimpleAsyncTaskExecutor
     * 备注: 这种方式声明的bean,方法名就是bean name
     * @return
     */
    @Bean
    public Executor taskExecutor() {
        return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5,new MyThreadFactory("async0")));
    }
    /**
     * 定义其它的TaskExecutor,声明@Async方法的时候可以指定TaskExecutor,达到切换底层的目的
     * @return
     */
    @Bean
    public TaskExecutor async1() {
        return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(2,new MyThreadFactory("async1")));
    }

    /**
     * 没有设置拒绝策略
     * @return
     */
    @Bean
    @Qualifier("async2")
    public TaskExecutor myAsyncExecutor2() {
        return new ConcurrentTaskExecutor(threadPool);
    }

    @Bean
    @Qualifier("async3")
    public TaskExecutor myAsyncExecutor3() {
        return new ConcurrentTaskExecutor(threadPoolWithRejectDeal);
    }

}

线程池相关类:

package com.example.spring;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public MyThreadFactory(String name) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = name + "-pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
package com.example.spring.async;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

import com.example.spring.MyLog;
/**
 * 线程池满之后的处理策略类
 * @DESC
 * @author guchuang
 *
 */
public class RejectedPolicy implements RejectedExecutionHandler {
    public RejectedPolicy() { }

    /**
     * 向线程池中添加线程被拒绝时会调用这个方法。一般拒绝是因为线程池满了
     *
     * @param r 被拒绝的任务
     * @param e 拒绝这个任务的线程池
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        MyLog.info("one thread is rejected, i will deal it");
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

测试类:

package com.example.spring.async;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.example.spring.BaseDemoApplicationTest;
import com.example.spring.MyLog;
import com.example.spring.async.AsyncMethod;

public class AsyncMethodTest extends BaseDemoApplicationTest {

    @Autowired
    AsyncMethod asyncMethod;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
    }
    @AfterClass
    public static void afterClass() throws Exception {
        MyLog.sleep(3000);
    }

    @Before
    public void setUp() throws Exception {
    }

    @Test
    public void test1() {
        asyncMethod.foo1();
        MyLog.info("just wait");
        MyLog.sleep(2000);
    }
    @Test
    public void test2() {
        for (int i = 0; i < 10; i++) {
            asyncMethod.foo2(i);
        }
    }
    @Test
    public void test3() {
        for (int i = 0; i < 10; i++) {
            asyncMethod.foo3(i, "gc-thread-"+i);
        }
    }

    @Test
    public void testE() {
        try {
            Future<String> result = asyncMethod.futureE();
            //这里调用get才会获得异常
            MyLog.info(result.get());
        } catch(Exception e) {
            //e.printStackTrace();
            MyLog.info("this is excepted Exception:" + e.getMessage());
        }

        asyncMethod.fooE();
        MyLog.info("end call e");
        //MyLog.sleep(1000);
    }

    @Test
    public void testFuture() throws InterruptedException, ExecutionException {
        MyLog.info("\n-----------------start-----------------------");
        Future<String> result1 = asyncMethod.futureTask1();
        CompletableFuture<String> result2 = asyncMethod.futureTask2();
        MyLog.info("result1:" + result1.get());
        MyLog.info("result2:" + result2.get());
    }

    @Test
    public void testReject() {
        MyLog.info("\n-----------------start testReject-----------------------");
        MyLog.info("start add task");
        //当超过线程词最大容量的时候,会抛出TaskRejectedException
        try {
            for (int i = 0; i < 10; i++) {
                asyncMethod.asyncSleep(i, 1);
            }
        } catch(RejectedExecutionException e) {
            MyLog.info("excepted exception:" + e.getMessage());
        }
        MyLog.info("finished add task");
        MyLog.sleep(100 * 1000);
    }

    @Test
    public void testRejectWithDeal() {
        MyLog.info("\n-----------------start testRejectWithDeal-----------------------");
        MyLog.info("start add task");
        //当超过线程词最大容量的时候,会抛出TaskRejectedException
        try {
            for (int i = 0; i < 10; i++) {
                asyncMethod.asyncSleep3(i, 1);
            }
        } catch(RejectedExecutionException e) {
            MyLog.info("excepted exception:" + e.getMessage());
        }
        MyLog.info("finished add task");
        MyLog.sleep(100 * 1000);
    }
}
package com.example.spring.async;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.example.spring.BaseDemoApplicationTest;
import com.example.spring.MyLog;
import com.example.spring.async.AsyncClass;

public class AsyncClassTest extends BaseDemoApplicationTest {

    @Autowired
    AsyncClass asyncClass;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
    }

    @Before
    public void setUp() throws Exception {
    }

    @Test
    public void test() {
        asyncClass.foo();
        asyncClass.foo(10);
        MyLog.sleep(100);
        asyncClass.foo();
    }

}

原文地址:https://www.cnblogs.com/gc65/p/11183836.html

时间: 2024-10-29 08:42:40

spring @Async异步方法使用及原理说明的相关文章

异步任务spring @Async注解源码解析

1.引子 开启异步任务使用方法: 1).方法上加@Async注解 2).启动类或者配置类上@EnableAsync 2.源码解析 虽然spring5已经出来了,但是我们还是使用的spring4,本文就根据spring-context-4.3.14.RELEASE.jar来分析源码. 2.1.@Async org.springframework.scheduling.annotation.Async 源码注释翻译: 1 /** 2 * Annotation that marks a method

菜鸟学SSH(十四)——Spring容器AOP的实现原理——动态代理

之前写了一篇关于IOC的博客--<Spring容器IOC解析及简单实现>,今天再来聊聊AOP.大家都知道Spring的两大特性是IOC和AOP. IOC负责将对象动态的注入到容器,从而达到一种需要谁就注入谁,什么时候需要就什么时候注入的效果,可谓是招之则来,挥之则去.想想都觉得爽,如果现实生活中也有这本事那就爽歪歪了,至于有多爽,各位自己脑补吧:而AOP呢,它实现的就是容器的另一大好处了,就是可以让容器中的对象都享有容器中的公共服务.那么容器是怎么做到的呢?它怎么就能让在它里面的对象自动拥有它

Spring读取xml配置文件的原理与实现

本篇博文的目录: 一:前言 二:spring的配置文件 三:依赖的第三方库.使用技术.代码布局 四:Document实现 五:获取Element的实现 六:解析Element元素 七:Bean创造器 八:Ioc容器的创建 九:总结 一:前言: Spring作为Bean的管理容器,在我们的项目构建中发挥了举足轻重的作用,尤其是控制反转(IOC)和依赖(DI)注入的特性,将对象的创建完全交给它来实现,当我们把与其他框架进行整合时,比如与Mybatis整合,可以把sqlMapClientTemplat

Spring系列之AOP的原理及手动实现

目录 Spring系列之IOC的原理及手动实现 Spring系列之DI的原理及手动实现 引入 到目前为止,我们已经完成了简易的IOC和DI的功能,虽然相比如Spring来说肯定是非常简陋的,但是毕竟我们是为了理解原理的,也没必要一定要做一个和Spring一样的东西.到了现在并不能让我们松一口气,前面的IOC和DI都还算比较简单,这里要介绍的AOP难度就稍微要大一点了. tips 本篇内容难度较大,每一步都需要理清思路,可能需要多看几遍,多画类图和手动实现更容易掌握. AOP 什么是AOP Asp

Spring @async原理

Spring中@Async用法总结 引言: 在Java应用中,绝大多数情况下都是通过同步的方式来实现交互处理的:但是在处理与第三方系统交互的时候,容易造成响应迟缓的情况,之前大部分都是使用多线程来完成此类任务,其实,在spring 3.x之后,就已经内置了@Async来完美解决这个问题,本文将完成介绍@Async的用法. 1.  何为异步调用? 在解释异步调用之前,我们先来看同步调用的定义:同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果. 异步调用则是只是发送了调用的指令,调用者

通过源码理解Spring中@Scheduled的实现原理并且实现调度任务动态装载

前提 最近的新项目和数据同步相关,有定时调度的需求.之前一直有使用过Quartz.XXL-Job.Easy Scheduler等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring内置的Scheduling模块.而原生的Scheduling模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC数据

Spring核心框架 - AOP的原理及源码解析

一.AOP的体系结构 如下图所示:(引自AOP联盟) 层次3语言和开发环境:基础是指待增加对象或者目标对象:切面通常包括对于基础的增加应用:配置是指AOP体系中提供的配置环境或者编织配置,通过该配置AOP将基础和切面结合起来,从而完成切面对目标对象的编织实现. 层次2面向方面系统:配置模型,逻辑配置和AOP模型是为上策的语言和开发环境提供支持的,主要功能是将需要增强的目标对象.切面和配置使用AOP的API转换.抽象.封装成面向方面中的逻辑模型. 层次1底层编织实现模块:主要是将面向方面系统抽象封

Spring源码:IOC原理解析(一)

版权声明:本文为博主原创文章,转载请注明出处,欢迎交流学习! IOC(Inversion of Control),即控制反转,意思是将对象的创建和依赖关系交给第三方容器处理,我们要用的时候告诉容器我们需要什么然后直接去拿就行了.举个例子,我们有一个工厂,它生产各种产品,当你需要某个产品,比如你需要一辆汽车,你就告诉工厂你需要一辆汽车,工厂就会直接返回给你一辆汽车,而不需要你自己通过付出劳动来得到这辆汽车,你也不用关心工厂是如何生产这辆汽车.对应到我们的程序中就是,IOC容器会帮我们创建和管理对象

Java的注解机制——Spring自动装配的实现原理

使用注解主要是在需要使用Spring框架的时候,特别是使用SpringMVC.因为这时我们会发现它的强大之处:预处理. 注解实际上相当于一种标记,它允许你在运行时(源码.文档.类文件我们就不讨论了)动态地对拥有该标记的成员进行操作. 实现注解需要三个条件(我们讨论的是类似于Spring自动装配的高级应用):注解声明.使用注解的元素.操作使用注解元素的代码. 首先是注解声明,注解也是一种类型,我们要定义的话也需要编写代码,如下: 1 package annotation; 2 3 import j