异步任务spring @Async注解源码解析

1.引子

开启异步任务使用方法:

1).方法上加@Async注解

2).启动类或者配置类上@EnableAsync

2.源码解析

虽然spring5已经出来了,但是我们还是使用的spring4,本文就根据spring-context-4.3.14.RELEASE.jar来分析源码。

2.1.@Async

org.springframework.scheduling.annotation.Async 源码注释翻译:

 1 /**
 2  * Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
 3  * Can also be used at the type level, in which case all of the type‘s methods are
 4  * considered as asynchronous.该注解可以标记一个异步执行的方法,也可以用来标注类,表示类中的所有方法都是异步执行的。
 5  *
 6  * <p>In terms of target method signatures, any parameter types are supported.
 7  * However, the return type is constrained to either {@code void} or
 8  * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 9  * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
10  * {@link java.util.concurrent.CompletableFuture} types which allow for richer
11  * interaction with the asynchronous task and for immediate composition with
12  * further processing steps.入参随意,但返回值只能是void或者Future.(ListenableFuture接口/CompletableFuture类)13  *
14  * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
15  * {@code Future} that can be used to track the result of the asynchronous method
16  * execution. However, since the target method needs to implement the same signature,
17  * it will have to return a temporary {@code Future} handle that just passes a value
18  * through: e.g. Spring‘s {@link AsyncResult}, EJB 3.1‘s {@link javax.ejb.AsyncResult},
19  * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
20  * Future是代理返回的切实的异步返回,用以追踪异步方法的返回值。当然也可以使用AsyncResult类(实现ListenableFuture接口)(Spring或者EJB都有)或者CompletableFuture类
21  * @author Juergen Hoeller
22  * @author Chris Beams
23  * @since 3.0
24  * @see AnnotationAsyncExecutionInterceptor
25  * @see AsyncAnnotationAdvisor
26  */
27 @Target({ElementType.METHOD, ElementType.TYPE})
28 @Retention(RetentionPolicy.RUNTIME)
29 @Documented
30 public @interface Async {
31
32     /**
33      * A qualifier value for the specified asynchronous operation(s).
34      * <p>May be used to determine the target executor to be used when executing this
35      * method, matching the qualifier value (or the bean name) of a specific
36      * {@link java.util.concurrent.Executor Executor} or
37      * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
38      * bean definition.用以限定执行方法的执行器名称(自定义):Executor或者TaskExecutor
39      * <p>When specified on a class level {@code @Async} annotation, indicates that the
40      * given executor should be used for all methods within the class. Method level use
41      * of {@code Async#value} always overrides any value set at the class level.
42      * @since 3.1.2   加在类上表示整个类都使用,加在方法上会覆盖类上的设置
43      */
44     String value() default "";
45
46 }

上图源码注释已经写的很清晰了哈,主要注意3点:

1)返回值:不要返回值直接void;需要返回值用AsyncResult或者CompletableFuture

2)可自定义执行器并指定例如:@Async("otherExecutor")

3)@Async  必须不同类间调用: A类--》B类.C方法()(@Async注释在B类/方法中),如果在同一个类中调用,会变同步执行,例如:A类.B()-->A类[email protected] C(),原因是:底层实现是代理对注解扫描实现的,B方法上没有注解,没有生成相应的代理类。(当然把@Async加到类上也能解决但所有方法都异步了,一般不这么用!)

2.2 @EnableAsync

老规矩咱们直接看类注释:

//开启spring异步执行器,类似xml中的task标签配置,需要联合@Configuration注解一起使用
Enables Spring‘s asynchronous method execution capability, similar to functionality found in Spring‘s <task:*> XML namespace.
To be used together with @Configuration classes as follows, enabling annotation-driven async processing for an entire Spring application context:
 @Configuration
 @EnableAsync
 public class AppConfig {

}
MyAsyncBean is a user-defined type with one or more methods annotated with either Spring‘s @Async annotation, the EJB 3.1 @javax.ejb.Asynchronous annotation, or any custom annotation specified via the annotation() attribute. The aspect is added transparently for any registered bean, for instance via this configuration:
 @Configuration
 public class AnotherAppConfig {

@Bean
     public MyAsyncBean asyncBean() {
         return new MyAsyncBean();
     }
 }

