详解Java线程池参数

2022-09-06 10:26:48 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

详解Java线程池参数

目前线程池的类一般使用

  • spring的:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
  • JDK的:java.util.concurrent.ThreadPoolExecutor

它们的配置差不多,spring的做了一些配置参数的简化,最终调用JDK的API 参考资料:https://blog.csdn.net/zhouhl_cn/article/details/7392607

相关概念

概念比喻

  • 线程池(thread pool) ===>工厂
  • 线程(thread) ===>工人,属于某个工厂,被工厂所管理
  • 任务(task) ===>待工人处理的事情,即实现Runnable或Callable的类

线程池行为比喻

比喻的例子

  1. 小王(任务)去银行(线程池)办理业务,银行刚开始营业,窗口服务员还未就位(初始线程数是0)
  2. 第二行:于是经理(线程池管理者)催促柜台人员到1号窗接待(创建线程),于是小王被安排到1号窗办理业务
  3. 第三行:接着小张来办理业务,小王还没办完呢,轮不到小张,该银行总共就2个窗口(coePoolSize是2),于是经理又催促另一个窗口服务员到2号窗接待(又创建线程),小张也开始办理业务
  4. 紧接着小李(又一个任务)也来了,前面两人还没办理完呢.银行有座位1张(队列size是1),还空着呢,于是经理安排小李到座位上等候,并告知他: 如果1、2号窗空出,小李就可以前去
  5. 很不幸,窗口满了,座位也满了,这时小赵又到了银行,经理于是安排临时工(corePoolSize外的线程)在大堂站着,手持pad设备给小赵办理业务
  6. 银行业务真忙,小周又来了,经理苦呀,窗口满了,临时工也上了,座位也满了(达到了maxPoolSize),于是只能按《超出银行最大接待能力处理办法》(拒绝策略)拒绝小周办理业务
  7. 忙了一天,进来办业务的人终于少了,临时工闲了2小时(keepAliveTime),经理见他没事做,让他下班去.
  8. 由于银行规定之《正式员工闲着处理办法》(是否清理corePoolSize线程开关),即使正式工闲着,也不得提前下班,所以1、2号窗的正式工继续待着(池内保持corePoolSize个线程),经理没有办法让他们早下班

线程池参数

说明:maxPoolSize / maximumPoolSize 的意思是,spring的线程池叫maxPoolSize,而JDK线程池叫maximumPoolSize,等价

参数调优

参数如何设置跟系统的负载有直接的关系,假设下面的参数表示目前的系统负载: tasks,每秒需要处理的最大任务数量 tasktime,处理第个任务所需要的时间 responsetime,系统允许任务最大的响应时间,比如每个任务的响应时间不得超过2秒

  • corePoolSize: 每个任务需要tasktime秒处理,则每个线程每钞可处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即tasks*tasktime个线程数。假设系统每秒任务数为1001000,每个任务耗时0.1秒,则需要100*0.1至1000*0.1,即10100个线程。那么corePoolSize应该设置为大于10,具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%的情况下第秒任务数小于200,最多时为1000,则corePoolSize可设置为20
  • queueCapacity: 任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。队列长度可以设置为(corePoolSize/tasktime)*responsetime: (20/0.1)*2=400,即队列长度可设置为400 队列长度设置过大,会导致任务响应时间过长,切忌以下写法: LinkedBlockingQueue queue = new LinkedBlockingQueue(); 这实际上是将队列长度设置为Integer.MAX_VALUE,将会导致线程数量永远为corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增
  • maxPoolSize: 当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。每秒200个任务需要20个线程,那么当每秒达到1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60
  • keepAliveTime: 线程数量只增加不减少也不行。当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTiime,该线程就退出。默认情况下线程池最少会保持corePoolSize个线程
  • allowCoreThreadTimeout: 默认情况下核心线程不会退出,可通过将该参数设置为true,让核心线程也退出。 以上关于线程数量的计算并没有考虑CPU的情况。若结合CPU的情况,比如,当线程数量达到50时,CPU达到100%,则将maxPoolSize设置为60也不合适,此时若系统负载长时间维持在每秒1000个任务,则超出线程池处理能力,应设法降低每个任务的处理时间(tasktime)

线程池使用代码示例

使用spring的项目,一般如下配置线程池,整个项目使用共同的线程池,避免各自创建线程,代码如下

代码语言:javascript复制
// 在项目中配置线程池 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
@Configuration
public class ThreadPoolConfig { 
   
    @Bean
    public ThreadPoolTaskExecutor springThreadPool() { 
   
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);// 核心线程数,初始是0,任务进来则创建,任务持续进来,最大达到该值
        threadPoolTaskExecutor.setMaxPoolSize(100);// 线程池维护线程的最大数量,必须在队列满后才会继续增长最大至maxPoolSize
        threadPoolTaskExecutor.setQueueCapacity(800);// 缓存队列,当线程数达到maxPoolSize且队列已满,再进来任务会被拒绝,处理方式见RejectedExecutionHandler
        threadPoolTaskExecutor.setKeepAliveSeconds(200);// 空闲多久就清理线程
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);// 是否清理空闲的核心线程

		// ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)
		// ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
		// ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
		// ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
        threadPoolTaskExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);//对拒绝task的处理策略
        return threadPoolTaskExecutor;
    }
}

附:参考的源码

下面是JDK线程池源码,允许自由指定缓冲队列的大小和类型(workQueue)

代码语言:javascript复制
// 源码java.util.concurrent.ThreadPoolExecutor
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue<Runnable> workQueue,
						  ThreadFactory threadFactory,
						  RejectedExecutionHandler handler) { 
   
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}

下面是spring线程池关于使用的缓冲队列类型的源码,可以看到当传入的queueCapacity大于0时使用LinkedBlockingQueue类型,默认queueCapacity值是Integer.MAX_VALUE,即默认是该类型

代码语言:javascript复制
/** * Create the BlockingQueue to use for the ThreadPoolExecutor. * <p>A LinkedBlockingQueue instance will be created for a positive * capacity value; a SynchronousQueue else. * @param queueCapacity the specified queue capacity * @return the BlockingQueue instance * @see java.util.concurrent.LinkedBlockingQueue * @see java.util.concurrent.SynchronousQueue */
protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 
   
	if (queueCapacity > 0) { 
   
		return new LinkedBlockingQueue<Runnable>(queueCapacity);
	}
	else { 
   
		return new SynchronousQueue<Runnable>();
	}
}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/135033.html原文链接:https://javaforall.cn

0 人点赞