异步编程 - 09 Spring框架中的异步执行_@Async注解异步执行原理&源码解析

2023-09-09 14:13:27 浏览数 (1)

概述

在Spring中调用线程将在调用含有@Async注释的方法时立即返回,Spring是如何做到的呢?其实是其对标注@Async注解的类做了代理,比如下面的类Async-AnnotationExample。

代码语言:javascript复制
public class AsyncAnnotationExample {
    @Async
    public CompletableFuture<String> doSomething() {

        // 1.创建future
        CompletableFuture<String> result = new CompletableFuture<String>();
        // 2.模拟任务执行
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()   "doSomething");
        } catch (Exception e) {
            e.printStackTrace();
        }
        result.complete("done");

        // 3.返回结果
        return result;
    }
}

由于AsyncAnnotationExample类中方法doSomething被标注了@Async注解,所以Spring框架在开启异步处理后会对AsyncAnnotationExample的实例进行代理,代理后的类代码框架如下所示。

代码语言:javascript复制
public class AsyncAnnotationExampleProxy {
    
    public AsyncAnnotationExample getAsyncTask() {
        return asyncTask;
    }

    public void setAsyncAnnotationExample(AsyncAnnotationExample asyncTask) {
        this.asyncTask = asyncTask;
    }

    private AsyncAnnotationExample asyncTask;
    private TaskExecutor executor = new SimpleAsyncTaskExecutor();
    public CompletableFuture<String> dosomthingAsyncFuture() {

        return CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return asyncTask.dosomthing().get();
                } catch (Throwable e) {
                    throw new CompletionException(e);
                }
            }
        },executor);
    }
}

如上代码所示,Spring会对AsyncAnnotationExample类进行代理,并且会把AsyncAnnotationExample的实例注入AsyncAnnotationExampleProxy内部,当我们调用AsyncAnnotationExample的dosomthing方法时,实际调用的是AsyncAnnotation ExampleProxy的dosomthing方法,后者使用CompletableFuture.supplyAsync开启了一个异步任务(其马上返回一个CompletableFuture对象),并且使用默认的SimpleAsync TaskExecutor线程池作为异步处理线程,然后在异步任务内具体调用了AsyncAnnotationExample实例的dosomthing方法。

默认情况下,Spring框架是使用Cglib对标注@Async注解的方法进行代理的,具体拦截器是AnnotationAsyncExecutionInterceptor,我们看看其invoke方法。

代码语言:javascript复制
public Object invoke(final MethodInvocation invocation) throws Throwable {
    //1.被代理的目标对象
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    //2. 获取被代理的方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    //3. 判断使用哪个执行器执行被代理的方法
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }
    //4. 使用Callable包装要执行的方法
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };
    //5. 提交包装的Callable任务到指定执行器执行
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

·代码1获取被代理的目标对象的Class对象,本例中为class:com.artisan.async.AsyncProgram.AsyncAnnotationExample的Class对象;

·代码2获取被代理的方法,本例中为public java.util.concurrent.CompletableFuture:com.artisan.async.AsyncProgram.AsyncAnnotationExample.dosomthing();

·代码3根据规则获取使用哪个执行器TaskExecutor执行被代理的方法,其代码如下所示。

代码语言:javascript复制
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    //4.1获取对应方法的执行器
    AsyncTaskExecutor executor = this.executors.get(method);
    //4.2不存在则按照规则查找
    if (executor == null) {
        //4.2.1 如果注解@Async中指定了执行器名称
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        //4.2.2 获取默认执行器
        else {
            targetExecutor = this.defaultExecutor;
            if (targetExecutor == null) {
                synchronized (this.executors) {
                    if (this.defaultExecutor == null) {
                        this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                    }
                    targetExecutor = this.defaultExecutor;
                }
            }
        }
        //4.2.3
        if (targetExecutor == null) {
            return null;
        }
        //4.2.4 添加执行器到缓存
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    //4.3返回查找的执行器
    return executor;
}

代码4.1从缓存executors中尝试获取method方法对应的执行器,如果存在则直接执行代码4.3返回;否则执行代码4.2.1判断方法的注解@Async中是否指定了执行器名称,如果有则尝试从Spring的bean工厂内获取该名称的执行器的实例,否则执行代码4.2.2获取默认的执行器(SimpleAsyncTaskExecutor),然后代码4.2.4把执行器放入缓存。

到这里就探讨完成了AnnotationAsyncExecutionInterceptor的invoke方法内代码3是如何确定那个执行器,然后在invoke方法中的代码4使用Callable包装要执行的方法,代码5提交包装的Callable任务到指定执行器。

到这里所有的执行使用的都是调用线程,调用线程提交异步任务到执行器后就返回了,异步任务真正执行的是具体执行器中的线程。下面我们看看代码5 doSubmit的代码。