//默认情况下spring会先搜索TaskExecutor类型的bean或者名字为taskExecutor的Executor类型的bean,都不存在使用SimpleAsyncTaskExecutor执行器
By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.
To customize all this, implement AsyncConfigurer and provide:
your own Executor through the getAsyncExecutor() method, and your own AsyncUncaughtExceptionHandler through the getAsyncUncaughtExceptionHandler() method.//可实现AsyncConfigurer接口复写getAsyncExecutor获取异步执行器,getAsyncUncaughtExceptionHandler获取异步未捕获异常处理器
 @Configuration
 @EnableAsync
 public class AppConfig implements AsyncConfigurer {

@Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(7);
         executor.setMaxPoolSize(42);
         executor.setQueueCapacity(11);
         executor.setThreadNamePrefix("MyExecutor-");
         executor.initialize();
         return executor;
     }

@Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return MyAsyncUncaughtExceptionHandler();
     }
 }
If only one item needs to be customized, null can be returned to keep the default settings. Consider also extending from AsyncConfigurerSupport when possible.
Note: In the above example the ThreadPoolTaskExecutor is not a fully managed Spring bean. Add the @Bean annotation to the getAsyncExecutor() method if you want a fully managed bean. In such circumstances it is no longer necessary to manually call the executor.initialize() method as this will be invoked automatically when the bean is initialized.
For reference, the example above can be compared to the following Spring XML configuration:
 <beans>

<task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>

<task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>

<bean id="asyncBean" class="com.foo.MyAsyncBean"/>

<bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>

</beans>
 //注解类和xml基本一致,但是使用注解类还可以自定义线程名前缀(上面的AppConfig-》getAsyncExecutor-》setThreadNamePrefix)
The above XML-based and JavaConfig-based examples are equivalent except for the setting of the thread name prefix of the Executor; this is because the <task:executor> element does not expose such an attribute. This demonstrates how the JavaConfig-based approach allows for maximum configurability through direct access to actual componentry.
The mode() attribute controls how advice is applied: If the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying. Please note that proxy mode allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way.//这里就说明了@Async必须在不同方法中调用,即第一部分注意的第三点。
Note that if the mode() is set to AdviceMode.ASPECTJ, then the value of the proxyTargetClass() attribute will be ignored. Note also that in this case the spring-aspects module JAR must be present on the classpath, with compile-time weaving or load-time weaving applying the aspect to the affected classes. There is no proxy involved in such a scenario; local calls will be intercepted as well.//当然也可以用Aspect模式织入(需要引入spring-aspects模块需要的jar)

下面是源码:

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(AsyncConfigurationSelector.class)public @interface ç {

   /**该属性用来支持用户自定义异步注解,默认扫描spring的@Async和EJB3.1的@code @javax.ejb.Asynchronous    * Indicate the ‘async‘ annotation type to be detected at either class    * or method level.    * <p>By default, both Spring‘s @{@link Async} annotation and the EJB 3.1    * {@code @javax.ejb.Asynchronous} annotation will be detected.    * <p>This attribute exists so that developers can provide their own    * custom annotation type to indicate that a method (or all methods of    * a given class) should be invoked asynchronously.    */   Class<? extends Annotation> annotation() default Annotation.class;

   /**标明是否需要创建CGLIB子类代理,AdviceMode=PROXY时才适用。注意设置为true时,其它spring管理的bean也会升级到CGLIB子类代理    * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed    * to standard Java interface-based proxies.    * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.    * <p>The default is {@code false}.    * <p>Note that setting this attribute to {@code true} will affect <em>all</em>    * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.    * For example, other beans marked with Spring‘s {@code @Transactional} annotation    * will be upgraded to subclass proxying at the same time. This approach has no    * negative impact in practice unless one is explicitly expecting one type of proxy    * vs. another &mdash; for example, in tests.    */   boolean proxyTargetClass() default false;

   /**标明异步通知将会如何实现,默认PROXY,如需支持同一个类中非异步方法调用另一个异步方法,需要设置为ASPECTJ    * Indicate how async advice should be applied.    * <p><b>The default is {@link AdviceMode#PROXY}.</b>    * Please note that proxy mode allows for interception of calls through the proxy    * only. Local calls within the same class cannot get intercepted that way; an    * {@link Async} annotation on such a method within a local call will be ignored    * since Spring‘s interceptor does not even kick in for such a runtime scenario.    * For a more advanced mode of interception, consider switching this to    * {@link AdviceMode#ASPECTJ}.    */   AdviceMode mode() default AdviceMode.PROXY;

   /**标明异步注解bean处理器应该遵循的执行顺序,默认最低的优先级(Integer.MAX_VALUE,值越小优先级越高)    * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}    * should be applied.    * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run    * after all other post-processors, so that it can add an advisor to    * existing proxies rather than double-proxy.    */   int order() default Ordered.LOWEST_PRECEDENCE;

}

