springboot异步线程实践

2023-11-02 08:38:55 浏览数 (1)

背景

我们的业务场景里需要调用外部请求,这个外部系统是一个异构框架,不能直接走 rpc 调用。外部资源处理过程通常不可控,为了提高系统可用性,与外部系统解耦,通常的方案可以走消息队列或者直接 http 调用。http 调用相对轻量,不用额外引入中间件,同时可以将外部调用通过异步线程池提交,避免阻塞业务主流程。

spring boot 异步线程池实践

spring boot 框架已经实现 java.util.concurrent.Executor 接口的线程池类主要有以下几种

Executor 实现类Executor 实现类
  • SyncTaskExecutor 这个实现类是并没有异步执行,而是在任务执行execute方法中直接调用了task.run(),其本质就是一个同步方法实现。
  • ThreadPoolTaskExecutor 这个实现类是我们通常所使用的,查看初始化源码initializeExecutor可以看到,它的初始化定义了异步线程池java.util.concurrent.ThreadPoolExecutor 对象,线程池队列BlockingQueue<Runnable>对象,这些初始化定义好的对象可以直接使用。
ThreadPoolTaskExecutor 初始化ThreadPoolTaskExecutor 初始化
  • SimpleAsyncTaskExecutor 这个实现类也不推荐使用,查看它的任务执行方法可以看到,每次调用都是 new 一个新的线程,当我们任务较多且任务执行时间较长时,很消耗服务资源。
SimpleAsyncTaskExecutor execute 方法SimpleAsyncTaskExecutor execute 方法
  • SimpleThreadPoolTaskExecutor 这个实现类是Quartz SimpleThreadPool的子类,主要适用于Quartz Scheduler 调度器。

综上,我们在使用 spring boot 的异步线程类时,主要考虑使用ThreadPoolTaskExecutor 这个实现类。

线程池参数配置

在spring boot 框架中使用异步线程,主要通过@Async注解,程序中的配置有以下几个需要注意的地方:

  • 在服务启动类或者被调用的异步方法加上@EnableAsync注解,来开启异步方法调用;
代码语言:java复制
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableAsync
@SpringBootApplication
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
  • 自定义线程池参数。便于通过日志排查问题,需要自定义线程池前缀,同时还可以自定义线程池参数
代码语言:java复制
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AsyncPoolConfig {

    @Bean("myAsynExecutor")
    public Executor createMyAsyncExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5); // 核心线程数
        executor.setMaxPoolSize(20); // 最大线程数
        executor.setQueueCapacity(100); // 任务队列大小
        executor.setKeepAliveSeconds(60); // 线程池存活时间
        executor.setAllowCoreThreadTimeOut(true); // 是否允许核心线程超时
        executor.setThreadNamePrefix("myAsynExecutor-"); // 线程命名前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 任务队列拒绝策略-直接调用线程运行任务
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());  // 任务队列拒绝策略-直接拒绝并抛异常
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 任务队列拒绝策略-直接丢弃
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 任务队列拒绝策略-直接丢弃最老的任务
        executor.setWaitForTasksToCompleteOnShutdown(true); // 等待其他线程任务完成才销毁
        executor.setAwaitTerminationSeconds(60); // 线程中任务的等待时间
        executor.initialize();
        return executor;
    }

}

上面我们自定义了异步线程池,主要参数极其注释都标明了。

  • 在程序中使用我们上面自定义的异步线程,直接在异步线程注解@Async中指定我们前面定义的 bean 名称。
代码语言:java复制
@Service
public class MyServiceImpl {

    @Async("myAsynExecutor")
    public void doMyExecute() throws Exeception {
        // do you business service

    }
}
  • 需要注意,主方法与被调用的方法需要定义不同的类中,因为 spring boot 默认同一类中的方法调用不会被 AOP 拦截,会导致注解无法生效。同时,最好还是自定义一些线程的核心参数及拒绝策略,不然 springboot 会默认每次都新创建一个线程来执行异步任务,当异步调用较多且调用流程长时,线程的开销比较大,容易导致 OOM .

任务在线程池中执行的过程大致如下:

异步任务执行过程异步任务执行过程
  1. 当方法被调用后,提交一个异步任务,会进行一系列校验。首先会判断当前线程池中已有的线程数是否小于定义的核心线程数,满足条件则创建核心线程或者复用线程执行异步方法调用。
  2. 当线程池中线程数大于核心线程时,则判断任务队列是否已满,未满则放入队列中等待核心线程调度
  3. 当任务队列已满时,判断线程池中线程数是否大于定义的最大线程数,小于则创建新线程来执行异步方法调用
  4. 当任务队列已满,且线程池中线程数大于定义的最大线程数,则执行任务的拒绝策略。

最后,用一张流程图来清晰展现这个过程

异步线程执行过程异步线程执行过程

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