背景
我们的业务场景里需要调用外部请求,这个外部系统是一个异构框架,不能直接走 rpc 调用。外部资源处理过程通常不可控,为了提高系统可用性,与外部系统解耦,通常的方案可以走消息队列或者直接 http 调用。http 调用相对轻量,不用额外引入中间件,同时可以将外部调用通过异步线程池提交,避免阻塞业务主流程。
spring boot 异步线程池实践
spring boot 框架已经实现 java.util.concurrent.Executor
接口的线程池类主要有以下几种
SyncTaskExecutor
这个实现类是并没有异步执行,而是在任务执行execute
方法中直接调用了task.run()
,其本质就是一个同步方法实现。ThreadPoolTaskExecutor
这个实现类是我们通常所使用的,查看初始化源码initializeExecutor
可以看到,它的初始化定义了异步线程池java.util.concurrent.ThreadPoolExecutor
对象,线程池队列BlockingQueue<Runnable>
对象,这些初始化定义好的对象可以直接使用。
SimpleAsyncTaskExecutor
这个实现类也不推荐使用,查看它的任务执行方法可以看到,每次调用都是 new 一个新的线程,当我们任务较多且任务执行时间较长时,很消耗服务资源。
SimpleThreadPoolTaskExecutor
这个实现类是Quartz SimpleThreadPool
的子类,主要适用于Quartz Scheduler
调度器。
综上,我们在使用 spring boot 的异步线程类时,主要考虑使用ThreadPoolTaskExecutor
这个实现类。
线程池参数配置
在spring boot 框架中使用异步线程,主要通过@Async
注解,程序中的配置有以下几个需要注意的地方:
- 在服务启动类或者被调用的异步方法加上
@EnableAsync
注解,来开启异步方法调用;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableAsync
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
- 自定义线程池参数。便于通过日志排查问题,需要自定义线程池前缀,同时还可以自定义线程池参数
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncPoolConfig {
@Bean("myAsynExecutor")
public Executor createMyAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(20); // 最大线程数
executor.setQueueCapacity(100); // 任务队列大小
executor.setKeepAliveSeconds(60); // 线程池存活时间
executor.setAllowCoreThreadTimeOut(true); // 是否允许核心线程超时
executor.setThreadNamePrefix("myAsynExecutor-"); // 线程命名前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 任务队列拒绝策略-直接调用线程运行任务
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 任务队列拒绝策略-直接拒绝并抛异常
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 任务队列拒绝策略-直接丢弃
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 任务队列拒绝策略-直接丢弃最老的任务
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待其他线程任务完成才销毁
executor.setAwaitTerminationSeconds(60); // 线程中任务的等待时间
executor.initialize();
return executor;
}
}
上面我们自定义了异步线程池,主要参数极其注释都标明了。
- 在程序中使用我们上面自定义的异步线程,直接在异步线程注解
@Async
中指定我们前面定义的 bean 名称。
@Service
public class MyServiceImpl {
@Async("myAsynExecutor")
public void doMyExecute() throws Exeception {
// do you business service
}
}
- 需要注意,主方法与被调用的方法需要定义不同的类中,因为 spring boot 默认同一类中的方法调用不会被 AOP 拦截,会导致注解无法生效。同时,最好还是自定义一些线程的核心参数及拒绝策略,不然 springboot 会默认每次都新创建一个线程来执行异步任务,当异步调用较多且调用流程长时,线程的开销比较大,容易导致 OOM .
任务在线程池中执行的过程大致如下:
- 当方法被调用后,提交一个异步任务,会进行一系列校验。首先会判断当前线程池中已有的线程数是否小于定义的核心线程数,满足条件则创建核心线程或者复用线程执行异步方法调用。
- 当线程池中线程数大于核心线程时,则判断任务队列是否已满,未满则放入队列中等待核心线程调度
- 当任务队列已满时,判断线程池中线程数是否大于定义的最大线程数,小于则创建新线程来执行异步方法调用
- 当任务队列已满,且线程池中线程数大于定义的最大线程数,则执行任务的拒绝策略。
最后,用一张流程图来清晰展现这个过程
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!