Spring Boot多任务并发处理

2022-09-21 10:11:39 浏览数 (1)

Spring Boot多任务并发处理

创建公共线程池

代码语言:javascript复制
@ConfigurationProperties(prefix = "common-thread-pool")
@Data
@Slf4j
@Configuration
@EnableAsync
public class CommonThreadPoolConfig implements AsyncConfigurer {
    public static final String COMMON_THREAD_POOL_BEAN_NAME = "commonThreadPoolTaskExecutor";
    /**
     * 核心线程大小
     */
    private int corePoolSize;
    /**
     * 最大线程大小
     */
    private int maxPoolSize;
    /**
     * 非核心线程存活时间,单位:秒
     */
    private int keepAliveSeconds;
    /**
     * 等待队列容量
     */
    private int queueCapacity;
    /**
     * 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
     */
    private int awaitTerminationMillis = 60;
    /**
     * 等待任务在关机时完成--表明等待所有线程执行完
     */
    private boolean waitForJobsToCompleteOnShutdown = Boolean.TRUE;
    /**
     * 线程名称前缀
     */
    private String threadNamePrefix = "common-thread-pool-";
    /**
     * rejection-policy:当pool已经达到max size的时候,如何处理新任务
     * CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
     */
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();

    @Bean(COMMON_THREAD_POOL_BEAN_NAME)
    public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(corePoolSize);
        threadPool.setMaxPoolSize(maxPoolSize);
        threadPool.setQueueCapacity(queueCapacity);
        threadPool.setWaitForTasksToCompleteOnShutdown(waitForJobsToCompleteOnShutdown);
        threadPool.setAwaitTerminationSeconds(awaitTerminationMillis);
        threadPool.setKeepAliveSeconds(keepAliveSeconds);
        threadPool.setThreadNamePrefix(threadNamePrefix);
        threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPool.initialize();
        return threadPool;
    }

    /**
     * 设置默认线程池
     * 使用注解{@link Async}不加参数指定线程池时,就用公共线程池
     *
     * @return java.util.concurrent.Executor
     * @author yuyouyang
     * @date 2022/1/3 下午7:22
     * @version
     */
    @Override
    public Executor getAsyncExecutor() {
        return commonThreadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }

    /**
     * 自定义异常处理类
     *
     * @author liunh
     */
    class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            log.error("common-thread-pool exception message - {}", throwable.getMessage());
            log.error("common-thread-pool exception method name - {}", method.getName());
            log.error("common-thread-pool exception parameter value - {}", JSON.toJSONString(obj));
        }
    }
}

配置yml

代码语言:javascript复制
common-thread-pool:
  corePoolSize: 50
  maxPoolSize: 200
  keepAliveSeconds: 300
  queueCapacity: 10
  • 因为这个公共线程池的使用场景是IO密集型,故核心线程数、最大线程数都设置比较大。但等待队列容量设置比较小,是为了不让调用者等待太久,更容易去使用非核心线程数。

多任务并发

代码语言:javascript复制
@Resource(name = CommonThreadPoolConfig.COMMON_THREAD_POOL_BEAN_NAME)
private ThreadPoolTaskExecutor commonThreadPoolTaskExecutor;
代码语言:javascript复制
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> aService.saveBatch(aToSave),                                                                           commonThreadPoolTaskExecutor);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> bService.saveBatch(bToSave),                                                                           commonThreadPoolTaskExecutor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> cService.saveBatch(cToSave),                                                                           commonThreadPoolTaskExecutor);
CompletableFuture.allOf(future, future1, future2);

异步调用

发送邮件

代码语言:javascript复制
@Async(CommonThreadPoolConfig.COMMON_THREAD_POOL_BEAN_NAME)
public void send(String receiverName, String receiverAccount, String subject, String content) {}

0 人点赞