一、背景
在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其实时返回结果,甚至不关心执行成功与否,比如:
- 下单完成后给用户发送短信
- 流程审批完成后发送邮件通知
或者一些查询操作需要调用多个二方或者三方服务组装返回结果,并且这些调用之前没有依赖关系,比如某电商平台退货详情需要展示订单信息、商品信息、用户详细信息等.
这些场景都可以考虑使用异步编程,所谓异步编程,就是不使用业务主线程,利用线程池或者其他套件开启新的线程完成后续操作,针对不关心执行结果的场景直接使用新线程完成后续业务,主线程直接返回调用,对于关心执行结果的场景,调用后返回多线程句柄,等多线程执行完成后由业务主线程统一组装结果并返回.
二、Spring异步编程介绍
spring3.1版本开始提供了开箱即用的异步编程套件,相关实现都放在spring-context模块,不需要引入其他额外的包,在配置类或者应用启动门面类上添加@EnableAsync即可开启异步化能力.
spring异步编程的实现依赖于Aop和动态代理,其具体实现此处不做赘述,简单描述一下spring异步编程用到的几个核心概念:
- 切入点(Pointcut):用白话来说,spring要对哪些功能做增强处理,要么是表达式,要么是注解,他们所代表的位置就是切入点,就本篇而言就是做异步化的位置.
- 通知(Advice):对于满足切入点的程序做个性化增强处理的动作,spring异步编程中就是用线程池处理@Async注解的方法.
- 增强器(Advisor): 切入点和通知一起组成了增强器,也就是知道了在哪切入,也知道怎么切入,还需要一个角色去做这件事.
- 动态代理: 基于被代理的类,在程序启动时生成代理对象并将增强逻辑添加进去,常用的有jdk动态代理和cglib动态代理.
基于前边几个概念,spring异步即是在应用启动时扫描@Async注解的类或者方法,生成代理类,然后将多线程处理能力嵌入进去.
三、异步编程接入
1.开启异步能力
在应用启动类添加@EnableAsync注解:
代码语言:javascript复制@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2.添加异步注解
在需要实现异步化的方法上添加@Async注解:
代码语言:javascript复制@Slf4j
@Service
public class TestBuzz {
@Async
public void testAsync() {
log.info("TestBuzz.testAsync thread={}",Thread.currentThread().getName());
}
}
3.模拟异步调用
代码语言:javascript复制@GetMapping("/test_async")
public IResp testAsync() {
log.info("TestController.testAsync thread={}",Thread.currentThread().getName());
//异步化调用
this.testBuzz.testAsync();
return IResp.getSuccessResult();
}
启动并模拟请求:
代码语言:javascript复制curl http://localhost:8088/test_async
就这么简单,我们通过两个注解就完成了异步编程.
四、原理&源码解析
从前两节的介绍中我们知道,spring利用Aop和动态代理在@Async标注的类生成代理并织入了多线程处理能力,那么接下来我们从源码层面分析一下其实现原理.
开启异步化能力时序图:
按照时序图从头到尾分析一下,并重点介绍其中涉及的几个类的实现.
代码语言:javascript复制@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
annotation表示使用异步的注解,默认是@Async和EJB 3.1的@javax.ejb.Asynchronou,当然用户也可以提供自定义注解.
proxyTargetClass表示是否基于需要代理的类创建子类,仅在模式设置为AdviceMode.PROXY时适用,默认是false,需要注意的是将此属性设置为true将影响所有需要代理的Spring托管bean,而不仅仅是标记有@Async的bean。例如,其他标有Spring的@Transactional批注的bean将同时升级为子类代理.
mode表示使用哪种通知模式,默认是AdviceMode.PROXY,需要注意代理模式仅允许通过代理拦截调用,同一类中的本地调用无法以这种方式被拦截;在本地调用中,此类方法上的Async注释将被忽略,因为Spring的拦截器甚至不会在这种运行时场景中起作用.如果需要拦截本地调用或者其他更高级的拦截诉求,可以考虑考虑将其切换为AdviceMode.ASPECTJ.
order代表AsyncAnnotationBeanPostProcessor的顺序,默认值是最低,以便生成代理的时候最贴近代理目标.
最重要的是该注解导入了AsyncConfigurationSelector类,毫无疑问AsyncConfigurationSelector是开启异步能力配置的入口.
代码语言:javascript复制public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
AsyncConfigurationSelector继承自AdviceModeImportSelector,根据代理模式选择不同的配置,默认我们使用AdviceMode.PROXY,直接看ProxyAsyncConfiguration实现.
代码语言:javascript复制@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
ProxyAsyncConfiguration继承自AbstractAsyncConfiguration,其将@EnableAsync注解属性解析出来备用,并将异步化配置注入进来.
代码语言:javascript复制@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
这里可以看出,用户可以实现AsyncConfigurer接口来使用自定义线程池和异常处理器,回到AbstractAsyncConfiguration,创建了一个AsyncAnnotationBeanPostProcessor类型的bean并注入容器,并且把角色定义成基础设施,不向外提供服务,看一下AsyncAnnotationBeanPostProcessor继承关系:
从继承关系来看,这个类有很多身份信息并且拥有很多能力,实现了BeanPostProcessor接口我们暂且将其定义成一个后置处理器,实现了AopInfrastructBean接口将不会被Aop处理,继承了ProxyProcessorSuppor又拥有了代理处理相关能力,实现了BeanFactoryAware拥有了bean管理能力,看一下其代码实现:
代码语言:javascript复制public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
@Nullable
private Supplier<Executor> executor;
@Nullable
private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Nullable
private Class<? extends Annotation> asyncAnnotationType;
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
public void configure(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
this.executor = executor;
this.exceptionHandler = exceptionHandler;
}
public void setExecutor(Executor executor) {
this.executor = SingletonSupplier.of(executor);
}
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
}
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
this.asyncAnnotationType = asyncAnnotationType;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
spring管理的bean初始化过程执行顺序BeanFactoryAware是在后置处理器BeanPostProcessor之前,我们先分析setBeanFactory方法,该方法调用父类实现先把BeanFactory注入进来,然后创建了一个增强器AsyncAnnotationAdvisor(给后置处理器postProcessAfterInitialization方法备用),看一下继承关系:
接着看AsyncAnnotationAdvisor构造器:
代码语言:javascript复制public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
如同我们前边所说,增强器由advice和pointcut组成,这里分别构建了通知和切入点,先看构造通知:
代码语言:javascript复制protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
构建通知用的是AnnotationAsyncExecutionInterceptor,看一下继承关系:
本质上是一个MethodInterceptor,执行拦截操作的时候调用invoke方法:
代码语言:javascript复制public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
该方法先获取AsyncTaskExecutor异步任务执行器,简单理解为线程池,然后在线程池中执行异步逻辑,继续看determineAsyncExecutor获取线程池:
代码语言:javascript复制protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
先从缓存中获取,如果获取到直接返回,否则如果@Async注解有指定线程池就根据名字获取,否则获取默认线程池.
接着看线程池提交异步操作doSubmit:
代码语言:javascript复制protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
可以看出支持异步方法返回结果为CompletableFuture、ListenableFuture和Future的有返回值的操作,其他返回类型或者返回类型为void都当做无返回值异步提交.
回到前边构造切入点操作:
代码语言:javascript复制protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
方法中构造了两个AnnotationMatchingPointcut,一个匹配方法切入点,另一个是匹配类切入点,然后做了union操作构造了一个ComposablePointcut混合切入点,只要满足类或者方法上带有@Async注解都符合切入规则,这个切入点在AsyncAnnotationBeanPostProcessor后置处理器构造代理类会用到.
前边分析了setBeanFactory构造增强器的操作,我们继续分析后置处理器的postProcessAfterInitialization操作,先看代码实现:
代码语言:javascript复制public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
如果增强器为null或者目标bean是AopInfrastructureBean基础组件类型直接放过,如果bean是待通知对象切满足该Advisor的通知条件,直接将该增强器添加到待通知对象的增强器列表中,否则如果目标bean满足该增强器的切入条件,利用动态代理生成代理类并将该Advisor添加到其增强器列表返回.
这段代码是动态代理生成代理类并织入通知逻辑的核心点,我们主要分析isEligible和生成代理的逻辑,先分析是否满足切入逻辑的方法isEligible:
代码语言:javascript复制protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
先从缓存中获取改bean是否有被增强的资格,如果已被缓存直接返回缓存结果,否则如果增强器为null,则返回无资格,最后调用AopUtils.canApply检查目标类是否满足Advisor切入的规则,继续看AopUtils.canApply实现:
代码语言:javascript复制public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
}
else if (advisor instanceof PointcutAdvisor) {
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
}
else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}
根据Advisor的类型检查目标类是否满足切入资格,和明显前边AsyncAnnotationBeanPostProcessor构造的是PointcutAdvisor类型的增强器,继续看canApply实现:
代码语言:javascript复制public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
Assert.notNull(pc, "Pointcut must not be null");
if (!pc.getClassFilter().matches(targetClass)) {
return false;
}
MethodMatcher methodMatcher = pc.getMethodMatcher();
if (methodMatcher == MethodMatcher.TRUE) {
// No need to iterate the methods if we're matching any method anyway...
return true;
}
IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
}
Set<Class<?>> classes = new LinkedHashSet<>();
if (!Proxy.isProxyClass(targetClass)) {
classes.add(ClassUtils.getUserClass(targetClass));
}
classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
for (Class<?> clazz : classes) {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
for (Method method : methods) {
if (introductionAwareMethodMatcher != null ?
introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
methodMatcher.matches(method, targetClass)) {
return true;
}
}
}
return false;
}
其实简单来说,就是检查目标类上或者方法上是否有@Async注解,如果有就返回满足切入规则,否则返回不符合切入规则.
回到前边后置处理器postProcessAfterInitialization方法,如果目标bean满足切入规则,则使用代理工厂ProxyFactory生成代理对象并返回:
代码语言:javascript复制if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
先生成代理工厂,然后检查给定bean类上的接口,并将它们应用于ProxyFactory(如果不适用,退化成直接代理目标类),将增强器添加到代理工厂中,最后由代理工厂生成代理对象,接着看生成代理类的实现:
代码语言:javascript复制public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: "
"Either an interface or a target is required for proxy creation.");
}
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
先创建Aop代理,如果目标类是接口或者目标类是代理类,使用jdk动态代理,否则使用cglib动态代理,两种代理区别这里不展开细讲,简单分析一下其构造代理类的原理,先看JdkDynamicAopProxy:
代码语言:javascript复制public Object getProxy(@Nullable ClassLoader classLoader) {
if (logger.isTraceEnabled()) {
logger.trace("Creating JDK dynamic proxy: " this.advised.getTargetSource());
}
Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised, true);
findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}
到这里我们看到了熟悉的jdk动态代理实现Proxy.newProxyInstance,寻找需要代理的接口,然后生成接口的动态代理对象,这里需要注意一下,JdkDynamicAopProxy实现了InvocationHandler接口,JDK动态代理会在内存中生成一个类名为Proxy0形式的代理类,调用Proxy0方法,jvm内部调用类Proxy.InvocationHandler.invoke方法,也就是JdkDynamicAopProxy实现InvocationHandler接口的invoke方法:
代码语言:javascript复制public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodInvocation invocation;
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Object target = null;
try {
if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
return equals(args[0]);
}
else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
return hashCode();
}
else if (method.getDeclaringClass() == DecoratingProxy.class) {
return AopProxyUtils.ultimateTargetClass(this.advised);
}
else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
method.getDeclaringClass().isAssignableFrom(Advised.class)) {
return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
}
Object retVal;
if (this.advised.exposeProxy) {
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty()) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
retVal = invocation.proceed();
}
Class<?> returnType = method.getReturnType();
if (retVal != null && retVal == target &&
returnType != Object.class && returnType.isInstance(proxy) &&
!RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
retVal = proxy;
}
else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " method);
}
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
AopContext.setCurrentProxy(oldProxy);
}
}
}
先取出被织入的拦截逻辑,本篇中就是AnnotationAsyncExecutionInterceptor,然后指定方法调用,也就是代理类的调用,本质上就是先调用增强逻辑和最原始被代理类的方法的调用.
然后我们再看一下cglib动态代理实现CglibAopProxy:
代码语言:javascript复制public Object getProxy(@Nullable ClassLoader classLoader) {
try {
Class<?> rootClass = this.advised.getTargetClass();
Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");
Class<?> proxySuperClass = rootClass;
if (ClassUtils.isCglibProxyClass(rootClass)) {
proxySuperClass = rootClass.getSuperclass();
Class<?>[] additionalInterfaces = rootClass.getInterfaces();
for (Class<?> additionalInterface : additionalInterfaces) {
this.advised.addInterface(additionalInterface);
}
}
validateClassIfNecessary(proxySuperClass, classLoader);
Enhancer enhancer = createEnhancer();
if (classLoader != null) {
enhancer.setClassLoader(classLoader);
if (classLoader instanceof SmartClassLoader &&
((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
enhancer.setUseCache(false);
}
}
enhancer.setSuperclass(proxySuperClass);
enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
Callback[] callbacks = getCallbacks(rootClass);
Class<?>[] types = new Class<?>[callbacks.length];
for (int x = 0; x < types.length; x ) {
types[x] = callbacks[x].getClass();
}
enhancer.setCallbackFilter(new ProxyCallbackFilter(
this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
enhancer.setCallbackTypes(types);
return createProxyClassAndInstance(enhancer, callbacks);
}
catch (CodeGenerationException | IllegalArgumentException ex) {
throw new AopConfigException("Could not generate CGLIB subclass of " this.advised.getTargetClass()
": Common causes of this problem include using a final class or a non-visible class",
ex);
}
catch (Throwable ex) {
// TargetSource.getTarget() failed
throw new AopConfigException("Unexpected AOP exception", ex);
}
}
我们也看到了熟悉的cglib动态代理实现Enhancer,CGLB动态代理会在内存生成一个类名为?EnhancerByCGLIB?b3361405形式的代理类,调用xxx?EnhancerByCGLIB?b3361405代理类方法,内部调用MethodInterceptor.intercept(),看一下getCallbacks方法,也即是将被代理类的拦截调用装配成MethodInterceptor的逻辑:
代码语言:javascript复制private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
boolean exposeProxy = this.advised.isExposeProxy();
boolean isFrozen = this.advised.isFrozen();
boolean isStatic = this.advised.getTargetSource().isStatic();
Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);
Callback targetInterceptor;
if (exposeProxy) {
targetInterceptor = (isStatic ?
new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
}
else {
targetInterceptor = (isStatic ?
new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
}
Callback targetDispatcher = (isStatic ?
new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());
Callback[] mainCallbacks = new Callback[] {
aopInterceptor, // for normal advice
targetInterceptor, // invoke target without considering advice, if optimized
new SerializableNoOp(), // no override for methods mapped to this
targetDispatcher, this.advisedDispatcher,
new EqualsInterceptor(this.advised),
new HashCodeInterceptor(this.advised)
};
Callback[] callbacks;
if (isStatic && isFrozen) {
Method[] methods = rootClass.getMethods();
Callback[] fixedCallbacks = new Callback[methods.length];
this.fixedInterceptorMap = new HashMap<>(methods.length);
// TODO: small memory optimization here (can skip creation for methods with no advice)
for (int x = 0; x < methods.length; x ) {
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
this.fixedInterceptorMap.put(methods[x].toString(), x);
}
callbacks = new Callback[mainCallbacks.length fixedCallbacks.length];
System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
this.fixedInterceptorOffset = mainCallbacks.length;
}
else {
callbacks = mainCallbacks;
}
return callbacks;
}
在此篇幅异步编程场景,调用代理类方法会直接调用到DynamicAdvisedInterceptor的intercept:
代码语言:javascript复制public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
Object target = null;
TargetSource targetSource = this.advised.getTargetSource();
try {
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
Object retVal;
if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = methodProxy.invoke(target, argsToUse);
}
else {
// We need to create a method invocation...
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
}
retVal = processReturnType(proxy, target, method, retVal);
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
先获取代理类对应方法的拦截器链,如果没有拦截器链且方法是public类型,直接调用代理方法返回,否则将方法连同拦截器链构造成CglibMethodInvocation并执行.
在JdkDynamicAopProxy和CglibAopProxy生成的代理类执行的过程都会调用到前边所说的AnnotationAsyncExecutionInterceptor类的invoke方法,也即是异步执行的逻辑.
jdk动态代理异步执行时序图:
Cglib代理异步执行时序图:
五、总结
从本篇第三节异步编程使用方式来看,spring异步编程接入特别简单,但是从第四节的原理和源码解析来看,其实现也挺复杂的,这就是spring的强大之处,把困难留给自己,把便利留给使用者,把一些复杂的实现对用户做到透明化.
从spring异步编程的源码来看,其使用了很多技术和功能点:
- 导入配置:AsyncConfigurationSelector
- 后置处理器:AsyncAnnotationBeanPostProcessor
- Aop编程:AsyncAnnotationAdvisor
- 线程池:AsyncTaskExecutor
- 拦截器: AnnotationAsyncExecutionInterceptor
- 切入点: ComposablePointcut/AnnotationMatchingPointcut
- 工厂模式: BeanFactory和ProxyFactory
- 动态代理: JdkDynamicAopProxy和CglibAopProxy
- 代理类调用委托处理: jdk动态代理委托给JdkDynamicAopProxy.invoke,cglib动态代理类委托给DynamicAdvisedInterceptor.intercept
由于篇幅问题,中间还有很多细节没覆盖到,比如说获取线程池的逻辑设计也比较巧妙,感兴趣的也可以深入研究一下:
代码语言:javascript复制protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
}
return null;
}
spring异步的使用主要记住两个点,2个注解和一个返回值,在启动类或者配置使用@EnableAsync开启异步,在需要异步调用的方法上添加@Async注解,异步支持的返回类型有CompletableFuture、ListenableFuture和Future和void.