Java中的线程池是运用场景最多的并发组件,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下几个好处:降低资源消耗、提高响应速度、提高线程可管理性和异步代码解耦等。
当提交一个新任务到线程池时,线程池的处理流程如下:
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤也需要获取全局锁)。
- 如果创建新线程将使当前运行的线程数超出maximumPoolSize,该任务将被拒绝,并调用相应的拒绝策略来处理
(RejectedExecutionHandler.rejectedExecution()方法,线程池默认的饱和策略是AbortPolicy,也就是抛异常)
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor
完成预热之后(当前运行的线程数等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
线程池任务 拒绝策略包括抛异常、直接丢弃、丢弃队列中最老的任务、将任务分发给调用线程处理。
线程池的实现主要包括2部分,一个是线程管理(这里的线程管理只包括线程计数、线程信息存储等,不包括线程的阻塞/唤醒),另一个是阻塞队列(包括线程的排队/阻塞/唤醒)。
线程池使用示例如下:
代码语言:javascript复制ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> System.out.println("hello world"));
public void execute(Runnable command) {
/*
* 1. 运行的线程少于corePoolSize,创建新线程,注意,在addWorker中对mainLock进行lock/unlock操作
* 2. 成功加入任务后,判断是否需要增加一个线程
* 3. 如果添加任务失败,尝试创建新线程,如果超过了maxPoolSize,(根据拒绝策略)拒绝任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
任务是包裹在Worker对象中的,Worker对象的run方法主要逻辑是:第一次执行任务firstTask,以后都从调用getTask从阻塞队列中获取任务来执行。也就是说,任务线程第一次启动执行任务,不是从阻塞队列中获取的,而是直接执行任务的。
注意,一个Worker对象对应一个线程(Thread),新的Worker可视为新线程,Worker继承了Runable和AQS,继承Runable这个好理解,毕竟是任务,那么为什么要继承AQS呢?从javadoc的引用中可以看出:
我们实现了一个简单的非重入互斥锁而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获取锁。
代码语言:javascript复制这句话怎么理解呢?如果是ReentrantLock,同一个线程在第二次lock/tryLock是返回true的,那么可能会发生这种场景:
-- Worker线程
-> 调用了setCorePoolSize减少coreSize
-> 对多余线程进行interruptIdleWorkers
-> 对将要interruptId的线程进行tryLock()(这里成功!)
-> Thread.interrupt(该Worker线程自己)
那么问题来了,该Workder线程自己把自己中断了!!!具体可以看下interruptIdleWorkers方法代码,这里不再赘述。
接下来看下Worker 线程执行方法:
代码语言:javascript复制// class Worker extends AbstractQueuedSynchronizer implements Runnable
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// firstTask为创建线程第一次加入进来的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 前置回调方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 后置回调方法
}
} finally {
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
调用getTask时,调用workQueue.poll(timeout)或者workQueue.take()
,从这里看出,线程的阻塞唤醒操作是由workQueue(阻塞队列)来做的,这里的线程阻塞唤醒实现原理请参考对应资料,这里不再具体展开。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int wc = workerCountOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
至此,我们已经知道了线程池的整个执行流程,那么最后一起回顾下:
线程池的实现主要包括2部分,一个是线程管理(这里的线程管理只包括线程计数、线程信息存储等,不包括线程的阻塞/唤醒),另一个是阻塞队列(包括线程的排队/阻塞/唤醒)。 线程池的任务是包裹在Worker对象中的,Worker对象的run方法主要逻辑是:第一次执行任务firstTask,以后都从调用getTask从阻塞队列中获取任务来执行。也就是说,任务线程第一次启动执行任务,不是从阻塞队列中获取的,而是直接执行任务的。后续线程的等待唤醒都是基于阻塞队列来的。