Spring异步编程

2021-04-27 20:14:17 浏览数 (1)

一、背景

在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其实时返回结果,甚至不关心执行成功与否,比如:

  • 下单完成后给用户发送短信
  • 流程审批完成后发送邮件通知

或者一些查询操作需要调用多个二方或者三方服务组装返回结果,并且这些调用之前没有依赖关系,比如某电商平台退货详情需要展示订单信息、商品信息、用户详细信息等.

这些场景都可以考虑使用异步编程,所谓异步编程,就是不使用业务主线程,利用线程池或者其他套件开启新的线程完成后续操作,针对不关心执行结果的场景直接使用新线程完成后续业务,主线程直接返回调用,对于关心执行结果的场景,调用后返回多线程句柄,等多线程执行完成后由业务主线程统一组装结果并返回.

二、Spring异步编程介绍

spring3.1版本开始提供了开箱即用的异步编程套件,相关实现都放在spring-context模块,不需要引入其他额外的包,在配置类或者应用启动门面类上添加@EnableAsync即可开启异步化能力.

spring异步编程的实现依赖于Aop和动态代理,其具体实现此处不做赘述,简单描述一下spring异步编程用到的几个核心概念:

  • 切入点(Pointcut):用白话来说,spring要对哪些功能做增强处理,要么是表达式,要么是注解,他们所代表的位置就是切入点,就本篇而言就是做异步化的位置.
  • 通知(Advice):对于满足切入点的程序做个性化增强处理的动作,spring异步编程中就是用线程池处理@Async注解的方法.
  • 增强器(Advisor): 切入点和通知一起组成了增强器,也就是知道了在哪切入,也知道怎么切入,还需要一个角色去做这件事.
  • 动态代理: 基于被代理的类,在程序启动时生成代理对象并将增强逻辑添加进去,常用的有jdk动态代理和cglib动态代理.

基于前边几个概念,spring异步即是在应用启动时扫描@Async注解的类或者方法,生成代理类,然后将多线程处理能力嵌入进去.

三、异步编程接入

1.开启异步能力

在应用启动类添加@EnableAsync注解:

代码语言:javascript复制
@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.添加异步注解

在需要实现异步化的方法上添加@Async注解:

代码语言:javascript复制
@Slf4j
@Service
public class TestBuzz {
    @Async
    public void testAsync() {
        log.info("TestBuzz.testAsync thread={}",Thread.currentThread().getName());
    }
}

3.模拟异步调用

代码语言:javascript复制
@GetMapping("/test_async")
public IResp testAsync() {
    log.info("TestController.testAsync thread={}",Thread.currentThread().getName());
    //异步化调用
    this.testBuzz.testAsync();
    return IResp.getSuccessResult();
}

启动并模拟请求:

代码语言:javascript复制
curl http://localhost:8088/test_async

就这么简单,我们通过两个注解就完成了异步编程.

四、原理&源码解析

从前两节的介绍中我们知道,spring利用Aop和动态代理在@Async标注的类生成代理并织入了多线程处理能力,那么接下来我们从源码层面分析一下其实现原理.

开启异步化能力时序图:

按照时序图从头到尾分析一下,并重点介绍其中涉及的几个类的实现.

代码语言:javascript复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

  Class<? extends Annotation> annotation() default Annotation.class;

  boolean proxyTargetClass() default false;

  AdviceMode mode() default AdviceMode.PROXY;

  int order() default Ordered.LOWEST_PRECEDENCE;
}

annotation表示使用异步的注解,默认是@Async和EJB 3.1的@javax.ejb.Asynchronou,当然用户也可以提供自定义注解.

proxyTargetClass表示是否基于需要代理的类创建子类,仅在模式设置为AdviceMode.PROXY时适用,默认是false,需要注意的是将此属性设置为true将影响所有需要代理的Spring托管bean,而不仅仅是标记有@Async的bean。例如,其他标有Spring的@Transactional批注的bean将同时升级为子类代理.