代码语言:javascript复制
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    //5.1判断方法返回值是否为CompletableFuture类型或者是其子类
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            }
            catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    }
    //5.2判断返回值类型是否为ListenableFuture类型或者是其子类
    else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    //5.3判断返回值类型是否为ListenableFuture类型或者是其子类
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    //5.4其他情况下没有返回值
    else {
        executor.submit(task);
        return null;
    }
}

·代码5.1判断方法返回值是否为CompletableFuture类型或者是其子类,如果是则把任务使用CompletableFuture.supplyAsync方法提交到线程池executor执行,该方法会马上返回一个CompletableFuture对象。

·代码5.2判断方法返回值是否为ListenableFuture类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个ListenableFuture对象。

·代码5.3判断方法返回值是否为Future类型或者是其子类,如果是则把任务提交到线程池executor执行,该方法会马上返回一个Future对象。

·代码5.4说明方法不需要返回值,直接提交任务到线程池executor后返回null。

上面我们讲解了代理拦截器AnnotationAsyncExecutionInterceptor的invoke方法如何对标注@Async的方法进行处理,实现异步执行的。其实还有一部分还没讲,前面说了要开始异步处理,必须使用@EnableAsync注解或者task:annotation-driven/来开启异步处理,那么这两个部分背后到底做了什么呢?下面我们就来一探究竟。

首先我们看看添加@EnableAsync注解后发生了什么?在Spring容器启动的过程中会有一系列扩展接口对Bean的元数据定义、初始化、实例化做拦截处理,也存在一些处理器类可以动态地向Spring容器添加一些框架需要使用的Bean实例。其中ConfigurationClassPostProcessor处理器类则是用来解析注解类,并把其注册到Spring容器中的,其可以解析标注@Configuration、@Component、@ComponentScan、@Import、@ImportResource等的Bean。当我们使用context:annotation-config/或者context:component-scan/时,Spring容器会默认把ConfigurationClassPostProcessor处理器注入Spring容器。

而@EnableAsync的定义如下:

代码语言:javascript复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}

所以我们添加了@EnableAsync注解后,ConfigurationClassPostProcessor会解析其中的@Import(AsyncConfigurationSelector.class),并把AsyncConfigurationSelector的实例注入Spring容器,其代码如下所示。

代码语言:javascript复制
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}

AsyncConfigurationSelector实现了ImportSelector接口的selectImports方法,根据AdviceMode参数返回需要导入到Spring容器的Bean的全路径包名。该方法会在ConfigurationClassPostProcessor中的ConfigurationClassParser类中调用。默认情况下的adviceMode为PROXY,所以会把ProxyAsyncConfiguration的实例注入Spring容器。

ProxyAsyncConfiguration的代码如下所示。

代码语言:javascript复制
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTarget
Class"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

如上代码ProxyAsyncConfiguration的asyncAdvisor方法添加了@Bean注解,所以该方法返回的Bean也会被注入Spring容器。该方法创建了AsyncAnnotationBean PostProcessor处理器,所以AsyncAnnotationBeanPostProcessor的一个实例会被注入Spring容器中,由于其实现了BeanFactoryAware接口,所以Spring框架会调用其setBeanFactory(BeanFactory beanFactory)方法把Spring BeanFactory(存放bean的容器)注入该Bean,setBeanFactory方法代码如下所示。

代码语言:javascript复制
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

如上代码创建了一个AsyncAnnotationAdvisor的实例并保存到了AsyncAnnotation BeanPostProcessor的advisor变量。Spring中每个AsyncAnnotationAdvisor都包含一个Advice(切面逻辑)和一个PointCut(切点),也就是会对符合PointCut的方法使用Advice进行功能增强,对应Advice和PointCut是在AsyncAnnotationAdvisor构造函数内创建的。

代码语言:javascript复制
public AsyncAnnotationAdvisor(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    //6.1.异步注解类型
    Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
    asyncAnnotationTypes.add(Async.class);
    try {
        asyncAnnotationTypes.add((Class<? extends Annotation>)
                ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
    }
    catch (ClassNotFoundException ex) {
    }
    //6.2创建切面逻辑
    this.advice = buildAdvice(executor, exceptionHandler);
    //6.3创建切点
    this.pointcut = buildPointcut(asyncAnnotationTypes);
}

如上代码6.1收集注解@Async和@javax.ejb.Asynchronous到asyncAnnotationTypes,代码6.2则创建Advice,其代码如下所示。

代码语言:javascript复制
protected Advice buildAdvice(
        @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

    AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
    interceptor.configure(executor, exceptionHandler);
    return interceptor;
}

由上述代码可知,这里创建了AnnotationAsyncExecutionInterceptor拦截器作为切面逻辑。下面看看代码6.3如何创建切点。

代码语言:javascript复制
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
    ComposablePointcut result = null;
    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
        if (result == null) {
            result = new ComposablePointcut(cpc);
        }
        else {
            result.union(cpc);
        }
        result = result.union(mpc);
    }
    return (result != null ? result : Pointcut.TRUE);
}

