线程池
【死磕Java并发】—–J.U.C之线程池:ThreadPoolExecutor
池化技术的好处
1、降低资源消耗:可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池的类图为:
主要实现为ThreadPoolExecutor和ScheduledThreadPoolExecutor(定时线程池)
ThreadPoolExecutor
基本概念:
代码语言:javascript复制//线程池状态控制位,共32位,高3位表示线程池的状态(runState),低29位表示线程的个数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
//线程池主锁 创建线程时需要加锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程持有集合
private final HashSet<Worker> workers = new HashSet<Worker>();
线程池的状态变化图如下:
创建一个线程池:
代码语言:javascript复制public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数含义:
corePoolSize
:线程池中核心线程的数量。当提交一个任务时,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize
:线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了。
keepAliveTime
:线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。
unit
:keepAliveTime的单位。TimeUnit。
workQueue
:用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。如:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
- PriorityBlockingQueue:具有优先界别的阻塞队列。
threadFactory
:用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory()创建
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-"
poolNumber.getAndIncrement()
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
ThreadFactory的左右就是提供创建线程的功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。
handler
:RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。
线程池提供了四种拒绝策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。
线程池的工作流程如下:
线程池的执行方法:execute和submit
有无返回结果参考:异步Future机制
代码语言:javascript复制//可异步返回执行结果
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//无返回结果
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//线程数小于核心线程数,则创建新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程数达到了核心线程数,则放入到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//二次校验线程池状态
int recheck = ctl.get();
//非runnable,移除队列该任务
if (! isRunning(recheck) && remove(command))
reject(command);
//没有线程了,添加一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//阻塞队列已满,添加非核心线程失败,就直接拒绝了
else if (!addWorker(command, false))
reject(command);
}
创建线程的过程addWorker
代码语言:javascript复制 private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取当前线程状态
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环,worker 1
for (;;) {
// 线程数量
int wc = workerCountOf(c);
// 如果当前线程数大于线程最大上限CAPACITY return false
// 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker 1,成功跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS add worker 失败,再次读取ctl
c = ctl.get();
// 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建线程:Worker
w = new Worker(firstTask);
// 当前线程
final Thread t = w.thread;
if (t != null) {
// 获取主锁:mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 线程状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 线程处于RUNNING状态
// 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当前线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet<Worker>
workers.add(w);
// 设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker与当前线程的绑定过程,也就是worker的创建过程
代码语言:javascript复制 private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
// task 的thread
final Thread thread;
// 运行的任务task
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
setState(-1);
this.firstTask = firstTask;
// 利用ThreadFactory和 Worker这个Runnable创建的线程对象
this.thread = getThreadFactory().newThread(this);
}
// 任务执行
public void run() {
runWorker(this);
}
}
从Worker的源码中我们可以看到Woker继承AQS,实现Runnable接口,所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS可方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。
当线程thread启动(调用start()方法)时,其实就是执行Worker的run()方法,内部调用runWorker()。
代码语言:javascript复制 public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
// 当前线程
Thread wt = Thread.currentThread();
// 要执行的任务
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁,运行中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//线程一直在尝试从队列中获取任务
while (task != null || (task = getTask()) != null) {
// worker 获取锁
w.lock();
// 确保只有当线程是stoping时,才会被设置为中断,否则清楚中断标示
// 如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()
// 如果线程池状态 < STOP,但是线程已经中断了,再次判断线程池是否 >= STOP,如果是 wt.interrupt()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 自定义方法,父类模板方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 自定义方法,父类模板方法
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成任务数 1
w.completedTasks ;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
代码语言:javascript复制private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则worker - 1,return null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断是否需要超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列中获取task
// 如果需要超时控制,则调用poll(),否则调用take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。
代码语言:javascript复制private void processWorkerExit(Worker w, boolean completedAbruptly) {
// true:用户线程运行异常,需要扣减
// false:getTask方法中扣减线程数量
if (completedAbruptly)
decrementWorkerCount();
// 获取主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount = w.completedTasks;
// 从HashSet中移出worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
if (runStateLessThan(c, STOP)) {
// 正常退出,计算min:需要维护的最小线程数量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue为空,min = 1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数量大于最少数量min,直接返回,不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一个没有firstTask的worker
addWorker(null, false);
}
}
尝试关闭线程池
代码语言:javascript复制 final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池处于Running状态
// 线程池已经终止了
// 线程池处于ShutDown状态,但是阻塞队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 执行到这里,就意味着线程池要么处于STOP状态,要么处于SHUTDOWN且阻塞队列为空
// 这时如果线程池中还存在线程,则会尝试中断线程
if (workerCountOf(c) != 0) {
// /线程池还有线程,但是队列没有任务了,需要中断唤醒等待任务的线程
// (runwoker的时候首先就通过w.unlock设置线程可中断,getTask最后面的catch处理中断)
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试终止线程池
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 线程池状态转为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
在关闭线程池的过程中,如果线程池处于STOP状态或者处于SHUDOWN状态且阻塞队列为null,则线程池会调用interruptIdleWorkers()方法中断所有线程,注意ONLY_ONE== true,表示仅中断一个线程。
线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。
shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
shutdownNow() :尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
总结:
线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)
线程池类型:
代码语言:javascript复制public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
ScheduledThreadPoolExecutor
1.通过DelayedWorkQueue保证任务的排序
2.线程的执行run不断自检获得可执行的task
代码语言:javascript复制 public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
1.调用isPeriodic()获取该线程是否为周期性任务标志,然后调用canRunInCurrentRunState()方法判断该线程是否可以执行,如果不可以执行则调用cancel()取消任务。
2.如果当线程已经到达了执行点,则调用run()方法执行task,该run()方法是在FutureTask中定义的。
3.否则调用runAndReset()方法运行并充值,调用setNextRunTime()方法计算任务下次的执行时间,重新把任务添加到队列中,让该任务可以重复执行。