执行流程:

如上图,核心注解就是@Import(AsyncConfigurationSelector.class),一看就是套路ImportSelector接口的selectImports()方法,源码如下:

 1 /**查询器:基于@EanableAsync中定义的模式AdviceMode加在@Configuration标记的类上,确定抽象异步配置类的实现类
 2  * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 3  * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 4  *
 5  * @author Chris Beams
 6  * @since 3.1
 7  * @see EnableAsync
 8  * @see ProxyAsyncConfiguration
 9  */
10 public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
11
12     private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
13             "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
14
15     /**
16      * {@inheritDoc}
17      * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
18      * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
19      */
20     @Override
21     public String[] selectImports(AdviceMode adviceMode) {
22         switch (adviceMode) {
23             case PROXY://如果配置的PROXY,使用ProxyAsyncConfiguration
24                 return new String[] { ProxyAsyncConfiguration.class.getName() };
25             case ASPECTJ://如果配置的ASPECTJ,使用ProxyAsyncConfiguration
26                 return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
27             default:
28                 return null;
29         }
30     }
31
32 }

我们就选一个类ProxyAsyncConfiguration(JDK接口代理)看一下具体实现:

 1 /**
 2  * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 3  * to enable proxy-based asynchronous method execution.
 4  *
 5  * @author Chris Beams
 6  * @author Stephane Nicoll
 7  * @since 3.1
 8  * @see EnableAsync
 9  * @see AsyncConfigurationSelector
10  */
11 @Configuration
12 @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
13 public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
14
15     @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
16     @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
17     public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
18         Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
19         AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();//新建一个异步注解bean后处理器
20         Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
21         //如果@EnableAsync中用户自定义了annotation属性,即异步注解类型,那么设置                  if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
22             bpp.setAsyncAnnotationType(customAsyncAnnotation);
23         }
24         if (this.executor != null) {//Executor:设置线程任务执行器
25             bpp.setExecutor(this.executor);
26         }
27         if (this.exceptionHandler != null) {//AsyncUncaughtExceptionHandler:设置异常处理器
28             bpp.setExceptionHandler(this.exceptionHandler);
29         }
30         bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));//设置是否升级到CGLIB子类代理,默认不开启
31         bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));//设置执行优先级,默认最后执行
32         return bpp;
33     }
34
35 }

如上图,ProxyAsyncConfiguration就两点:

1.就是继承了AbstractAsyncConfiguration类

2.定义了一个bean:AsyncAnnotationBeanPostProcessor

2.AbstractAsyncConfiguration源码:

 1 /**
 2  * Abstract base {@code Configuration} class providing common structure for enabling
 3  * Spring‘s asynchronous method execution capability.
 4  * 抽象异步配置类,封装了通用结构,用以支持spring的异步方法执行能力
 5  * @author Chris Beams
 6  * @author Stephane Nicoll
 7  * @since 3.1
 8  * @see EnableAsync
 9  */
10 @Configuration
11 public abstract class AbstractAsyncConfiguration implements ImportAware {
12
13     protected AnnotationAttributes enableAsync;//enableAsync的注解属性
14
15     protected Executor executor;//Doug Lea老李头设计的线程任务执行器
16
17     protected AsyncUncaughtExceptionHandler exceptionHandler;//异常处理器
18
19
20     @Override
21     public void setImportMetadata(AnnotationMetadata importMetadata) {
22         this.enableAsync = AnnotationAttributes.fromMap(
23                 importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
24         if (this.enableAsync == null) {
25             throw new IllegalArgumentException(
26                     "@EnableAsync is not present on importing class " + importMetadata.getClassName());
27         }
28     }
29
30     /**
31      * Collect any {@link AsyncConfigurer} beans through autowiring.
32      */
33     @Autowired(required = false)
34     void setConfigurers(Collection<AsyncConfigurer> configurers) {
35         if (CollectionUtils.isEmpty(configurers)) {
36             return;
37         }
38         if (configurers.size() > 1) {
39             throw new IllegalStateException("Only one AsyncConfigurer may exist");
40         }
41         AsyncConfigurer configurer = configurers.iterator().next();
42         this.executor = configurer.getAsyncExecutor();
43         this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
44     }
45
46 }

很清晰哈,

属性:

1)注解属性

