文章目录
- 一、线程池中的 Worker ( 工作者 )
- 二、线程池中的工作流程 runWorker
- 三、线程池任务队列中获取任务 getTask
在博客 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 ) 中 , 讲解 线程池 ThreadPoolExecutor 的 execute 方法时 , 有两个重要的核心方法 ;
两个核心的操作 :
- 添加任务 : addWorker(command, true) , 第二个参数为 true 是添加核心线程任务 , 第二个参数为 false 是添加非核心线程任务 ;
- 拒绝任务 : reject(command)
在上一篇博客 【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 ) 介绍了 addWorker 添加任务 , reject 拒绝任务 的源码细节 ;
本博客中介绍 Worker ( 工作者 ) 的相关源码
一、线程池中的 Worker ( 工作者 )
工作者 Worker 主要 为线程执行任务 , 维护终端控制状态 , 同时记录其它信息 ;
该类扩展了 AbstractQueuedSynchronizer , 目的是 简化 每个任务执行时 获取和释放锁的过程 ;
该操作可以防止中断用于唤醒等待任务的工作线程 , 不会中断一个正在运行的线程 ;
Worker 代码及相关注释说明 :
代码语言:javascript复制public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 工作者类主要为线程执行任务 , 维护终端控制状态 , 同时记录其它信息 ;
* 该类扩展了 AbstractQueuedSynchronizer , 目的是简化 每个任务执行时 获取和释放锁的过程 ;
* 该操作可以防止中断用于唤醒等待任务的工作线程 , 不会中断一个正在运行的线程 ;
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* 该类不会被序列化, 提供该常量用于支持 Java 文档警告 .
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 该工作者运行的线程 , 如果线程工厂创建失败 , 就为空. */
final Thread thread;
/** 线程初始任务 , 可能为空. */
Runnable firstTask;
/** 每个线程的任务计数 */
volatile long completedTasks;
/**
* 使用线程工厂 , 根据给定的初始任务 , 创建工作者
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程是在构造函数中 , 使用线程工厂创建的
this.thread = getThreadFactory().newThread(this);
}
/** 将主要的循环操作委托给了外部的 runWorker , 本博客下面有该方法的解析 . */
public void run() {
runWorker(this);
}
// 锁相关方法
//
// 0 代表未锁定状态 .
// 1 代表锁定状态 .
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
二、线程池中的工作流程 runWorker
代码语言:javascript复制 /**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拿到了一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 第一次循环 task 不为空 , 直接就进入了循环体
// 第二次循环 task 为空 , 此时执行 || 后的逻辑 (task = getTask()) != null
// 该逻辑中从线程池任务队列中获取任务 , 然后执行该任务
// 此处一直循环读取线程池任务队列中的任务并执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
三、线程池任务队列中获取任务 getTask
getTask 从 线程池 任务队列中 获取任务 , 该方法执行 阻塞 或 定时等待 任务 , 具体执行哪个需要根据当前的配置情况 ;
这里通过 线程数 判断该线程是 核心线程 , 还是 非核心线程 ;
非核心线程 :
- 判定条件 : 如果当前执行的线程 大于 核心线程数 , 就是非核心线程
- 获取方法 : 非核心线程 调用 poll 方法从任务队列中取任务
- 线程回收 : 如果超过 keepAliveTime 时间还取不到任务 , 非核心线程 空闲时间 超过了一定时间 , 此时需要回收
核心线程 :
- 获取方法 : 如果该线程是核心线程 , 那么就会调用 take 方法 , 而不是 poll 方法
- 阻塞方法 : take 方法是阻塞的
- 不会被回收 : 核心线程不会回收 , 非核心线程超过一定时间会被回收
如果出现下面 4 中情况 , 工作者必须退出 , 该方法返回 null :
- 工作者数量超过线程池个数
- 线程池停止
- 线程池关闭 , 任务队列清空
- 该工作者等待时间超过空闲时间 , 需要被回收 ; 前提是该线程是非和核心线程 ;
getTask 相关源码 :
代码语言:javascript复制 /**
* 执行阻塞或定时等待任务 , 具体执行哪个需要根据当前的配置情况 ;
*
* 如果出现下面 4 中情况 , 工作者必须退出 , 该方法返回 null :
* 1 . 工作者数量超过线程池个数
* 2 . 线程池停止
* 3 . 线程池关闭 , 任务队列清空
* 4 . 该工作者等待时间超过空闲时间 , 需要被回收 ; 前提是该线程是非和核心线程 ;
*
* @return 返回要执行的任务 ; 如果返回空 , 说明该 工作者 Worker 必须退出 , 工作者计数 -1
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果 wc 大于 核心线程数 , 说明本线程是非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 这里进行了时间判断
// 如果当前执行的线程 大于 核心线程数 , 就是非核心线程
// 调用 poll 方法从任务队列中取任务, 如果超过 keepAliveTime 时间还取不到任务 ,
// 非核心线程 空闲时间 超过了一定时间 , 此时需要回收
// 如果该线程是核心线程 , 那么就会调用 take 方法 , 而不是 poll 方法
// take 方法是阻塞的
// 因此核心线程不会回收 , 非核心线程超过一定时间会被回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}