mode表示使用哪种通知模式,默认是AdviceMode.PROXY,需要注意代理模式仅允许通过代理拦截调用,同一类中的本地调用无法以这种方式被拦截;在本地调用中,此类方法上的Async注释将被忽略,因为Spring的拦截器甚至不会在这种运行时场景中起作用.如果需要拦截本地调用或者其他更高级的拦截诉求,可以考虑考虑将其切换为AdviceMode.ASPECTJ.

order代表AsyncAnnotationBeanPostProcessor的顺序,默认值是最低,以便生成代理的时候最贴近代理目标.

最重要的是该注解导入了AsyncConfigurationSelector类,毫无疑问AsyncConfigurationSelector是开启异步能力配置的入口.

代码语言:javascript复制
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

  private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
      "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  @Override
  @Nullable
  public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return new String[] {ProxyAsyncConfiguration.class.getName()};
      case ASPECTJ:
        return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
      default:
        return null;
    }
  }
}

AsyncConfigurationSelector继承自AdviceModeImportSelector,根据代理模式选择不同的配置,默认我们使用AdviceMode.PROXY,直接看ProxyAsyncConfiguration实现.

代码语言:javascript复制
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
  @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    bpp.configure(this.executor, this.exceptionHandler);
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
      bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
  }
}

ProxyAsyncConfiguration继承自AbstractAsyncConfiguration,其将@EnableAsync注解属性解析出来备用,并将异步化配置注入进来.

代码语言:javascript复制
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
  if (CollectionUtils.isEmpty(configurers)) {
    return;
  }
  if (configurers.size() > 1) {
    throw new IllegalStateException("Only one AsyncConfigurer may exist");
  }
  AsyncConfigurer configurer = configurers.iterator().next();
  this.executor = configurer::getAsyncExecutor;
  this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

这里可以看出,用户可以实现AsyncConfigurer接口来使用自定义线程池和异常处理器,回到AbstractAsyncConfiguration,创建了一个AsyncAnnotationBeanPostProcessor类型的bean并注入容器,并且把角色定义成基础设施,不向外提供服务,看一下AsyncAnnotationBeanPostProcessor继承关系:

从继承关系来看,这个类有很多身份信息并且拥有很多能力,实现了BeanPostProcessor接口我们暂且将其定义成一个后置处理器,实现了AopInfrastructBean接口将不会被Aop处理,继承了ProxyProcessorSuppor又拥有了代理处理相关能力,实现了BeanFactoryAware拥有了bean管理能力,看一下其代码实现:

代码语言:javascript复制
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
  public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
      AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
  @Nullable
  private Supplier<Executor> executor;
  @Nullable
  private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
  @Nullable
  private Class<? extends Annotation> asyncAnnotationType;
  public AsyncAnnotationBeanPostProcessor() {
    setBeforeExistingAdvisors(true);
  }
  public void configure(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    this.executor = executor;
    this.exceptionHandler = exceptionHandler;
  }
  public void setExecutor(Executor executor) {
    this.executor = SingletonSupplier.of(executor);
  }
  public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
    this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
  }
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    this.asyncAnnotationType = asyncAnnotationType;
  }
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
  }
}

spring管理的bean初始化过程执行顺序BeanFactoryAware是在后置处理器BeanPostProcessor之前,我们先分析setBeanFactory方法,该方法调用父类实现先把BeanFactory注入进来,然后创建了一个增强器AsyncAnnotationAdvisor(给后置处理器postProcessAfterInitialization方法备用),看一下继承关系:

接着看AsyncAnnotationAdvisor构造器:

代码语言:javascript复制
public AsyncAnnotationAdvisor(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

  Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
  asyncAnnotationTypes.add(Async.class);
  try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
        ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
  }
  catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
  }
  this.advice = buildAdvice(executor, exceptionHandler);
  this.pointcut = buildPointcut(asyncAnnotationTypes);
}

如同我们前边所说,增强器由advice和pointcut组成,这里分别构建了通知和切入点,先看构造通知:

代码语言:javascript复制
protected Advice buildAdvice(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
  AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
  interceptor.configure(executor, exceptionHandler);
  return interceptor;
}

构建通知用的是AnnotationAsyncExecutionInterceptor,看一下继承关系:

本质上是一个MethodInterceptor,执行拦截操作的时候调用invoke方法:

代码语言:javascript复制
public Object invoke(final MethodInvocation invocation) throws Throwable {
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

  AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  if (executor == null) {
    throw new IllegalStateException(
        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  }
  Callable<Object> task = () -> {
    try {
      Object result = invocation.proceed();
      if (result instanceof Future) {
        return ((Future<?>) result).get();
      }
    }
    catch (ExecutionException ex) {
      handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
      handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    return null;
  };
  return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

该方法先获取AsyncTaskExecutor异步任务执行器,简单理解为线程池,然后在线程池中执行异步逻辑,继续看determineAsyncExecutor获取线程池:

代码语言:javascript复制
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
  Executor targetExecutor;
  String qualifier = getExecutorQualifier(method);
  if (StringUtils.hasLength(qualifier)) {
    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
  }
  else {
    targetExecutor = this.defaultExecutor.get();
  }
  if (targetExecutor == null) {
    return null;
  }
  executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
      (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
  this.executors.put(method, executor);
}
return executor;
}

先从缓存中获取,如果获取到直接返回,否则如果@Async注解有指定线程池就根据名字获取,否则获取默认线程池.

接着看线程池提交异步操作doSubmit:

代码语言:javascript复制
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
  if (CompletableFuture.class.isAssignableFrom(returnType)) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return task.call();
      }
      catch (Throwable ex) {
        throw new CompletionException(ex);
      }
    }, executor);
  }
  else if (ListenableFuture.class.isAssignableFrom(returnType)) {
    return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
  }
  else if (Future.class.isAssignableFrom(returnType)) {
    return executor.submit(task);
  }
  else {
    executor.submit(task);
    return null;
  }
}

可以看出支持异步方法返回结果为CompletableFuture、ListenableFuture和Future的有返回值的操作,其他返回类型或者返回类型为void都当做无返回值异步提交.

回到前边构造切入点操作:

代码语言:javascript复制
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
  ComposablePointcut result = null;
  for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
    Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
    Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
    if (result == null) {
      result = new ComposablePointcut(cpc);
    }
    else {
      result.union(cpc);
    }
    result = result.union(mpc);
  }
  return (result != null ? result : Pointcut.TRUE);
}

方法中构造了两个AnnotationMatchingPointcut,一个匹配方法切入点,另一个是匹配类切入点,然后做了union操作构造了一个ComposablePointcut混合切入点,只要满足类或者方法上带有@Async注解都符合切入规则,这个切入点在AsyncAnnotationBeanPostProcessor后置处理器构造代理类会用到.

前边分析了setBeanFactory构造增强器的操作,我们继续分析后置处理器的postProcessAfterInitialization操作,先看代码实现:

代码语言:javascript复制
public Object postProcessAfterInitialization(Object bean, String beanName) {
  if (this.advisor == null || bean instanceof AopInfrastructureBean) {
    // Ignore AOP infrastructure such as scoped proxies.
    return bean;
  }
  if (bean instanceof Advised) {
    Advised advised = (Advised) bean;
    if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
      // Add our local Advisor to the existing proxy's Advisor chain...
      if (this.beforeExistingAdvisors) {
        advised.addAdvisor(0, this.advisor);
      }
      else {
        advised.addAdvisor(this.advisor);
      }
      return bean;
    }
  }
  if (isEligible(bean, beanName)) {
    ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
    if (!proxyFactory.isProxyTargetClass()) {
      evaluateProxyInterfaces(bean.getClass(), proxyFactory);
    }
    proxyFactory.addAdvisor(this.advisor);
    customizeProxyFactory(proxyFactory);
    return proxyFactory.getProxy(getProxyClassLoader());
  }
  // No proxy needed.
  return bean;
}

如果增强器为null或者目标bean是AopInfrastructureBean基础组件类型直接放过,如果bean是待通知对象切满足该Advisor的通知条件,直接将该增强器添加到待通知对象的增强器列表中,否则如果目标bean满足该增强器的切入条件,利用动态代理生成代理类并将该Advisor添加到其增强器列表返回.

这段代码是动态代理生成代理类并织入通知逻辑的核心点,我们主要分析isEligible和生成代理的逻辑,先分析是否满足切入逻辑的方法isEligible:

