聊聊PowerJob的ThreadPoolConfig

2024-01-23 09:32:22 浏览数 (2)

本文主要研究一下PowerJob的ThreadPoolConfig

ThreadPoolConfig

tech/powerjob/server/config/ThreadPoolConfig.java

代码语言:java复制
@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    /**
     * Builds the executor registered under {@code PJThreadPool.TIMING_POOL}.
     * Core size equals the number of available processors and the maximum is
     * four times that; rejected tasks are handed to a
     * {@code NewThreadRunRejectedExecutionHandler}.
     *
     * @return the configured timing executor
     */
    @Bean(PJThreadPool.TIMING_POOL)
    public TaskExecutor getTimingPool() {
        final int cores = Runtime.getRuntime().availableProcessors();
        final ThreadPoolTaskExecutor timingPool = new ThreadPoolTaskExecutor();
        timingPool.setThreadNamePrefix("PJ-TIMING-");
        timingPool.setKeepAliveSeconds(60);
        timingPool.setCorePoolSize(cores);
        timingPool.setMaxPoolSize(cores * 4);
        // use SynchronousQueue
        timingPool.setQueueCapacity(0);
        timingPool.setRejectedExecutionHandler(new NewThreadRunRejectedExecutionHandler(PJThreadPool.TIMING_POOL));
        return timingPool;
    }

    /**
     * Builds the executor registered under {@code PJThreadPool.BACKGROUND_POOL}.
     * Core size is 8x and maximum size 16x the number of available processors,
     * with a queue capacity of 8192; the rejection handler comes from
     * {@code RejectedExecutionHandlerFactory.newDiscard}.
     *
     * @return the configured background executor
     */
    @Bean(PJThreadPool.BACKGROUND_POOL)
    public AsyncTaskExecutor initBackgroundPool() {
        final int cores = Runtime.getRuntime().availableProcessors();
        final ThreadPoolTaskExecutor backgroundPool = new ThreadPoolTaskExecutor();
        backgroundPool.setThreadNamePrefix("PJ-BG-");
        backgroundPool.setKeepAliveSeconds(60);
        backgroundPool.setQueueCapacity(8192);
        backgroundPool.setCorePoolSize(cores * 8);
        backgroundPool.setMaxPoolSize(cores * 16);
        backgroundPool.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL));
        return backgroundPool;
    }

    /**
     * Builds the executor registered under {@code PJThreadPool.LOCAL_DB_POOL}.
     * The pool has a fixed size of half the available processors (at least 1)
     * and a queue capacity of 2048; the rejection handler comes from
     * {@code RejectedExecutionHandlerFactory.newAbort}.
     *
     * @return the configured local-DB executor
     */
    @Bean(PJThreadPool.LOCAL_DB_POOL)
    public TaskExecutor initOmsLocalDbPool() {
        // Core and max are the same value, so the pool size never varies.
        final int fixedSize = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
        final ThreadPoolTaskExecutor localDbPool = new ThreadPoolTaskExecutor();
        localDbPool.setThreadNamePrefix("PJ-LOCALDB-");
        localDbPool.setQueueCapacity(2048);
        localDbPool.setCorePoolSize(fixedSize);
        localDbPool.setMaxPoolSize(fixedSize);
        localDbPool.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL));
        return localDbPool;
    }

    /**
     * After WebSocket support was introduced, the scheduling thread pool has to be
     * initialized manually.
     *
     * @return a daemon-thread scheduler sized at 8x the available processors, but never below 32
     */
    @Bean
    public TaskScheduler taskScheduler() {
        final int poolSize = Math.max(Runtime.getRuntime().availableProcessors() * 8, 32);
        final ThreadPoolTaskScheduler defaultScheduler = new ThreadPoolTaskScheduler();
        defaultScheduler.setDaemon(true);
        defaultScheduler.setThreadNamePrefix("PJ-DEFAULT-");
        defaultScheduler.setPoolSize(poolSize);
        return defaultScheduler;
    }

}

ThreadPoolConfig定义了PowerJobTimingPool、PowerJobBackgroundPool、PowerJobLocalDbPool三个线程池,以及一个taskScheduler,这里使用的是spring的ThreadPoolTaskExecutor及ThreadPoolTaskScheduler,他们都继承了ExecutorConfigurationSupport

ExecutorConfigurationSupport

org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java

代码语言:java复制
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
		implements BeanNameAware, InitializingBean, DisposableBean {

	//......

	/**
	 * Calls {@code initialize()} after the container applied all property values.
	 * @see #initialize()
	 */
	@Override
	public void afterPropertiesSet() {
		initialize();
	}

	/**
	 * Set up the ExecutorService.
	 */
	public void initialize() {
		if (logger.isDebugEnabled()) {
			logger.debug("Initializing ExecutorService"   (this.beanName != null ? " '"   this.beanName   "'" : ""));
		}
		if (!this.threadNamePrefixSet && this.beanName != null) {
			setThreadNamePrefix(this.beanName   "-");
		}
		this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
	}	

	protected abstract ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

	/**
	 * Perform a shutdown on the underlying ExecutorService.
	 * @see java.util.concurrent.ExecutorService#shutdown()
	 * @see java.util.concurrent.ExecutorService#shutdownNow()
	 */
	public void shutdown() {
		if (logger.isDebugEnabled()) {
			logger.debug("Shutting down ExecutorService"   (this.beanName != null ? " '"   this.beanName   "'" : ""));
		}
		if (this.executor != null) {
			if (this.waitForTasksToCompleteOnShutdown) {
				this.executor.shutdown();
			}
			else {
				for (Runnable remainingTask : this.executor.shutdownNow()) {
					cancelRemainingTask(remainingTask);
				}
			}
			awaitTerminationIfNecessary(this.executor);
		}
	}					
}		

ExecutorConfigurationSupport实现了InitializingBean、DisposableBean接口,在afterPropertiesSet方法执行initialize进行初始化,在destroy方法执行shutdown关闭线程池

小结

ThreadPoolConfig定义了PowerJobTimingPool、PowerJobBackgroundPool、PowerJobLocalDbPool三个线程池,以及一个taskScheduler,这里使用的是spring的ThreadPoolTaskExecutor及ThreadPoolTaskScheduler,他们都继承了ExecutorConfigurationSupport;ExecutorConfigurationSupport实现了InitializingBean、DisposableBean接口,可以自动初始化及销毁线程池。

0 人点赞