Java中线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。
合理使用线程池可以带来3个好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性:使用线程池可以进行统一分配、调优和监控。
1 线程池的使用
1.1 线程池的创建
推荐使用ThreadPoolExecutor创建线程池。
代码语言:javascript复制/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池创建参数解释:
- corePoolSize(核心线程数):当线程池线程数量小于核心线程数时,即使有空闲线程也会创建线程,只有达到核心线程数时才不会创建。可以调用
prestartCoreThread
和prestartAllCoreThreads
来预先创建一个或全部核心线程。核心线程默认不会被销毁,除非主动调用allowCoreThreadTimeOut(true)
。 - maximumPoolSize(最大线程数):线程池允许的最大线程数。
核心线程数 非核心线程数=最大线程数
。如果队列满了,并且已创建的线程数小于最大线程数,则会创建新线程执行新任务。如果使用了无界队列,则该参数基本无效。 - keepAliveTime,unit(存活时间):线程池的工作线程空闲后,保持的存活时间。默认仅对非核心线程有效,除非主动调用
allowCoreThreadTimeOut(true)
。 - workQueue(工作队列):线程阻塞队列,只会存放由execute提交的Runnable任务。①ArrayBlockingQueue:基于数组的有界阻塞队列。②LinkedBlockingQueue:基于链表的阻塞队列,吞吐量通常高于ArrayBlockingQueue。③SynchronousQueue:不存储元素的阻塞队列,每个插入必须等待另一个线程调用移除操作。④PriorityBlockingQueue:具有优先级的无限阻塞队列。
- threadFactory(线程工厂):设置创建线程的工厂,创建线程池时要指定有意义的线程名称,方便出错时回溯。
- handler(拒绝策略):当队列和线程池都满,或者线程池不处于
RUNNING
状态时,会使用该策略,默认有4种实现:①AbortPolicy(默认):直接抛出RejectedExecutionException
异常。②DiscardPolicy:静默丢弃当前提交的任务。③DiscardOldestPolicy:线程池未关闭的情况下,丢弃队列中最久未被执行的任务,并执行当前任务。④CallerRunsPolicy:线程池未关闭的情况下,使用调用者的线程去执行当前任务。
1.2 提交任务
线程池提交任务两个方法。
execute
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit
方法用于提交需要有返回值的任务。线程池会返回一个Future类型对象,该对象可以判断任务是否执行成功,并且可以通过Future.get()来获取返回值,get()会阻塞当前线程直到任务完成,get(long timeout, TimeUnit unit)会阻塞当前线程一段时间后立即返回。
1.3 线程池的关闭
可调用shutdown
或者shutdownNow
方法来关闭线程池。**原理:**遍历线程池中的线程,逐个调用线程的interrupt
方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow
首先向线程池置为STOP
状态,然后停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;shutdown
只是将线程池的状态设置成SHUTDOWN
状态,然后中断所有没有正在执行任务的线程,之前提交的任务(正在执行的和队列中的)会被执行。
只要调用了shutdown
或者shutdownNow
任意方法,isShutdown都会返回true。当所有任务都已关闭后,才表示线程池关闭,isTerminated方法返回true。通常调用shutdown
来关闭线程池,如果不一定要任务执行完,则可以调用shutdownNow
方法。
1.4 线程池的配置
要想合理的配置线程池,先要对任务特性进行分析。
- 任务性质:CPU密集、IO密集、混合型
- 任务执行时间:长、中、短
- 任务依赖性:是否依赖其他系统资源,如数据库连接、外部系统API调用
- 任务优先级:高、中、低
性质不同的任务可以使用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如N或N 1。IO密集型的任务并不是一直在执行任务,则应配置尽可能多的线程。可以大概预估请求等待时间(WT)和服务时间(ST)之间的比例。线程池大小设置为N*(1 WT/ST)。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,可以让高优先级的任务先执行。为了防止优先级低的任务可能永远都不能执行。可以将等待时间加入权重计算优先级。
建议使用有界队列,能增加系统的稳定性和预警能力。
1.5 线程池的监控
如果系统中存在大量使用线程池,则由必要对线程池进行监控,方便在出现问题时,可根据线程池使用情况进行快速定位。可根据线程池提供的参数进行监控,常用属性如下:
- getPoolSize:线程池当前线程数
- getCorePoolSize:线程池核心线程数
- getActiveCount:正在执行任务的线程数量
- getCompletedTaskCount:已完成任务的大致总数(任务和线程的状态可能在计算过程中有所变化)
- getTaskCount:全部任务的大致总数
- getQueue:当前线程池的任务队列
- getLargestPoolSize:线程池曾经最大线程数量
- getMaximumPoolSize:线程池允许最大线程数
- getKeepAliveTime:线程池线程存活时间
- isShutdown:线程池是否为关闭(SHUTDOWN状态)
- isTerminated:线程池是否为TERMINATED状态
通过扩展线程池进行监控。可以通过继承的方式来自定义线程池,重写beforeExecute(Thread t, Runnable r)
、afterExecute(Runnable r, Throwable t)
和terminated()
,可以在任务执行前、执行后和线程池关闭前执行一些代码进行监控。
class TPE extends ThreadPoolExecutor {
// 记录Runnable任务起始执行时间
private ConcurrentHashMap<Integer, Long> beginTimeMaps = new ConcurrentHashMap<>();
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 设置起始时间
beginTimeMaps.put(r.hashCode(), new Date().getTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 获取起始时间
Long begin = beginTimeMaps.remove(r.hashCode());
Long end = new Date().getTime();
System.out.println(end-begin);
}
@Override
protected void terminated() {
// code
}
@Override
public void shutdown() {
// code
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
// code
return super.shutdownNow();
}
}
2 线程池原理
2.1 线程池状态及线程数
代码语言:javascript复制private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 运行状态和工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池控制状态,ctl
是一个原子integer包含两个字段:workerCount
线程池内有效线程数量,runState
线程池状态。高3位存储runState
,低29位存储workerCount
。
线程池的状态如下:
- RUNNING:接受新任务,并且能够处理队列任务
- SHUTDOWN:不接收新任务,但能处理队列任务
- STOP:不接收新任务,不处理队列任务
- TIDYING:所有任务都已终止,workCount为0,线程进入该状态后会调用
terminated()
钩子函数 - TERMINATED:
terminated()
函数已经调用完毕
2.2 关键方法
2.2.1 execute方法代码
在将来的某个时刻执行给定的任务,该任务可能被新线程执行也可能被线程池中已存在的线程执行。如果无法提交任务执行,因为执行器已经关闭或者达到最大容量,则该任务由当前的RejectedExecutionHandler
处理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取线程池的runState和workerCount
int c = ctl.get();
// 若当前线程数小于核心线程数;则新建一个Worker并执行当前任务
if (workerCountOf(c) < corePoolSize) {
// 新增线程(command, core) core表示新增的是否为核心线程
if (addWorker(command, true))
return;
// 添加核心线程失败,重新获取线程池状态
c = ctl.get();
}
// 若当前线程数不小于核心线程数并且处于运行状态,则将任务放到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 添加队列成功后,再次获取线程池状态
int recheck = ctl.get();
// 线程池不处于运行状态,并且成功移除任务。则执行拒绝策略
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果工作线程为0(线程池无线程),则新建一个无任务的线程
addWorker(null, false);
}
// 非运行状态或放队列失败时,直接拒绝策略
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
2.2.2 addWorker方法
检查是否可根据当前线程池状态以及给定的边界(核心线程或最大线程)创建新的worker。firstTask
为新线程需先执行的任务,如果为null的话则不执行。core
如果为true则使用corePoolSize作为边界界定条件,为false则使用maximumPoolSize作为边界界定条件。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 线程池状态及线程池运行状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 当rs>=SHUTDOWN时 即SHUTDOWN(0)、STOP(1)、TIDYING(2)、TERMINATED(3),此状态不再接受新任务
// 当rs=SHUTDOWN时,此时可创建线程条件如下:
// 1. rs==SHUTDOWN,此时不再接受新任务,但可
// 2. firstTask为空,不可再继续提交任务
// 3. !workQueue.isEmpty(),为空的话则不再需要新线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 当前工作线程数
int wc = workerCountOf(c);
// 当线程数达到CAPACITY,
// 或者core为true时达到核心线程数,
// 或者core为false时达到最大线程数,
// 不再创建新线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS增加workerCount
if (compareAndIncrementWorkerCount(c))
// 成功修改workerCount,跳出最外层for循环
break retry;
// 重新获取线程池状态
c = ctl.get(); // Re-read ctl
// 线程池状态已被修改,继续外传for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask创建新worker
w = new Worker(firstTask);
// 拿到当前worker的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs<SHUTDOWN即为RUNNING,线程池处于运行状态
// rs==SHUTDOWN时,因为可以继续执行队列中的任务,故允许添加无任务的worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 更新getLargestPoolSize,线程池中出现过最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// worker添加成功,启动线程
t.start();
workerStarted = true;
}
}
} finally {
// worker启动失败,则roll back cleanly.
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
t.start()
即为worker.thread.start()
。Worker
类本身实现了Runnable
接口,在Worker初始化时,会执行this.thread = getThreadFactory().newThread(this);
。也就是t.start()-->worker.run()
。
2.2.3 Worker类
线程池中的线程都会被封装成Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS)
,实现了Runnalbe
接口。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// ThreadFactory.newThread创建的线程实例
final Thread thread;
/** Initial task to run. Possibly null. */
// 要运行的初始任务,可能为空
Runnable firstTask;
/** Per-thread task counter */
// 每个线程都会记录的完成任务数
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 禁止中断,直到runWorker方法执行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过ThreadFactory创建线程实例,Worker自身作为Runnable来创建线程实例
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// t.start() 将会调用该方法
public void run() {
// 执行任务核心方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
2.2.4 runWorker方法
主工作线程循环,不断的从队列获取任务并且执行他们。
代码语言:javascript复制final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// unlock 将state由-1置为0。
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 初始任务不为null,或者从队列中获取到了任务
while (task != null || (task = getTask()) != null) {
// worker加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 若线程池正在停止,要保证当前线程是中断状态
// 若不是的话,则要保证当前线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子函数,任务执行前执行
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务 =======>
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,任务执行后执行
afterExecute(task, thrown);
}
} finally {
// 任务执行完,清空task任务
task = null;
// 已完成任务 1
w.completedTasks ;
// worker解锁
w.unlock();
}
}
// 线程正常退出
completedAbruptly = false;
} finally {
// 线程退出 completedAbruptly标识正常退出/非正常退出
processWorkerExit(w, completedAbruptly);
}
}
Thread.interrupted()
判断线程是否中断,同时复位中断状态。
runWorker()方法的执行流程:
- while循环调用getTask()方法,从任务队列中获取任务(Worker新建时可能有firstTask)
- 若线程池正在停止,要保证当前线程是中断状态,否则要保证当前线程不是中断状态
- 调用
task.run()
执行任务 - 若task为null则退出循环,执行
processWorkerExit
runWorker
执行完,即代表Worker中run方法执行完毕,销毁线程
2.2.5 getTask方法
根据当前配置,阻塞或者超时等待任务。发生以下情况时,会返回null
。task为null时,则销毁线程。
- 当前线程池线程数超过最大线程数(maximumPoolSize)。调用
setMaximumPoolSize
方法。 - 线程池处于
STOP
状态。调用了shutdownNow
方法。 - 线程池处于
SHUTDOWN
状态,并且workQueue
队列为空。 - 当前worker获取任务等待超时(有可能是
allowCoreThreadTimeOut
或workerCount > corePoolSize
)。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 获取当前线程池状态、运行状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// rs>=STOP,线程池调用`shutdownNow`后,不再处理新任务
// rs>=SHUTDOWN,workQueue消费完后,不再接受新任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 当前工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否存在超时校验标识
// `allowCoreThreadTimeOut`允许核心线程超时 或 工作线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc>maximumPoolSize,可能调用了`setMaximumPoolSize`,修改了最大线程数
// timed&&timedOut,当前线程需要进行超时控制,并且上次发生了超时
// 线程数大于1,或者阻塞队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 尝试扣减工作线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 是否需要进行超时判断 timed
// 若为true,则需要进行超时判断,通过阻塞队列的poll方法来进行超时控制,超时则返回null
// 若为false,则通过take获取,阻塞队列直到workQueue不为空
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// r为null,已经超时,设置标识位
timedOut = true;
} catch (InterruptedException retry) {
// 发生了中断,未发生超时,设置标识位
timedOut = false;
}
}
}
getTask
方法返回null时,跳出循环,然后执行processWorkerExit
方法进行退出。
2.2.6 processWorkerExit方法
代码语言:javascript复制private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly是否出现异常,将workerCount减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 已完成任务数
completedTaskCount = w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 若线程处于RUNNING或SHUTDOWN状态时,若worker异常结束,则直接添加空任务worker
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 允许核心线程超时时,如果阻塞队列不为空,则至少保留一个worker
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 若不允许核心线程池超时,则workerCount不少于corePoolSize
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
2.2.7 shutdown方法
代码语言:javascript复制public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略校验
checkShutdownAccess();
// 切换状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}
2.2.8 shutdownNow方法
代码语言:javascript复制public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中断所有线程(空闲、非空闲)
interruptWorkers();
// 取出队列中所有未被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
3 几种常见线程池
3.1 Exectors线程池
Executors
提供了几个静态方法来创建线程池。
3.1.1 newFixedThreadPool
代码语言:javascript复制public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数量和总线程数相同,都是传入的参数nThreads,所以只能创建核心线程。因为LinkedBlockingQueue
的默认大小是Integer.MAX_VALUE
,故核心线程空闲则由其处理,否则入队等待直到核心线程空闲。keepAliveTime
设置为0L,多余的线程将会被立即停止。
适用于为满足资源管理需求,需要限制当前线程数量的应用场景,适用于负载比较高的服务器。
3.1.2 newSingleThreadExecutor
代码语言:javascript复制public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数和最大线程数都为1,使用无界队列。任意时刻最多只有一个线程执行任务,多余任务会被缓冲至队列中。
适用于保证顺序的执行各任务;并且在任意时间点,不会有多线程是活动的应用场景。
3.1.3 newCachedThreadPool
代码语言:javascript复制public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为0,最大线程数为Integer.MAX_VALUE
,keepAliveTime
为60秒,空闲线程超时将会被终止。阻塞队列SynchronousQueue
是一个没有容量的阻塞队列,插入数据时必须等待一个线程来获取数据、否则就会阻塞。没有了队列的缓冲,提交的任务会被尽快的分配线程执行。
适用于很多短期的异步任务的小程序,或者是负载较轻的服务器。
3.1.4 newScheduledThreadPool
代码语言:javascript复制public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
支持定时或周期性的任务执行。阻塞队列使用DelayedWorkQueue
。
适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。
3.2 Exectors弊端
newFixedThreadPool
和newSingleThreadExecutor
:阻塞队列使用LinkedBlockingQueue
,其默认容量为Integer.MAX_VALUE
,若任务处理较慢,则会引起消息堆积问题,消耗大量内存甚至触发OOM。newCachedThreadPool
和newScheduledThreadPool
:最大线程数为Integer.MAX_VALUE
,可能会创建很多的线程,甚至导致OOM。
4 线程池注意事项
- 自定义线程工厂
ThreadFactory
,指定有意义的线程名称,方便出错时回溯。 - 使用
Exectors
时,避免出现任务堆积
或线程堆积
情况。最好使用ThreadPoolExecutor
显示的创建线程池。 - 若线程池中使用到
ThreadLocal
,必须主动回收。 - 最好有有效的监控、日志等记录信息。方便异常处理。
- 线程池大小设置要根据任务类型进行设置,根据任务运行情况、系统负载、资源利用率进行调整。