代码语言:javascript复制
protected boolean isEligible(Class<?> targetClass) {
  Boolean eligible = this.eligibleBeans.get(targetClass);
  if (eligible != null) {
    return eligible;
  }
  if (this.advisor == null) {
    return false;
  }
  eligible = AopUtils.canApply(this.advisor, targetClass);
  this.eligibleBeans.put(targetClass, eligible);
  return eligible;
}

先从缓存中获取改bean是否有被增强的资格,如果已被缓存直接返回缓存结果,否则如果增强器为null,则返回无资格,最后调用AopUtils.canApply检查目标类是否满足Advisor切入的规则,继续看AopUtils.canApply实现:

代码语言:javascript复制
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
  if (advisor instanceof IntroductionAdvisor) {
    return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
  }
  else if (advisor instanceof PointcutAdvisor) {
    PointcutAdvisor pca = (PointcutAdvisor) advisor;
    return canApply(pca.getPointcut(), targetClass, hasIntroductions);
  }
  else {
    // It doesn't have a pointcut so we assume it applies.
    return true;
  }
}

根据Advisor的类型检查目标类是否满足切入资格,和明显前边AsyncAnnotationBeanPostProcessor构造的是PointcutAdvisor类型的增强器,继续看canApply实现:

代码语言:javascript复制
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
  Assert.notNull(pc, "Pointcut must not be null");
  if (!pc.getClassFilter().matches(targetClass)) {
    return false;
  }
  MethodMatcher methodMatcher = pc.getMethodMatcher();
  if (methodMatcher == MethodMatcher.TRUE) {
    // No need to iterate the methods if we're matching any method anyway...
    return true;
  }
  IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
  if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
    introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
  }
  Set<Class<?>> classes = new LinkedHashSet<>();
  if (!Proxy.isProxyClass(targetClass)) {
    classes.add(ClassUtils.getUserClass(targetClass));
  }
  classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
  for (Class<?> clazz : classes) {
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
    for (Method method : methods) {
      if (introductionAwareMethodMatcher != null ?
          introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
          methodMatcher.matches(method, targetClass)) {
        return true;
      }
    }
  }
  return false;
}

其实简单来说,就是检查目标类上或者方法上是否有@Async注解,如果有就返回满足切入规则,否则返回不符合切入规则.

回到前边后置处理器postProcessAfterInitialization方法,如果目标bean满足切入规则,则使用代理工厂ProxyFactory生成代理对象并返回:

代码语言:javascript复制
if (isEligible(bean, beanName)) {
  ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
  if (!proxyFactory.isProxyTargetClass()) {
    evaluateProxyInterfaces(bean.getClass(), proxyFactory);
  }
  proxyFactory.addAdvisor(this.advisor);
  customizeProxyFactory(proxyFactory);
  return proxyFactory.getProxy(getProxyClassLoader());
}

先生成代理工厂,然后检查给定bean类上的接口,并将它们应用于ProxyFactory(如果不适用,退化成直接代理目标类),将增强器添加到代理工厂中,最后由代理工厂生成代理对象,接着看生成代理类的实现:

代码语言:javascript复制
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
  if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
    Class<?> targetClass = config.getTargetClass();
    if (targetClass == null) {
      throw new AopConfigException("TargetSource cannot determine target class: "  
          "Either an interface or a target is required for proxy creation.");
    }
    if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
      return new JdkDynamicAopProxy(config);
    }
    return new ObjenesisCglibAopProxy(config);
  }
  else {
    return new JdkDynamicAopProxy(config);
  }
}

先创建Aop代理,如果目标类是接口或者目标类是代理类,使用jdk动态代理,否则使用cglib动态代理,两种代理区别这里不展开细讲,简单分析一下其构造代理类的原理,先看JdkDynamicAopProxy:

代码语言:javascript复制
public Object getProxy(@Nullable ClassLoader classLoader) {
  if (logger.isTraceEnabled()) {
    logger.trace("Creating JDK dynamic proxy: "   this.advised.getTargetSource());
  }
  Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised, true);
  findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
  return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}

