Spring Boot多任务并发处理
创建公共线程池
代码语言:javascript复制@ConfigurationProperties(prefix = "common-thread-pool")
@Data
@Slf4j
@Configuration
@EnableAsync
public class CommonThreadPoolConfig implements AsyncConfigurer {
public static final String COMMON_THREAD_POOL_BEAN_NAME = "commonThreadPoolTaskExecutor";
/**
* 核心线程大小
*/
private int corePoolSize;
/**
* 最大线程大小
*/
private int maxPoolSize;
/**
* 非核心线程存活时间,单位:秒
*/
private int keepAliveSeconds;
/**
* 等待队列容量
*/
private int queueCapacity;
/**
* 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
*/
private int awaitTerminationMillis = 60;
/**
* 等待任务在关机时完成--表明等待所有线程执行完
*/
private boolean waitForJobsToCompleteOnShutdown = Boolean.TRUE;
/**
* 线程名称前缀
*/
private String threadNamePrefix = "common-thread-pool-";
/**
* rejection-policy:当pool已经达到max size的时候,如何处理新任务
* CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
*/
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
@Bean(COMMON_THREAD_POOL_BEAN_NAME)
public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(corePoolSize);
threadPool.setMaxPoolSize(maxPoolSize);
threadPool.setQueueCapacity(queueCapacity);
threadPool.setWaitForTasksToCompleteOnShutdown(waitForJobsToCompleteOnShutdown);
threadPool.setAwaitTerminationSeconds(awaitTerminationMillis);
threadPool.setKeepAliveSeconds(keepAliveSeconds);
threadPool.setThreadNamePrefix(threadNamePrefix);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPool.initialize();
return threadPool;
}
/**
* 设置默认线程池
* 使用注解{@link Async}不加参数指定线程池时,就用公共线程池
*
* @return java.util.concurrent.Executor
* @author yuyouyang
* @date 2022/1/3 下午7:22
* @version
*/
@Override
public Executor getAsyncExecutor() {
return commonThreadPoolTaskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
/**
* 自定义异常处理类
*
* @author liunh
*/
class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
log.error("common-thread-pool exception message - {}", throwable.getMessage());
log.error("common-thread-pool exception method name - {}", method.getName());
log.error("common-thread-pool exception parameter value - {}", JSON.toJSONString(obj));
}
}
}
配置yml
代码语言:javascript复制common-thread-pool:
corePoolSize: 50
maxPoolSize: 200
keepAliveSeconds: 300
queueCapacity: 10
- 因为这个公共线程池的使用场景是IO密集型,故核心线程数、最大线程数都设置比较大。但等待队列容量设置比较小,是为了不让调用者等待太久,更容易去使用非核心线程数。
多任务并发
代码语言:javascript复制@Resource(name = CommonThreadPoolConfig.COMMON_THREAD_POOL_BEAN_NAME)
private ThreadPoolTaskExecutor commonThreadPoolTaskExecutor;
代码语言:javascript复制CompletableFuture<Void> future = CompletableFuture.runAsync(() -> aService.saveBatch(aToSave), commonThreadPoolTaskExecutor);
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> bService.saveBatch(bToSave), commonThreadPoolTaskExecutor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> cService.saveBatch(cToSave), commonThreadPoolTaskExecutor);
CompletableFuture.allOf(future, future1, future2);
异步调用
发送邮件
代码语言:javascript复制@Async(CommonThreadPoolConfig.COMMON_THREAD_POOL_BEAN_NAME)
public void send(String receiverName, String receiverAccount, String subject, String content) {}