简介
Async 注解是 Java 8 中的一个注解,用于标识一个方法是异步执行的。当一个方法被标记为 Async 时,该方法将在一个新的线程中执行,并且可以立即返回一个 CompletableFuture 对象。使用 CompletableFuture 可以更轻松地管理异步计算的结果。下面是一个使用 Async 注解的示例代码:
代码语言:javascript复制@Async
public CompletableFuture doSomethingAsync() {
// 异步执行一些操作
}
在上面的代码中,doSomethingAsync() 方法被标记为 Async,这意味着该方法将在一个新的线程中异步执行,同时返回一个 CompletableFuture 对象。
应用场景
- 同步: 同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
- 异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。
例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕;如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。
在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的业务子线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。
Spring 已经实现的线程池
SimpleAsyncTaskExecutor
:不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。SyncTaskExecutor
:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方。ConcurrentTaskExecutor
:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。ThreadPoolTaskExecutor
:最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor
的包装。
异步的方法
- 最简单的异步调用,返回值为void
- 带参数的异步调用,异步方法可以传入参数
- 存在返回值,常调用返回Future
Spring中启用@Async
配置类
代码语言:javascript复制@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {
/**
* 核心线程
*/
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() == 1 ? 4 : Runtime.getRuntime().availableProcessors();
@Primary
@Bean("commonExecutor")
public Executor commonExecutor() {
log.info("==== start initialize common async executor =====");
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(CORE_POOL_SIZE);
taskExecutor.setQueueCapacity(1024);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("commonExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(20);
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
@Bean("xmarsRecommendationExecutor")
public Executor xmarsRecommendationExecutor() {
log.info("==== start initialize xmars recommendation async executor =====");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(CORE_POOL_SIZE);
executor.setQueueCapacity(10240);
executor.setKeepAliveSeconds(120);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("xmars-recommendation-executor--");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return commonExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return ((ex, method, params) -> log.error("执行异步任务:{}", method, ex));
}
}
启用方式
代码语言:javascript复制// Spring boot启用:
@EnableAsync
@EnableTransactionManagement
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Async默认线程池
Spring应用默认的线程池,指在@Async
注解在使用时,不指定线程池的名称,@Async
默认异步配置使用的是SimpleAsyncTaskExecutor
,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。
针对线程创建问题,SimpleAsyncTaskExecutor
提供了限流机制,通过concurrencyLimit
属性来控制开关,当concurrencyLimit>=0
时开启限流机制,默认关闭限流机制即concurrencyLimit=-1
,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor
并不是严格意义的线程池,达不到线程复用的功能。
使用方式
无返回值调用
基于@Async
无返回值调用,直接在使用类,使用方法(建议在使用方法)上,加上注解。若需要抛出异常,需手动new一个异常抛出。
/**
* 带参数的异步调用 异步方法可以传入参数
* 对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉
* @param s
*/
@Async
public void asyncInvokeWithException(String s) {
log.info("asyncInvokeWithParameter, parementer={}", s);
throw new IllegalArgumentException(s);
}
有返回值Future调用
代码语言:javascript复制/**
* 异常调用返回Future
* 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
* 或者在调用方在调用Futrue.get时捕获异常进行处理
*
* @param i
* @return
*/
@Async
public Future asyncInvokeReturnFuture(int i) {
log.info("asyncInvokeReturnFuture, parementer={}", i);
Future future;
try {
Thread.sleep(1000 * 1);
future = new AsyncResult("success:" i);
throw new IllegalArgumentException("a");
} catch (InterruptedException e) {
future = new AsyncResult("error");
} catch (IllegalArgumentException e) {
future = new AsyncResult("error-IllegalArgumentException");
}
return future;
}
以上两种使用也可以使用CompletableFuture
代替,具体详情请看:
CompletableFuture
默认线程池的弊端
- 在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。* Executors各个方法的弊端:
newFixedThreadPool
和newSingleThreadExecutor
:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。newCachedThreadPool
和newScheduledThreadPool
:要问题是线程数最大数是Integer.MAX_VALUE
,可能会创建数量非常多的线程,甚至OOM。
@Async应用自定义线程池
自定义线程池,可对系统中线程池更加细粒度的控制,方便调整线程池大小配置,线程执行异常控制和处理。在设置系统自定义线程池代替默认线程池时,虽可通过多种模式设置,但替换默认线程池最终产生的线程池有且只能设置一个(不能设置多个类继承AsyncConfigurer)。自定义线程池有如下模式:
- 重新实现接口
AsyncConfigurer
- 继承
AsyncConfigurerSupport
- 配置由自定义的
TaskExecutor
替代内置的任务执行器
通过查看Spring源码关于@Async
的默认调用规则,会优先查询源码中实现AsyncConfigurer这个接口的类,实现这个接口的类为AsyncConfigurerSupport。但默认配置的线程池和异步处理方法均为空,所以,无论是继承或者重新实现接口,都需指定一个线程池。且重新实现 public Executor getAsyncExecutor()
方法。
实现接口AsyncConfigurer
详情请看上边配置类https://www.notion.so/cuizb/Async-46059826a5fb4e1a911753d911703fdf?pvs=4#dc5bb44d51a0448b861738a337ea7f0e
继承类AsyncConfigurerSupport
类AsyncConfigurerSupport实现接口AsyncConfigurer
配置自定义的TaskExecutor
由于AsyncConfigurer的默认线程池在源码中为空,Spring通过beanFactory.getBean(TaskExecutor.class)
先查看是否有线程池,未配置时,通过beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class)
,又查询是否存在默认名称为TaskExecutor的线程池。
所以可以在项目中,定义名称为TaskExecutor的bean生成一个默认线程池。也可不指定线程池的名称,申明一个线程池,本身底层是基于TaskExecutor.class
便可。
比如:
代码语言:javascript复制Executor.class:ThreadPoolExecutorAdapter->ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor
这样的模式,最终底层为Executor.class
,在替换默认的线程池时,需设置默认的线程池名称为TaskExecutor
TaskExecutor.class:ThreadPoolTaskExecutor->SchedulingTaskExecutor->AsyncTaskExecutor->TaskExecutor
这样的模式,最终底层为TaskExecutor.class
,在替换默认的线程池时,可不指定线程池名称。最新面试题整理好了,点击Java面试库 小程序在线刷题。
@EnableAsync
@Configuration
public class TaskPoolConfig {
@Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(10);
//最大线程数
executor.setMaxPoolSize(20);
//队列容量
executor.setQueueCapacity(200);
//活跃时间
executor.setKeepAliveSeconds(60);
//线程名字前缀
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
@Bean(name = "new_task")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(10);
//最大线程数
executor.setMaxPoolSize(20);
//队列容量
executor.setQueueCapacity(200);
//活跃时间
executor.setKeepAliveSeconds(60);
//线程名字前缀
executor.setThreadNamePrefix("taskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
@Async
注解,使用系统默认或者自定义的线程池(代替默认线程池)。可在项目中设置多个线程池,在异步调用时,指明需要调用的线程池名称,如@Async("new_task")
。
源码解析
AsyncExecutionAspectSupport.java
代码语言:javascript复制// 定义了一个受保护的方法,该方法接受一个Method类型的参数,并返回一个AsyncTaskExecutor类型的对象。
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
// 尝试从executors这个Map中获取与给定方法关联的AsyncTaskExecutor。
AsyncTaskExecutor executor = this.executors.get(method);
// 如果executor为空,表示当前方法还没有对应的AsyncTaskExecutor。
if (executor == null) {
// 根据方法确定执行器的标识符。
String qualifier = getExecutorQualifier(method);
// 如果获取到了非空的执行器标识符。
if (StringUtils.hasLength(qualifier)) {
// 在beanFactory中查找具有指定标识符的执行器。
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
// 如果没有获取到执行器标识符,则使用默认的执行器。
else {
targetExecutor = this.defaultExecutor.get();
}
// 如果仍然没有找到执行器,则返回null。
if (targetExecutor == null) {
return null;
}
// 如果targetExecutor是AsyncListenableTaskExecutor的实例,则直接赋值给executor。
// 否则,使用targetExecutor创建一个新的TaskExecutorAdapter并将其赋值给executor。
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
// 将方法和执行器的关联存储到executors这个Map中。
this.executors.put(method, executor);
}
// 返回与给定方法关联的AsyncTaskExecutor。
return executor;
}
简单总结:
- 这个方法首先尝试从
executors
的Map中直接获取与给定方法关联的AsyncTaskExecutor
。 - 如果Map中没有,它会根据方法的某些特性(通过
getExecutorQualifier
方法)来查找或确定一个执行器。 - 如果找到了执行器,它会检查这个执行器是否是
AsyncListenableTaskExecutor
的实例。如果是,则直接使用;如果不是,则将其包装在TaskExecutorAdapter
中。 - 最后,将方法与确定的执行器关联存储在
executors
的Map中,并返回该执行器。
AsyncConfigurer.java
代码语言:javascript复制public interface AsyncConfigurer {
/**
* The {@link Executor} instance to be used when processing async
* method invocations.
*/
@Nullable
default Executor getAsyncExecutor() {
return null;
}
/**
* The {@link AsyncUncaughtExceptionHandler} instance to be used
* when an exception is thrown during an asynchronous method execution
* with {@code void} return type.
*/
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
AsyncExecutionAspectSupport.java
代码语言:javascript复制// 定义了一个受保护的方法,该方法接受一个可空的BeanFactory作为参数,并返回一个Executor类型的对象。
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
// 如果beanFactory不为空,则继续执行以下逻辑。
if (beanFactory != null) {
try {
// 尝试从beanFactory中获取一个TaskExecutor类型的bean。
// 这里特别指定了TaskExecutor而不是普通的Executor,因为后者可能会匹配到ScheduledExecutorService,
// 这对于我们的目的来说是不可用的。TaskExecutor更明确地为此设计。
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
// 如果找到了多个TaskExecutor bean,记录一条debug级别的日志。
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
// 尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
// 如果连默认的TaskExecutor bean都没有找到,并且logger的info级别是启用的,
// 则记录一条info级别的日志,说明找到了多个TaskExecutor bean,但没有一个是名为'taskExecutor'的,
// 并列出所有找到的bean的名称。
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. "
"Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: " ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// 如果连一个TaskExecutor bean都没有找到,记录一条debug级别的日志。
logger.debug("Could not find default TaskExecutor bean", ex);
try {
// 再次尝试获取名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor类型的bean。
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
// 如果连名为DEFAULT_TASK_EXECUTOR_BEAN_NAME的bean都没有找到,记录一条info级别的日志,
// 说明没有找到用于异步处理的TaskExecutor bean。
logger.info("No task executor bean found for async processing: "
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// 如果没有找到任何可用的bean,方法返回null。
}
}
// 如果beanFactory本身就是null,方法也返回null。
return null;
}
https://mmbiz.qpic.cn/mmbiz_png/mR4CwoLXicg3LaOuyIHahh7jtrUvagx2fV3FSO6caGkylwaMw37PHiayowQkDjxV82c2Duic9d58eiclekW5WkKB1g/640?wx_fmt=png
这个方法通过一系列的异常捕获来尝试获取一个可用的 TaskExecutor
bean。如果beanFactory中没有找到任何 TaskExecutor
bean,或者没有找到名为 DEFAULT_TASK_EXECUTOR_BEAN_NAME
的bean,方法最终会返回null。这样的设计允许开发者在Spring容器中配置一个或多个 TaskExecutor
bean,并通过名称或标记其中一个为primary来指定哪个bean应该被用作默认的异步任务执行器。如果没有找到任何bean,则框架可能会回退到使用本地的默认执行器,或者完全不使用执行器。
如果没有找到项目中设置的默认线程池时,采用spring 默认的线程池
代码语言:javascript复制/**
* This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
* bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
* If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
* this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
* for local use if no default could be found.
* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
*/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
为什么不建议直接使用 @Async 注解?
在泰山版《阿里巴巴开发手册》规定开发中不建议使用 Async 注解,这是为什么?在实际开发中,异步编程已经成为了一个必备的技能。为了帮助开发者更轻松地进行异步编程,Java 8 引入了 Async 注解,使得异步编程变得更加简单。然而,虽然 Async 注解可以为我们带来很多好处,但是在某些情况下直接使用 Async 注解并不是一个好主意。
Async 注解的优点
使用 Async 注解有以下优点:
- 简化异步编程:使用 Async 注解可以使得异步编程变得更加简单。开发者只需要将需要异步执行的方法标记为 Async,然后使用 CompletableFuture 来管理异步计算的结果即可。
- 提高应用程序的响应速度:使用 Async 注解可以将耗时的操作异步执行,从而避免阻塞主线程,提高应用程序的响应速度。
- 提高系统的并发性能:使用 Async 注解可以使得多个异步操作并发执行,从而提高系统的并发性能。
Async 注解的局限性
然而,虽然 Async 注解可以为我们带来很多好处,但是它也有一些局限性。
- 异常处理:使用 Async 注解时,异常处理可能会变得更加复杂。由于异步操作是在另一个线程中执行的,因此如果异步操作抛出了异常,这个异常可能不会被捕获。为了解决这个问题,开发者需要使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
- 内存占用:使用 Async 注解时,由于每个异步操作都会在一个新的线程中执行,因此可能会导致大量的线程被创建。这可能会导致内存占用过高,从而导致应用程序性能下降。
- 阻塞操作:使用 Async 注解时,如果异步操作中包含了阻塞操作,这可能会导致线程池中的线程被阻塞,从而导致应用程序性能下降。
不建议直接使用 Async 注解的原因
由于 Async 注解的局限性,直接使用 Async 注解可能不是一个好主意。下面是不建议直接使用 Async 注解的原因:
- 可能会导致性能问题:由于 Async 注解会创建新的线程来执行异步操作,因此如果使用不当,可能会导致线程池中的线程被过度消耗,从而导致性能问题。
- 可能会导致内存泄漏问题:如果使用 Async 注解时没有正确地管理线程池,可能会导致内存泄漏问题。例如,如果不正确地配置线程池大小,可能会导致线程池中的线程无法回收,从而导致内存泄漏。
- 可能会导致死锁问题:如果异步操作中包含了阻塞操作,可能会导致线程池中的线程被阻塞,从而导致死锁问题。
综上所述,直接使用 Async 注解可能会导致各种问题,因此不建议直接使用 Async 注解。
如何更好地使用 Async 注解
虽然不建议直接使用 Async 注解,但是在某些情况下,使用 Async 注解仍然是一个不错的选择。下面是一些使用 Async 注解的最佳实践:
- 配置线程池:使用 Async 注解时,应该配置合适的线程池大小。线程池的大小应该根据应用程序的性质和需求来确定。
- 使用异常处理机制:使用 Async 注解时,应该使用 CompletableFuture 的异常处理机制来捕获异步操作抛出的异常。
- 避免阻塞操作:使用 Async 注解时,应该避免在异步操作中包含阻塞操作。如果必须使用阻塞操作,应该使用 CompletableFuture 的 supplyAsync() 方法来确保阻塞操作在一个新的线程中执行。
- 使用 CompletableFuture API:使用 Async 注解时,应该使用 CompletableFuture API 来管理异步计算的结果。使用 CompletableFuture API 可以更加轻松地处理异步操作的结果,并避免一些潜在的问题。
综上所述,使用 Async 注解可以为我们带来很多好处,但是在使用 Async 注解时需要注意一些问题,以避免出现性能问题、内存泄漏问题和死锁问题等。因此,在使用 Async 注解时,我们应该遵循一些最佳实践来确保代码的正确性和性能。