在上述代码中使用收集的注解集合asyncAnnotationTypes,并在每个注解处创建了一个AnnotationMatchingPointcut作为切点,AnnotationMatchingPointcut内部的AnnotationClassFilter的方法matches则作为某个方法是否满足切点的条件,具体代码如下所示。

代码语言:javascript复制
public boolean matches(Class<?> clazz) {
    return (this.checkInherited ? AnnotatedElementUtils.hasAnnotation(clazz, this.annotationType) :
            clazz.isAnnotationPresent(this.annotationType));
}

由如上代码可知,判断方法通过是否有注解@Async为依据来判断方法是否符合切点。

到此我们把AsyncAnnotationBeanPostProcessor的setBeanFactory(BeanFactory bean-Factory)方法逻辑讲解完毕了,其内部保存了一个AsyncAnnotationAdvisor对象用来对Spring容器中符合条件(这里为含有@Async注解的方法的Bean)的Bean的方法进行功能增强,下面我们看看AsyncAnnotationBeanPostProcessor的postProcess AfterInitialization方法是如何对这些符合条件的Bean进行代理的。

代码语言:javascript复制
public Object postProcessAfterInitialization(Object bean, String beanName) {
    ...

    if (isEligible(bean, beanName)) {
        //7.1
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        //7.2 设置拦截器
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        //7.3 获取代理类
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}

如上代码7.1使用prepareProxyFactory创建了代理工厂,其代码如下所示。

代码语言:javascript复制
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.copyFrom(this);
    proxyFactory.setTarget(bean);
    return proxyFactory;
}

代码7.2则设置在其setBeanFactory方法内创建的AsyncAnnotationAdvisor对象作为Advisor,代码7.3从代理工厂获取代理后的Bean实例并返回到Spring容器,所以当我们调用含有@Async注解的Bean的方法时候,实际调用的是被代理后的Bean。

当我们调用被代理的类的方法时,代理类内部会先使用AsyncAnnotationAdvisor中的PointCut进行比较,看其是否符合切点条件(方法是否含有@Async)注解,如果不符合则直接调用被代理的对象的原生方法,否则调用AsyncAnnotationAdvisor内部的AnnotationAsyncExecutionInterceptor进行拦截异步处理。

在了解添加@EnableAsync注解后会发生什么后,下面我们来看看当添加标签<task:annotation-driven/>开启异步处理时,背后又发生了什么?在Spring中对于标签<XXX:/>总是会存在名称为XXXTaskNamespaceHandler的处理器负责该标签的解析,所以对于标签,自然存在TaskNamespaceHandler处理器负责其解析,其代码如下所示。

代码语言:javascript复制
public class TaskNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser());
        this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser());
        this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser());
    }
}

由如上代码可知,<task:annotation-driven/>是使用AnnotationDrivenBeanDefinitionParser来进行解析的,下面我们看看其parse方法。

代码语言:javascript复制
public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
...
    @Override
    @Nullable
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        Object source = parserContext.extractSource(element);

        ...
        //8.1 
        String mode = element.getAttribute("mode");
        if ("aspectj".equals(mode)) {
            // mode="aspectj"
            registerAsyncExecutionAspect(element, parserContext);
        }
        else {
            //8.2 mode="proxy"
            if (registry.containsBeanDefinition(TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)) {
                parserContext.getReaderContext().error(
                        "Only one AsyncAnnotationBeanPostProcessor may exist within the context.", source);
            }
            else {
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
                        "org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor");
                builder.getRawBeanDefinition().setSource(source);
                String executor = element.getAttribute("executor");
                if (StringUtils.hasText(executor)) {
                    builder.addPropertyReference("executor", executor);
                }
                String exceptionHandler = element.getAttribute("exception-handler");
                if (StringUtils.hasText(exceptionHandler)) {
                    builder.addPropertyReference("exceptionHandler", exceptionHandler);
                }
                if (Boolean.valueOf(element.getAttribute(AopNamespaceUtils.PROXY_TARGET_CLASS_ATTRIBUTE))) {
                    builder.addPropertyValue("proxyTargetClass", true);
                }
                registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME);
            }
        }

    
        //8.3 Finally register the composite component.
        parserContext.popAndRegisterContainingComponent();

        return null;
    }
}

由如上代码可知,其主要是用来创建AsyncAnnotationBeanPostProcessor在Spring容器中的元数据定义,并注册到Spring容器中,剩下的流程就与基于@EnableAsync注解开启异步处理的流程一样了。

小结

我们梳理如何使用Spring框架中的@Async进行异步处理,以及其内部如何使用代理的方式来实现,并且可知使用@Async实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了的负担

好文推荐

一文彻底讲透@Async注解的原理和使用方法

0 人点赞