关于Spring中的@Async注解以及为什么不建议使用 - Java技术债务

2024-06-21 17:02:09 浏览数 (3)

简介

Async 注解是 Java 8 中的一个注解,用于标识一个方法是异步执行的。当一个方法被标记为 Async 时,该方法将在一个新的线程中执行,并且可以立即返回一个 CompletableFuture 对象。使用 CompletableFuture 可以更轻松地管理异步计算的结果。下面是一个使用 Async 注解的示例代码:

代码语言:javascript复制
@Async
public CompletableFuture doSomethingAsync() {    
	// 异步执行一些操作
}

在上面的代码中,doSomethingAsync() 方法被标记为 Async,这意味着该方法将在一个新的线程中异步执行,同时返回一个 CompletableFuture 对象。

应用场景

  • 同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
  • 异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。

例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕;如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。

在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的业务子线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

Spring 已经实现的线程池

  1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
  2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。
  3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
  4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
  5. ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装。

异步的方法

  1. 最简单的异步调用,返回值为void
  2. 带参数的异步调用,异步方法可以传入参数
  3. 存在返回值,常调用返回Future

Spring中启用@Async

配置类

代码语言:javascript复制
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {

    /**
     * 核心线程
     */
    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() == 1 ? 4 : Runtime.getRuntime().availableProcessors();

    @Primary
    @Bean("commonExecutor")
    public Executor commonExecutor() {
        log.info("==== start initialize common async executor =====");
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(CORE_POOL_SIZE);
        taskExecutor.setQueueCapacity(1024);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("commonExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(20);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean("xmarsRecommendationExecutor")
    public Executor xmarsRecommendationExecutor() {
        log.info("==== start initialize xmars recommendation async executor =====");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(CORE_POOL_SIZE);
        executor.setQueueCapacity(10240);
        executor.setKeepAliveSeconds(120);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("xmars-recommendation-executor--");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return commonExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return ((ex, method, params) -> log.error("执行异步任务:{}", method, ex));
    }
}

启用方式

代码语言:javascript复制
// Spring boot启用:
@EnableAsync
@EnableTransactionManagement
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

@Async默认线程池

Spring应用默认的线程池,指在@Async注解在使用时,不指定线程池的名称,@Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。

针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。

使用方式

无返回值调用

基于@Async无返回值调用,直接在使用类,使用方法(建议在使用方法)上,加上注解。若需要抛出异常,需手动new一个异常抛出。

代码语言:javascript复制
/**
 * 带参数的异步调用 异步方法可以传入参数    
 *  对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉    

 * @param s
 */
@Async
public void asyncInvokeWithException(String s) {
    log.info("asyncInvokeWithParameter, parementer={}", s);
    throw new IllegalArgumentException(s);
}

有返回值Future调用

代码语言:javascript复制
/**
 * 异常调用返回Future
 * 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
 * 或者在调用方在调用Futrue.get时捕获异常进行处理
 *
 * @param i
 * @return
 */
@Async
public Future asyncInvokeReturnFuture(int i) {
    log.info("asyncInvokeReturnFuture, parementer={}", i);
    Future future;
    try {
        Thread.sleep(1000 * 1);
        future = new AsyncResult("success:"   i);
        throw new IllegalArgumentException("a");
    } catch (InterruptedException e) {
        future = new AsyncResult("error");
    } catch (IllegalArgumentException e) {
        future = new AsyncResult("error-IllegalArgumentException");
    }
    return future;
}

以上两种使用也可以使用CompletableFuture 代替,具体详情请看:

CompletableFuture

默认线程池的弊端

  • 在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。* Executors各个方法的弊端:
  • newFixedThreadPoolnewSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • newCachedThreadPoolnewScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

@Async应用自定义线程池

自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

通过查看Spring源码关于@Async的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()方法。

实现接口AsyncConfigurer

详情请看上边配置类https://www.notion.so/cuizb/Async-46059826a5fb4e1a911753d911703fdf?pvs=4#dc5bb44d51a0448b861738a337ea7f0e

继承类AsyncConfigurerSupport

类AsyncConfigurerSupport实现接口AsyncConfigurer

配置自定义的TaskExecutor

由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class),又查询是否存在默认名称为TaskExecutor的线程池。

所以可以在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class便可。

比如:

代码语言:javascript复制
Executor.class:ThreadPoolExecutorAdapter->ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor

这样的模式,最终底层为Executor.class,在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor

代码语言:javascript复制
TaskExecutor.class:ThreadPoolTaskExecutor->SchedulingTaskExecutor->AsyncTaskExecutor->TaskExecutor

这样的模式,最终底层为TaskExecutor.class,在替换默认的线程池时,可不指定线程池名称。最新面试题整理好了,点击Java面试库 小程序在线刷题。

代码语言:javascript复制
@EnableAsync
@Configuration
public class TaskPoolConfig {
   @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
       executor.setCorePoolSize(10);
       //最大线程数
       executor.setMaxPoolSize(20);
       //队列容量
       executor.setQueueCapacity(200);
       //活跃时间
       executor.setKeepAliveSeconds(60);
       //线程名字前缀
       executor.setThreadNamePrefix("taskExecutor-");
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       return executor;
   }
  @Bean(name = "new_task")
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
       executor.setCorePoolSize(10);
       //最大线程数
       executor.setMaxPoolSize(20);
       //队列容量
       executor.setQueueCapacity(200);
       //活跃时间
       executor.setKeepAliveSeconds(60);
       //线程名字前缀
       executor.setThreadNamePrefix("taskExecutor-");
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       return executor;
   }
}