到这里我们看到了熟悉的jdk动态代理实现Proxy.newProxyInstance,寻找需要代理的接口,然后生成接口的动态代理对象,这里需要注意一下,JdkDynamicAopProxy实现了InvocationHandler接口,JDK动态代理会在内存中生成一个类名为Proxy0形式的代理类,调用Proxy0方法,jvm内部调用类Proxy.InvocationHandler.invoke方法,也就是JdkDynamicAopProxy实现InvocationHandler接口的invoke方法:

代码语言:javascript复制
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  MethodInvocation invocation;
  Object oldProxy = null;
  boolean setProxyContext = false;
  TargetSource targetSource = this.advised.targetSource;
  Object target = null;
  try {
    if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
      return equals(args[0]);
    }
    else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
      return hashCode();
    }
    else if (method.getDeclaringClass() == DecoratingProxy.class) {
      return AopProxyUtils.ultimateTargetClass(this.advised);
    }
    else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
        method.getDeclaringClass().isAssignableFrom(Advised.class)) {
      return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
    }
    Object retVal;
    if (this.advised.exposeProxy) {
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    if (chain.isEmpty()) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
    }
    else {
      invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
      retVal = invocation.proceed();
    }
    Class<?> returnType = method.getReturnType();
    if (retVal != null && retVal == target &&
        returnType != Object.class && returnType.isInstance(proxy) &&
        !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
      retVal = proxy;
    }
    else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
      throw new AopInvocationException(
          "Null return value from advice does not match primitive return type for: "   method);
    }
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先取出被织入的拦截逻辑,本篇中就是AnnotationAsyncExecutionInterceptor,然后指定方法调用,也就是代理类的调用,本质上就是先调用增强逻辑和最原始被代理类的方法的调用.

然后我们再看一下cglib动态代理实现CglibAopProxy:

代码语言:javascript复制
public Object getProxy(@Nullable ClassLoader classLoader) {
  try {
    Class<?> rootClass = this.advised.getTargetClass();
    Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");
    Class<?> proxySuperClass = rootClass;
    if (ClassUtils.isCglibProxyClass(rootClass)) {
      proxySuperClass = rootClass.getSuperclass();
      Class<?>[] additionalInterfaces = rootClass.getInterfaces();
      for (Class<?> additionalInterface : additionalInterfaces) {
        this.advised.addInterface(additionalInterface);
      }
    }
    validateClassIfNecessary(proxySuperClass, classLoader);
    Enhancer enhancer = createEnhancer();
    if (classLoader != null) {
      enhancer.setClassLoader(classLoader);
      if (classLoader instanceof SmartClassLoader &&
          ((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
        enhancer.setUseCache(false);
      }
    }
    enhancer.setSuperclass(proxySuperClass);
    enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
    enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
    enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
    Callback[] callbacks = getCallbacks(rootClass);
    Class<?>[] types = new Class<?>[callbacks.length];
    for (int x = 0; x < types.length; x  ) {
      types[x] = callbacks[x].getClass();
    }
    enhancer.setCallbackFilter(new ProxyCallbackFilter(
        this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
    enhancer.setCallbackTypes(types);
    return createProxyClassAndInstance(enhancer, callbacks);
  }
  catch (CodeGenerationException | IllegalArgumentException ex) {
    throw new AopConfigException("Could not generate CGLIB subclass of "   this.advised.getTargetClass()  
        ": Common causes of this problem include using a final class or a non-visible class",
        ex);
  }
  catch (Throwable ex) {
    // TargetSource.getTarget() failed
    throw new AopConfigException("Unexpected AOP exception", ex);
  }
}

我们也看到了熟悉的cglib动态代理实现Enhancer,CGLB动态代理会在内存生成一个类名为?EnhancerByCGLIB?b3361405形式的代理类,调用xxx?EnhancerByCGLIB?b3361405代理类方法,内部调用MethodInterceptor.intercept(),看一下getCallbacks方法,也即是将被代理类的拦截调用装配成MethodInterceptor的逻辑:

代码语言:javascript复制
private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
  boolean exposeProxy = this.advised.isExposeProxy();
  boolean isFrozen = this.advised.isFrozen();
  boolean isStatic = this.advised.getTargetSource().isStatic();
  Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);
  Callback targetInterceptor;
  if (exposeProxy) {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
  }
  else {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
  }
  Callback targetDispatcher = (isStatic ?
      new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());
  Callback[] mainCallbacks = new Callback[] {
      aopInterceptor,  // for normal advice
      targetInterceptor,  // invoke target without considering advice, if optimized
      new SerializableNoOp(),  // no override for methods mapped to this
      targetDispatcher, this.advisedDispatcher,
      new EqualsInterceptor(this.advised),
      new HashCodeInterceptor(this.advised)
  };
  Callback[] callbacks;
  if (isStatic && isFrozen) {
    Method[] methods = rootClass.getMethods();
    Callback[] fixedCallbacks = new Callback[methods.length];
    this.fixedInterceptorMap = new HashMap<>(methods.length);

    // TODO: small memory optimization here (can skip creation for methods with no advice)
    for (int x = 0; x < methods.length; x  ) {
      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
      fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
          chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
      this.fixedInterceptorMap.put(methods[x].toString(), x);
    }
    callbacks = new Callback[mainCallbacks.length   fixedCallbacks.length];
    System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
    System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
    this.fixedInterceptorOffset = mainCallbacks.length;
  }
  else {
    callbacks = mainCallbacks;
  }
  return callbacks;
}

在此篇幅异步编程场景,调用代理类方法会直接调用到DynamicAdvisedInterceptor的intercept:

代码语言:javascript复制
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
  Object oldProxy = null;
  boolean setProxyContext = false;
  Object target = null;
  TargetSource targetSource = this.advised.getTargetSource();
  try {
    if (this.advised.exposeProxy) {
      // Make invocation available if necessary.
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    Object retVal;
    if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = methodProxy.invoke(target, argsToUse);
    }
    else {
      // We need to create a method invocation...
      retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
    }
    retVal = processReturnType(proxy, target, method, retVal);
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      // Restore old proxy.
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先获取代理类对应方法的拦截器链,如果没有拦截器链且方法是public类型,直接调用代理方法返回,否则将方法连同拦截器链构造成CglibMethodInvocation并执行.

在JdkDynamicAopProxy和CglibAopProxy生成的代理类执行的过程都会调用到前边所说的AnnotationAsyncExecutionInterceptor类的invoke方法,也即是异步执行的逻辑.

jdk动态代理异步执行时序图:

Cglib代理异步执行时序图:

五、总结

从本篇第三节异步编程使用方式来看,spring异步编程接入特别简单,但是从第四节的原理和源码解析来看,其实现也挺复杂的,这就是spring的强大之处,把困难留给自己,把便利留给使用者,把一些复杂的实现对用户做到透明化.

从spring异步编程的源码来看,其使用了很多技术和功能点:

  • 导入配置:AsyncConfigurationSelector
  • 后置处理器:AsyncAnnotationBeanPostProcessor
  • Aop编程:AsyncAnnotationAdvisor
  • 线程池:AsyncTaskExecutor
  • 拦截器: AnnotationAsyncExecutionInterceptor
  • 切入点: ComposablePointcut/AnnotationMatchingPointcut
  • 工厂模式: BeanFactory和ProxyFactory
  • 动态代理: JdkDynamicAopProxy和CglibAopProxy
  • 代理类调用委托处理: jdk动态代理委托给JdkDynamicAopProxy.invoke,cglib动态代理类委托给DynamicAdvisedInterceptor.intercept

由于篇幅问题,中间还有很多细节没覆盖到,比如说获取线程池的逻辑设计也比较巧妙,感兴趣的也可以深入研究一下:

代码语言:javascript复制
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
  if (beanFactory != null) {
    try {
      return beanFactory.getBean(TaskExecutor.class);
    }
    catch (NoUniqueBeanDefinitionException ex) {
      logger.debug("Could not find unique TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
    catch (NoSuchBeanDefinitionException ex) {
      logger.debug("Could not find default TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
  }
  return null;
}

spring异步的使用主要记住两个点,2个注解和一个返回值,在启动类或者配置使用@EnableAsync开启异步,在需要异步调用的方法上添加@Async注解,异步支持的返回类型有CompletableFuture、ListenableFuture和Future和void.

0 人点赞