2)异步任务执行器

3)异常处理器

方法:

1)setImportMetadata 设置注解属性,即属性1

2)setConfigurers 设置异步任务执行器和异常处理器,即属性2,3

2.AsyncAnnotationBeanPostProcessor这个Bean,类图如下:

后面详细分析AOP详细过程。

2.3.AOP-Advisor切面初始化:(AsyncAnnotationBeanPostProcessor -》setBeanFactory())

AsyncAnnotationBeanPostProcessor这个类的Bean 初始化时 : BeanFactoryAware接口setBeanFactory方法中,对AsyncAnnotationAdvisor异步注解切面进行了构造。

 1 @Override
 2     public void setBeanFactory(BeanFactory beanFactory) {
 3         super.setBeanFactory(beanFactory);
 4
 5         AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
 6         if (this.asyncAnnotationType != null) {
 7             advisor.setAsyncAnnotationType(this.asyncAnnotationType);
 8         }
 9         advisor.setBeanFactory(beanFactory);
10         this.advisor = advisor;
11     }

AsyncAnnotationAdvisor的类图如下:

2.4.AOP-生成代理类AopProxy(AsyncAnnotationBeanPostProcessor -》postProcessAfterInitialization())

具体的后置处理:AsyncAnnotationBeanPostProcessor的后置bean处理是通过其父类AbstractAdvisingBeanPostProcessor来实现的,

该类实现了BeanPostProcessor接口,复写postProcessAfterInitialization方法如下图所示:

 1 @Override
 2     public Object postProcessAfterInitialization(Object bean, String beanName) {
 3         if (bean instanceof AopInfrastructureBean) {
 4             // Ignore AOP infrastructure such as scoped proxies.
 5             return bean;
 6         }
 7         //把Advisor添加进bean  ProxyFactory-》AdvisedSupport-》Advised
 8         if (bean instanceof Advised) {
 9             Advised advised = (Advised) bean;
10             if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
11                 // Add our local Advisor to the existing proxy‘s Advisor chain...
12                 if (this.beforeExistingAdvisors) {
13                     advised.addAdvisor(0, this.advisor);
14                 }
15                 else {
16                     advised.addAdvisor(this.advisor);
17                 }
18                 return bean;
19             }
20         }
21         //构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy
22         if (isEligible(bean, beanName)) {
23             ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
24             if (!proxyFactory.isProxyTargetClass()) {
25                 evaluateProxyInterfaces(bean.getClass(), proxyFactory);
26             }
27             proxyFactory.addAdvisor(this.advisor);
28             customizeProxyFactory(proxyFactory);
29             return proxyFactory.getProxy(getProxyClassLoader());
30         }
31
32         // No async proxy needed.
33         return bean;
34     }

isEligible用于判断这个类或者这个类中的某个方法是否含有注解,AsyncAnnotationAdvisor 实现了PointcutAdvisor接口,满足条件2如下图:

19   public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
20         if (advisor instanceof IntroductionAdvisor) {
21             return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
22         }//满足第二分支PointcutAdvisor
23         else if (advisor instanceof PointcutAdvisor) {
24             PointcutAdvisor pca = (PointcutAdvisor) advisor;
25             return canApply(pca.getPointcut(), targetClass, hasIntroductions);
26         }
27         else {
28             // It doesn‘t have a pointcut so we assume it applies.
29             return true;
30         }
31     }

isEligible校验通过后,构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy接口实现类

2.5.AOP-切点执行(InvocationHandler.invoke)

上一步生成的代理AopProxy接口,我们这里最终实际生成的是JdkDynamicAopProxy,即JDK动态代理类,类图如下:

最终执行的是InvocationHandler接口的invoke方法,下面是截取出来的核心代码:

 1 // 得到方法的拦截器链
 2 List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
 3 // Check whether we have any advice. If we don‘t, we can fallback on direct
 4 // reflective invocation of the target, and avoid creating a MethodInvocation.
 5 if (chain.isEmpty()) {
 6     // We can skip creating a MethodInvocation: just invoke the target directly
 7     // Note that the final invoker must be an InvokerInterceptor so we know it does
 8     // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
 9     Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
10     retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
11 }
12 else {
13     // 构造
14     invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
15     // Proceed to the joinpoint through the interceptor chain.
16     retVal = invocation.proceed();
17 }

@Async注解的拦截器是AsyncExecutionInterceptor,它继承了MethodInterceptor接口。而MethodInterceptor就是AOP规范中的Advice(切点的处理器)。

chain不为空,执行第二个分支,构造ReflectiveMethodInvocation,然后执行proceed方法。

 1 @Override
 2     public Object proceed() throws Throwable {
 3         //    如果没有拦截器,直接执行被代理的方法
 4         if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
 5             return invokeJoinpoint();
 6         }
 7
 8         Object interceptorOrInterceptionAdvice =
 9                 this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
10         if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
11             // Evaluate dynamic method matcher here: static part will already have
12             // been evaluated and found to match.
13             InterceptorAndDynamicMethodMatcher dm =
14                     (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
15             if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
16                 return dm.interceptor.invoke(this);
17             }
18             else {
19                 // Dynamic matching failed.
20                 // Skip this interceptor and invoke the next in the chain.
21                 return proceed();
22             }
23         }
24         else {
25             // It‘s an interceptor, so we just invoke it: The pointcut will have
26             // been evaluated statically before this object was constructed.
27             return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
28         }
29     }

如上图,核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),实际就是执行了AsyncExecutionInterceptor.invoke,继续追!

 1 public Object invoke(final MethodInvocation invocation) throws Throwable {
 2         Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
 3         Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
 4         final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
 5
 6         AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
 7         if (executor == null) {
 8             throw new IllegalStateException(//如果没有自定义异步任务执行器,报下面这行错,不用管,可以默认执行
 9                     "No executor specified and no default executor set on AsyncExecutionInterceptor either");
10         }
11
12         Callable<Object> task = new Callable<Object>() {
13             @Override
14             public Object call() throws Exception {
15                 try {
16                     Object result = invocation.proceed();
17                     if (result instanceof Future) {
18                         return ((Future<?>) result).get();//阻塞等待执行完毕得到结果
19                     }
20                 }
21                 catch (ExecutionException ex) {
22                     handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
23                 }
24                 catch (Throwable ex) {
25                     handleError(ex, userDeclaredMethod, invocation.getArguments());
26                 }
27                 return null;
28             }
29         };
30         //提交有任务给执行器
31         return doSubmit(task, executor, invocation.getMethod().getReturnType());
32     }

终极执行核心方法doSubmit()

 1 protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
 2         if (completableFuturePresent) {//先判断是否存在CompletableFuture这个类,优先使用CompletableFuture执行任务
 3             Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
 4             if (result != null) {
 5                 return result;
 6             }
 7         }//返回值是可监听Future,定义过回调函数:addCallback
 8         if (ListenableFuture.class.isAssignableFrom(returnType)) {
 9             return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
10         }//返回值是Future
11         else if (Future.class.isAssignableFrom(returnType)) {
12             return executor.submit(task);
13         }
14         else {//没有返回值
15             executor.submit(task);
16             return null;
17         }
18     }

最终执行:就是开启一个线程启动...

1 protected void doExecute(Runnable task) {
2         Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
3         thread.start();
4     }

3.总结

整体流程大体可梳理为两条线:

1.从注解开始:@EnableAsync--》ProxyAsyncConfiguration类构造一个bean(类型:AsyncAnnotationBeanPostProcessor)

2.从AsyncAnnotationBeanPostProcessor这个类的bean的生命周期走:AOP-Advisor切面初始化(setBeanFactory())--》AOP-生成代理类AopProxy(postProcessAfterInitialization())--》AOP-切点执行(InvocationHandler.invoke)

原文地址:https://www.cnblogs.com/dennyzhangdd/p/9026303.html

时间: 2024-08-29 05:12:15

异步任务spring @Async注解源码解析的相关文章

Spring @Import注解源码解析