@Async注解,使用系统默认或者自定义的线程池(代替默认线程池)。可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("new_task")

源码解析

AsyncExecutionAspectSupport.java

代码语言:javascript复制
// 定义了一个受保护的方法,该方法接受一个Method类型的参数,并返回一个AsyncTaskExecutor类型的对象。  
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {  
    // 尝试从executors这个Map中获取与给定方法关联的AsyncTaskExecutor。  
    AsyncTaskExecutor executor = this.executors.get(method);  
  
    // 如果executor为空,表示当前方法还没有对应的AsyncTaskExecutor。  
    if (executor == null) {  
        // 根据方法确定执行器的标识符。  
        String qualifier = getExecutorQualifier(method);  
  
        // 如果获取到了非空的执行器标识符。  
        if (StringUtils.hasLength(qualifier)) {  
            // 在beanFactory中查找具有指定标识符的执行器。  
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);  
        }  
        // 如果没有获取到执行器标识符,则使用默认的执行器。  
        else {  
            targetExecutor = this.defaultExecutor.get();  
        }  
  
        // 如果仍然没有找到执行器,则返回null。  
        if (targetExecutor == null) {  
            return null;  
        }  
  
        // 如果targetExecutor是AsyncListenableTaskExecutor的实例,则直接赋值给executor。  
        // 否则,使用targetExecutor创建一个新的TaskExecutorAdapter并将其赋值给executor。  
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?  
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));  
  
        // 将方法和执行器的关联存储到executors这个Map中。  
        this.executors.put(method, executor);  
    }  
  
    // 返回与给定方法关联的AsyncTaskExecutor。  
    return executor;  
}

简单总结:

  1. 这个方法首先尝试从executors的Map中直接获取与给定方法关联的AsyncTaskExecutor
  2. 如果Map中没有,它会根据方法的某些特性(通过getExecutorQualifier方法)来查找或确定一个执行器。
  3. 如果找到了执行器,它会检查这个执行器是否是AsyncListenableTaskExecutor的实例。如果是,则直接使用;如果不是,则将其包装在TaskExecutorAdapter中。
  4. 最后,将方法与确定的执行器关联存储在executors的Map中,并返回该执行器。

AsyncConfigurer.java

代码语言:javascript复制
public interface AsyncConfigurer {

	/**
	 * The {@link Executor} instance to be used when processing async
	 * method invocations.
	 */
	@Nullable
	default Executor getAsyncExecutor() {
		return null;
	}

	/**
	 * The {@link AsyncUncaughtExceptionHandler} instance to be used
	 * when an exception is thrown during an asynchronous method execution
	 * with {@code void} return type.
	 */
	@Nullable
	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}
}

AsyncExecutionAspectSupport.java

代码语言:javascript复制
// 定义了一个受保护的方法,该方法接受一个可空的BeanFactory作为参数,并返回一个Executor类型的对象。  
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {  
    // 如果beanFactory不为空,则继续执行以下逻辑。  
    if (beanFactory != null) {  
        try {  
            // 尝试从beanFactory中获取一个TaskExecutor类型的bean。  
            // 这里特别指定了TaskExecutor而不是普通的Executor,因为后者可能会匹配到ScheduledExecutorService,  
            // 这对于我们的目的来说是不可用的。TaskExecutor更明确地为此设计。  
            return beanFactory.getBean(TaskExecutor.class);  
        }  
        catch (NoUniqueBeanDefinitionException ex) {  
            // 如果找到了多个TaskExecutor bean,记录一条debug级别的日志。  
            logger.debug("Could not find unique TaskExecutor bean", ex);  
            try {  
                // 尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。  
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);  
            }  
            catch (NoSuchBeanDefinitionException ex2) {  
                // 如果连默认的TaskExecutor bean都没有找到,并且logger的info级别是启用的,  
                // 则记录一条info级别的日志,说明找到了多个TaskExecutor bean,但没有一个是名为'taskExecutor'的,  
                // 并列出所有找到的bean的名称。  
                if (logger.isInfoEnabled()) {  
                    logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. "    
                            "Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: "   ex.getBeanNamesFound());  
                }  
            }  
        }  
        catch (NoSuchBeanDefinitionException ex) {  
            // 如果连一个TaskExecutor bean都没有找到,记录一条debug级别的日志。  
            logger.debug("Could not find default TaskExecutor bean", ex);  
            try {  
                // 再次尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。  
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);  
            }  
            catch (NoSuchBeanDefinitionException ex2) {  
                // 如果连名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的bean都没有找到,记录一条info级别的日志,  
                // 说明没有找到用于异步处理的TaskExecutor bean。  
                logger.info("No task executor bean found for async processing: "    
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");  
            }  
            // 如果没有找到任何可用的bean,方法返回null。  
        }  
    }  
    // 如果beanFactory本身就是null,方法也返回null。  
    return null;  
}

