五、主流程
execute()方法
ThreadPoolExecutor的顶级父类是Executor接口,它只有一个方法就是execute(),我们也就是通过它来向线程池提交任务去执行的。
代码语言:javascript复制// 提交一个任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果小于核心线程数,创建Worker,并启动里面的Thread
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果处于RUNNING态,将任务放入队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次状态检查,非RUNNING态,从队列移除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 如果线程池为0了,重新创建一个新的线程
// 为什么会这样?因为设置allowCoreThreadTimeOut,核心线程因空闲全部回收了
else if (workerCountOf(recheck) == 0)
// 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
addWorker(null, false);
}
// 尝试往最大线程数创建线程
else if (!addWorker(command, false))
reject(command);
}
- 若当前线程数小于corePoolSize,则创建一个新的线程来执行任务
- 若当前线程数大于等于corePoolSize,且阻塞队列未满,则将任务添加到队列中
- 如果阻塞队列已满,但当前线程数小于maximumPoolSize,则创建一个“临时”线程来执行任务
- 若当前线程数大于等于maximumPoolSize,且阻塞队列已满,此时会执行拒绝策略
注意点:
- 在往队列中添加任务后会对线程池状态 double check,这是因为在并发情况下,从上次判断线程池状态到现在线程池可能会被关闭,由于线程池关闭后不能再继续添加任务了,此时就需要回滚刚才的添加任务到队列中的操作,并执行拒绝策略
- addWorker(null, false),只是创建一个新的Thread,但是没有传入任务,这是因为前面已经将任务添加到队列中了
addWorker()方法
addWorker 方法主要是创建一个Thread并封装到Worker中。Worker实现了Runnable接口,本身也是一个线程任务。
代码语言:javascript复制 private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
....
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
// 用Worker自身任务做为入参,构造Thread
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
.....
}
该方法接收两个参数firstTask和core,firstTask参数用于指定新增的线程执行的第一个任务,如果firstTask为空的话只创建线程。
core参数:
- true,表示新增线程时,判断当前线程数是否少于corePoolSize
- false,表示新增线程时,判断当前线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果队列为空,跳出
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 线程数
int wc = workerCountOf(c);
// 如果线程数超过限制,跳出
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// ctl 线程计数 1 。成功,跳出最外层的for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 计数失败,判断状态是否改变,如果改变,重新执行最外层的for循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 构建 Worker 对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把Worker放入HashSet集合,后面关闭时,线程中断会用到
workers.add(w);
int s = workers.size();
// 记录,历史上曾经创建过的最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 将Thread 启动起来
t.start();
workerStarted = true;
}
}
} finally {
// 如果添加Worker失败,把Worker从HashSet集合移除,并对线程计数减1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
重点:
Worker本身实现了Runnable接口,t.start() 这个语句启动时,会调用Worker类中的run方法。内部调用runWorker()方法,开限循环模式从阻塞队列中拉取任务来执行。
runWorker()方法
代码语言:javascript复制final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果有firstTask,先执行。否则从阻塞队列拉取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 二次检查,如果状态停止,确保线程是中断的
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 最最最核心,Runnable任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置扩展
afterExecute(task, thrown);
}
} finally {
// task复位
task = null;
// Worker中已完成的任务计数 1
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 清理 Worker
processWorkerExit(w, completedAbruptly);
}
}
分析源码可以得知runWorker方法的执行过程:
- while循环不断地通过getTask()方法,从阻塞队列拉取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 前置扩展
- 调用task.run()执行任务;
- 后置扩展
- 如果task为null,则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,代表着Worker中的run方法执行完毕,Runnable任务执行完毕,然后线程销毁。
processWorkerExit()方法
代码语言:javascript复制private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 突然结束,completedAbruptly为true,会对线程计数减1。
// 对于因没有任务而结束,completedAbruptly为false,getTask()方法中会执行减1操作
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将Worker中记录的已完成任务数,合并到ThreadPoolExecutor的全局字段中
completedTaskCount = w.completedTasks;
// 把Worker从HashSet集合移除
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
// 二次检查,如果RUNNING或SHUTDOWN,非正常结束,需要重新创建线程,容错
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
getTask()方法
getTask方法用于从阻塞队列中获取任务,源码如下
代码语言:javascript复制private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果rs >=STOP或者(rs 为 SHUTDOWN且队列为空)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程计数减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 标记:大于核心线程数 或 允许核心线程被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 线程计数减1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 采用超时方式来获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 采用阻塞方式来获取任务
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- getTask方法首先对线程池状态进行判断,如果线程池为非RUNNING状态且满足以下条件,则将workerCount减1并返回null
- 1、rs >= STOP
- 2、rs 为 SHUTDOWN且队列为空。
- 说明:当线程池状态为SHUTDOWN或以上时,不允许再往队列中添加任务。
- timed变量用来判断是否进行超时控制
- allowCoreThreadTimeOut默认是false,当线程数量降到corePoolSize时,会采用阻塞方式从队列拉取任务
- 其它情况,采用超时方式来获取任务
- 如果达到keepAliveTime最大空闲时间,仍拿不到任务,线程计数减1,返回null
- 如果设置allowCoreThreadTimeOut为true,空闲时,线程池数最小可能会为0
advanceRunState()方法
更改线程池状态
代码语言:javascript复制private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 如果当前状态在目标状态之后
if (runStateAtLeast(c, targetState) ||
// 将目标状态 线程数,合成一个字段
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
- 如果当前状态在目标状态之后,跳出循环,不做任何处理
- 否则,将目标状态 线程数,合成一个字段,更新到ctl
六、线程池关闭
关闭线程池。他们的原理是遍历线程池的工作线程,然后逐个调用线程的interrupt方法来中断线程。
shutdown()方法
代码语言:javascript复制public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 标记线程池状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 把所有空闲线程中断
interruptIdleWorkers();
// 空实现,为ScheduledThreadPoolExecutor预留的扩展
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow()方法
代码语言:javascript复制public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 标记线程池状态为 STOP
advanceRunState(STOP);
// 把所有线程中断
interruptWorkers();
// 把阻塞队列中的所有任务提取到List集合中,并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
区别:
- shutdown方法,不再接收新的任务,已提交的任务会执行完
- shutdownNow方法,比较粗暴,它将尝试中断所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
只要调用了这两个关闭方法中的任意一个,isShutdown()方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated()方法会返回true。
采用哪一种方法来关闭线程池,由业务特性决定,大部分是采用shutdown()方法来关闭线程池。如果任务不强求一定要执行完,可以调用shutdownNow()方法。
七、扩展
ThreadPoolExecutor提供扩展方法:通过继承ThreadPoolExecutor
,重写beforeExecute、afterExecute、terminated方法。在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。
public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {
private ThreadLocal<Context> taskExecutionTimer = new ThreadLocal<>();
。。。
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
Context context = new Context();
context.setStartTime(System.currentTimeMillis());
taskExecutionTimer.set(context);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Context context = taskExecutionTimer.get();
long start;
if (context != null) {
start = context.getStartTime();
} else {
start = System.currentTimeMillis();
}
long cost = System.currentTimeMillis() - start;
RedAlertLogUtils.threadPoolMonitorLog(name, getCorePoolSize(), getActiveCount(), getMaximumPoolSize(), getQueue().size(), cost);
}
。。。。
}
八、Executors 工具类方法
•newFixedThreadPool, 有固定长度(nThreads)的线程数组,忙不过来时会把任务放到无限长的队列里,这是因为LinkedBlockingQueue 默认是一个无界队列。
代码语言:javascript复制public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
•newSingleThreadExecutor,创建单线程的线程池。队列长度为无限
代码语言:javascript复制public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
•newCachedThreadPool 的 maximumPoolSize 参数值是Integer.MAX_VALUE ,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是SynchronousQueue,表明队列长度为 0。
代码语言:javascript复制public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
•newScheduledThreadPool,创建核心线程数为corePoolSize的延时任务线程池
代码语言:javascript复制public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
•newSingleThreadScheduledExecutor,创建核心线程数为1的延时任务线程池
代码语言:javascript复制public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}