一、基本概念
1 作用
线程池(Thread Pool)是一种基于池化思想管理线程的工具,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
2 线程池状态
3 线程池类关系
二、线程池
1 数据结构
代码语言:java复制public class ThreadPoolExecutor extends AbstractExecutorService {
//ctl是原子类型,32位,高3位是线程池状态,低29位是线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,不受mainlock保护,需要使用一个线程安全的容器
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
//线程
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
//当任务队列已满时采取的拒绝策略
private volatile RejectedExecutionHandler handler;
//非核心线程最大空闲时间,超过就终止
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
//核心线程数量,allowCoreThreadTimeOut默认为false,核心线程常驻内存
private volatile int corePoolSize;
//最大线程数量
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
}
2 execute
该方法继承自Execute接口,用于向线程池提交任务。
代码语言:java复制public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1 判断线程数量是否达到核心线程数,如果未达到就添加线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //添加线程
return;
c = ctl.get();
}
//2 判断任务队列是否已满,如果未满则入队,如果已满则尝试添加线程来处理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//线程池此时非运行态,不接受任务,移除任务并拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //任务队列未满,
reject(command);//3 线程达到最大线程数,此时执行拒绝策略
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
添加线程需要提供该线程执行的task,并指示是添加核心线程还是普通线程,这决定了是与核心线程数比较还是与最大线程数比较。添加核心线程addWorker(command, true)
,添加普通线程addWorker(command, false)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果线程数量达到上限,或者达到核心线程数/最大线程数,则返回false,
//由上层执行入队或者拒绝策略
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//线程数加1并跳出双层for循环,往后执行创建worker流程
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程池状态发生改变,回到方法开头的"retry:"处往下重新执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程池正常运行时,将新线程添加到workers中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//此时thread还未执行start,不应该是alive状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//largestPoolSize不影响线程池大小,只是记录
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动新线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//线程启动失败时收尾
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//线程启动失败,需要将ctl中线程数减1,从workers中移除该线程
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
这个方法中的"retry:"是标签,类似于goto,可以无条件跳转,慎用?。 一般Java中的标签用于多层循环,可以从内层循环一次跳出所有循环。 continue retry表示跳到标签位置处,重新执行。 break retry表示跳出多层循环,然后继续往下执行,不需要回到"retry:"位置。
3 线程池关闭
3.1 shutdown
代码语言:java复制public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检测线程方法权限
checkShutdownAccess();
//将ctl修改为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//将现有的线程中断
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//先与Worker线程抢锁,确保Worker线程没有处于某个任务执行中的状态
//将Worker线程中断
//worker线程每次从workQueue拿取任务后都会加锁,处理完任务再释放锁
//所以Worker线程没抢到锁就认为它是空闲的,疑惑???
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.2 shutdownNow
代码语言:java复制public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//将ctl设置为STOP状态
advanceRunState(STOP);
//中断所有Worker线程,不管是否处于正在执行task中
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
三、Worker
1 数据结构
工作者,封装了线程,当没有空闲核心线程且未达到最大线程数时就会创建新的worker。
Worker实现了Runnable接口,是thread执行的主任务,这个任务的主要逻辑是不断从任务队列拿task执行,并检测线程池状态,如果线程池为STOP/TIDYING状态,就调用interrupt中断自己。
代码语言:java复制private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
}
//构造器,初始化时state=-1,禁止interrupt
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
2 run
代码语言:java复制final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//Worker继承了AQS
w.unlock(); // 构造器中初始时将state=-1,防止interrupt
boolean completedAbruptly = true;
try {
//不断从任务队列取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
//线程池状态>=STOP,不得再处理任务,包括进队的任务,此时线程主动中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//线程主动设置中断标志
try {
beforeExecute(wt, task);//钩子方法,未实现
Throwable thrown = null;
try {
task.run();//执行具体任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
3 为什么继承AQS
Worker继承了AQS,实现了一个非阻塞、不可中断的锁。当外部缩小线程池核心线程数时,线程池会interrupt现有workers列表所有线程。为了安全,应该等线程处理完当前正在执行的task后才能interrupt,所以外部线程需要和Worker内线程抢锁,获取锁后再中断Worker内线程。
代码语言:java复制public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//如果核心线程数变小,则中断空闲线程
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
//如果核心线程数变大,则提前启动新线程
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
四、拒绝策略
当任务队列已满,线程数量达到最大时,对后续到达的任务采用预定的策略处理。
代码语言:java复制final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
官方实现了四种常见策略,如下:
代码语言:java复制//抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " r.toString()
" rejected from "
e.toString());
}
}
//直接丢弃,置之不理
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
//丢弃最旧的任务空出位置,然后将该任务重新投入线程池
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
//由线程池外的调用者线程直接执行该任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}