https://mmbiz.qpic.cn/mmbiz_png/mR4CwoLXicg3LaOuyIHahh7jtrUvagx2fV3FSO6caGkylwaMw37PHiayowQkDjxV82c2Duic9d58eiclekW5WkKB1g/640?wx_fmt=png

这个方法通过一系列的异常捕获来尝试获取一个可用的 TaskExecutor bean。如果beanFactory中没有找到任何 TaskExecutor bean,或者没有找到名为 DEFAULT_TASK_EXECUTOR_BEAN_NAME 的bean,方法最终会返回null。这样的设计允许开发者在Spring容器中配置一个或多个 TaskExecutor bean,并通过名称或标记其中一个为primary来指定哪个bean应该被用作默认的异步任务执行器。如果没有找到任何bean,则框架可能会回退到使用本地的默认执行器,或者完全不使用执行器。

如果没有找到项目中设置的默认线程池时,采用spring 默认的线程池

代码语言:javascript复制
/**
 * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
 * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
 * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
 * for local use if no default could be found.
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

为什么不建议直接使用 @Async 注解?

在泰山版《阿里巴巴开发手册》规定开发中不建议使用 Async 注解,这是为什么?在实际开发中,异步编程已经成为了一个必备的技能。为了帮助开发者更轻松地进行异步编程,Java 8 引入了 Async 注解,使得异步编程变得更加简单。然而,虽然 Async 注解可以为我们带来很多好处,但是在某些情况下直接使用 Async 注解并不是一个好主意。

Async 注解的优点

使用 Async 注解有以下优点:

  • 简化异步编程:使用 Async 注解可以使得异步编程变得更加简单。开发者只需要将需要异步执行的方法标记为 Async,然后使用 CompletableFuture 来管理异步计算的结果即可。
  • 提高应用程序的响应速度:使用 Async 注解可以将耗时的操作异步执行,从而避免阻塞主线程,提高应用程序的响应速度。
  • 提高系统的并发性能:使用 Async 注解可以使得多个异步操作并发执行,从而提高系统的并发性能。

Async 注解的局限性

然而,虽然 Async 注解可以为我们带来很多好处,但是它也有一些局限性。

  • 异常处理:使用 Async 注解时,异常处理可能会变得更加复杂。由于异步操作是在另一个线程中执行的,因此如果异步操作抛出了异常,这个异常可能不会被捕获。为了解决这个问题,开发者需要使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
  • 内存占用:使用 Async 注解时,由于每个异步操作都会在一个新的线程中执行,因此可能会导致大量的线程被创建。这可能会导致内存占用过高,从而导致应用程序性能下降。
  • 阻塞操作:使用 Async 注解时,如果异步操作中包含了阻塞操作,这可能会导致线程池中的线程被阻塞,从而导致应用程序性能下降。

不建议直接使用 Async 注解的原因

由于 Async 注解的局限性,直接使用 Async 注解可能不是一个好主意。下面是不建议直接使用 Async 注解的原因:

  • 可能会导致性能问题:由于 Async 注解会创建新的线程来执行异步操作,因此如果使用不当,可能会导致线程池中的线程被过度消耗,从而导致性能问题。
  • 可能会导致内存泄漏问题:如果使用 Async 注解时没有正确地管理线程池,可能会导致内存泄漏问题。例如,如果不正确地配置线程池大小,可能会导致线程池中的线程无法回收,从而导致内存泄漏。
  • 可能会导致死锁问题:如果异步操作中包含了阻塞操作,可能会导致线程池中的线程被阻塞,从而导致死锁问题。

综上所述,直接使用 Async 注解可能会导致各种问题,因此不建议直接使用 Async 注解。

如何更好地使用 Async 注解

虽然不建议直接使用 Async 注解,但是在某些情况下,使用 Async 注解仍然是一个不错的选择。下面是一些使用 Async 注解的最佳实践:

  • 配置线程池:使用 Async 注解时,应该配置合适的线程池大小。线程池的大小应该根据应用程序的性质和需求来确定。
  • 使用异常处理机制:使用 Async 注解时,应该使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
  • 避免阻塞操作:使用 Async 注解时,应该避免在异步操作中包含阻塞操作。如果必须使用阻塞操作,应该使用 CompletableFuture 的 supplyAsync() 方法来确保阻塞操作在一个新的线程中执行。
  • 使用 CompletableFuture API:使用 Async 注解时,应该使用 CompletableFuture API 来管理异步计算的结果。使用 CompletableFuture API 可以更加轻松地处理异步操作的结果,并避免一些潜在的问题。

综上所述,使用 Async 注解可以为我们带来很多好处,但是在使用 Async 注解时需要注意一些问题,以避免出现性能问题、内存泄漏问题和死锁问题等。因此,在使用 Async 注解时,我们应该遵循一些最佳实践来确保代码的正确性和性能。

0 人点赞