简介 Spring 3.0之前,创建Bean可以通过xml配置文件与扫描特定包下面的类来将类注入到Spring IOC容器内.而在Spring 3.0之后提供了JavaConfig的方式,也就是将IOC容器里Bean的元信息以java代码的方式进行描述.我们可以通过@Configuration与@Bean这两个注解配合使用来将原来配置在xml文件里的bean通过java代码的方式进行描述 @Import注解提供了@Bean注解的功能,同时还有xml配置文件里标签组织多个分散的xml文件的功能,当

Feign 系列(05)Spring Cloud OpenFeign 源码解析

Feign 系列(05)Spring Cloud OpenFeign 源码解析 [TOC] Spring Cloud 系列目录(https://www.cnblogs.com/binarylei/p/11563952.html#feign) 在 上一篇 文章中我们分析 Feign 参数解析的整个流程,Feign 原生已经支持 Feign.JAX-RS 1/2 声明式规范,本文着重关注 Spring Cloud 是如果整合 OpenFeign 的,使之支持 Spring MVC? 1. Sprin

Spring Security 解析(七) —— Spring Security Oauth2 源码解析

Spring Security 解析(七) -- Spring Security Oauth2 源码解析 ??在学习Spring Cloud 时,遇到了授权服务oauth 相关内容时,总是一知半解,因此决定先把Spring Security .Spring Security Oauth2 等权限.认证相关的内容.原理及设计学习并整理一遍.本系列文章就是在学习的过程中加强印象和理解所撰写的,如有侵权请告知. 项目环境: JDK1.8 Spring boot 2.x Spring Security

Spring Boot 启动源码解析系列六:执行启动方法一

1234567891011121314151617181920212223242526272829303132333435363738394041424344 public ConfigurableApplicationContext (String... args) { StopWatch stopWatch = new StopWatch(); // 开始执行,记录开始时间 stopWatch.start(); ConfigurableApplicationContext context =

Android异步消息处理机制(4)AsyncTask源码解析

上一章我们学习了抽象类AsyncTask的基本使用(地址:http://blog.csdn.net/wangyongge85/article/details/47988569),下面我将以问答的方法分析AsyncTask源码内容,源码版本为:API22. 1. 为什么必须在UI线程实例化我们的AsyncTask,并且必须在主线程中调用execute(Params... params)? 在分析为什么在UI线程调用之前,我们先看一下实例化AsyncTask并调用execute(Params...

Spring一小部分源码解析(持续)

如何查看源码 Spring源码下载https://github.com/spring-projects/spring-framework/tags?after=v3.1.0.RC1 eclipse关联源码 自己百度吧 源代码结构组织 Build-spring-framework是整个Spring源代码的构建目录,里面是项目的构建脚本,如果要自己动手构建Spring,可以进入这个目录使用ANT进行构建. l  org.springframework.context是IoC容器的源代码目录 l  o

spring cloud ribbon源码解析(一)

我们知道spring cloud中restTemplate可以通过服务名调接口,加入@loadBalanced标签就实现了负载均衡的功能,那么spring cloud内部是如何实现的呢? 通过@loadBalanced我们进入标签 注释解释这个标签是标记为restTemplate,作为loadBalancerClient,接着去看loadBalancerClient loadBalancerClient通过继承serviceInstanceChooser,主要包含以下几个抽象方法: 1.choo

MapperScannerConfigurer源码解析

声明:源码基于mybatis-spring 1.3.2 前文 首先在阅读本文前需要明白整合后的使用方式以及熟悉MyBatis本身的工作原理,再者如果对于本文相关知识点不熟悉的可以参考下述文章. MyBatis与Spring整合 SqlSessionTemplate源码解析 Spring包扫描机制详解 前言 一般在项目中使用MyBatis时,都会和Spring整合一起使用,通过注入一个Mapper接口来操纵数据库.其中的原理就是使用了MyBatis-Spring的MapperScannerConf

Spring 源码解析之HandlerAdapter源码解析(二)

Spring 源码解析之HandlerAdapter源码解析(二) 前言 看这篇之前需要有Spring 源码解析之HandlerMapping源码解析(一)这篇的基础,这篇主要是把请求流程中的调用controller流程单独拿出来了 解决上篇文章遗留的问题 getHandler(processedRequest) 这个方法是如何查找到对应处理的HandlerExecutionChain和HandlerMapping的,比如说静态资源的处理和请求的处理肯定是不同的HandlerMapping ge