你知道 @Async 是怎么让方法异步执行的吗?

2022-12-06 16:26:52 浏览数 (2)

前言

@Async 是通过注解标记来开启方法的异步执行的;对于注解的底层实现,除了 java 原生提供那种依赖编译期植入的之外,其他的基本都差不多,即运行时通过反射等方式拦截到打了注解的类或者方法,然后执行时进行横切拦截;另外这里还有一个点就是方法异步执行,所以对于 @Async 的剖析,就一定绕不开两个基本的知识点,就是代理和线程池。 在了解到这些之后,我们来拆解下 @Async 的基本原理。

如何开启生效?

The @EnableAsync annotation switches on Spring’s ability to run @Async methods in a background thread pool.

通过 @EnableAsync 来开启异步方法的能力。

代码语言:javascript复制
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { // ...` } 复制代码

@EnableAsync 注解 Import 了 AsyncConfigurationSelector,这个在 SpringBoot 中是非常常见的一种写法,这里需要关注的是选择了哪个自动配置类;adviceMode 默认是 false,这里就以 ProxyAsyncConfiguration 为例:

代码语言:javascript复制
@Override @Nullable public String[] selectImports(AdviceMode adviceMode) {     switch (adviceMode) {         case PROXY:             return new String[] {ProxyAsyncConfiguration.class.getName()};         case ASPECTJ:             return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};         default:             return null;     } } 复制代码

AsyncAnnotationBeanPostProcessor

org.springframework.scheduling.annotation.ProxyAsyncConfiguration中最主要的就是创建 AsyncAnnotationBeanPostProcessor,从名字看,AsyncAnnotationBeanPostProcessor 就是来处理 @Async 注解的;目的很明确,就是创建对应 bean 的代理对象,以便于执行方法时能够进行 AOP 拦截(具体细节可以看 org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization这个方法)。

代码语言:javascript复制
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) {    evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); 复制代码

编辑

AnnotationAsyncExecutionInterceptor

这里涉及到 AOP 的一些基础知识,可以查阅之前写的 juejin.cn/post/684490… 这篇文章

AOP 中最外层的是代理类,然后是织入器(advisor),再接着是切面(advice he PointCut);前面已经将创建代理对象的逻辑进行了介绍,所以接下来是织入器(advisor)和切面的创建。实际上织入器(advisor)的创建逻辑也是在 AsyncAnnotationBeanPostProcessor 中完成的。

代码语言:javascript复制
@Override public void setBeanFactory(BeanFactory beanFactory) {     super.setBeanFactory(beanFactory); // 创建  advisor     AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);     if (this.asyncAnnotationType != null) {         advisor.setAsyncAnnotationType(this.asyncAnnotationType);     }     advisor.setBeanFactory(beanFactory);     this.advisor = advisor; } 复制代码

在 AsyncAnnotationAdvisor 的构造函数中,会构建 Advice 和 Pointcut

代码语言:javascript复制
public AsyncAnnotationAdvisor(         @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { // 省略其他代码 /// ...     // 创建 advice     this.advice = buildAdvice(executor, exceptionHandler);     // 创建 pointcut     this.pointcut = buildPointcut(asyncAnnotationTypes); } 复制代码

Advice 就是具体执行拦截的逻辑,这里的 advice 实际上 AnnotationAsyncExecutionInterceptor(why ? 因饰Advice 是 MethodInterceptor 的父类)。

代码语言:javascript复制
protected Advice buildAdvice(         @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { // 这里     AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);     interceptor.configure(executor, exceptionHandler);     return interceptor; } 复制代码

到这里,关于 @EnableAsync 是如何开启创建异步方法的逻辑基本就介绍完了;本质上还是 Spring AOP 的那套逻辑。

Tips

除了 adviceMode,一般情况下还会涉及到另外一个参数,即 proxyTargetClass;proxyTargetClass 在设置为 true 和 false 时,对应使用的代理机制大致如下:

  • true
    • 目标对象实现了接口 – 使用 CGLIB 代理机制
    • 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制
  • false
    • 目标对象实现了接口 – 使用 JDK 动态代理机制(代理所有实现了的接口)
    • 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制

线程池

上一小节中,对 @EnableAsync 生效机制和对应的 AOP 对象创建逻辑进行了介绍;实际上 AOP 拦截到具体的方法之后的主要目的就是将执行逻辑丢到线程池中去执行。那这里就会涉及到本节的主题,即线程池。本节需要搞清楚几个问题:

  • 什么时候创建的线程池?
  • 创建的线程池类型是啥?
  • 方法执行任务是如何被提交的?

创建 AnnotationAsyncExecutionInterceptor 时初始化线程池

线程池的创建是在创建 AnnotationAsyncExecutionInterceptor 对象时完成,代码如下:

代码语言:javascript复制
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {     super(defaultExecutor); } 复制代码

在其父类 AsyncExecutionAspectSupport 中完成具体线程池创建

代码语言:javascript复制
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); 复制代码

在 getDefaultExecutor 方法中, 会先从 Spring 容器找 TaskExecutor 类型的线程池 Bean,如果找不到,会扩大范围找 Executor 类型的线程池 Bean,如果找不到,则返回 null。

这里是个延迟载入的操作,即只有当异步方法被调用时,才会触发 SingletonSupplier get 操作,从而触发 getBean 的逻辑,如果你在 debug 时出现没有正常走到断点的情况,可以关注下这个场景。

默认线程池 SimpleAsyncTaskExecutor

代码语言:javascript复制
@Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {     Executor defaultExecutor = super.getDefaultExecutor(beanFactory);     return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); } 复制代码

从这段逻辑看,如果从 Spring 容器中没有找到对应的线程池 Bean,那么就创建 SimpleAsyncTaskExecutor 作为默认的线程池。

This class also customizes the Executor by defining a new bean. Here, the method is named taskExecutor, since this is the specific method name for which Spring searches. In our case, we want to limit the number of concurrent threads to two and limit the size of the queue to 500. There are many more things you can tune. If you do not define an Executor bean, Spring creates a SimpleAsyncTaskExecutor and uses that.

方法执行任务的提交

基于前面的分析,方法执行任务的提交一定是发生在拦截到 @Async 注解时,也就是 AnnotationAsyncExecutionInterceptor 中;通过分析代码,在其父类 AsyncExecutionInterceptor 中,验证了分析。下面是部分核心逻辑:

代码语言:javascript复制
public Object invoke(final MethodInvocation invocation) throws Throwable {     // 1、拿到 Method     // 2、根据 Method 获取 executor     AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);     // 3、创建方法执行任务 task     Callable<Object> task = () -> {     // ...     };     // 4、提交 task     return doSubmit(task, executor, invocation.getMethod().getReturnType()); } 复制代码

determineAsyncExecutor 中说明了, executor 是和方法对象绑定的,即每个方法都有一个自己的 executor;异步方法在第一次执行的时候创建自己的 executor,然后缓存到内存中。在 doSubmit 中,会根据 returnType 的类型进行相应的处理

代码语言:javascript复制
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {     // CompletableFuture     if (CompletableFuture.class.isAssignableFrom(returnType)) {         return CompletableFuture.supplyAsync(() -> {             try {                 return task.call();             }             catch (Throwable ex) {                 throw new CompletionException(ex);             }         }, executor);     }     // ListenableFuture     else if (ListenableFuture.class.isAssignableFrom(returnType)) {         return ((AsyncListenableTaskExecutor) executor).submitListenable(task);     }     // Future     else if (Future.class.isAssignableFrom(returnType)) {         return executor.submit(task);     }     // void     else {         executor.submit(task);         return null;     } }

0 人点赞