问题回顾
最近由于业务需求使用到了线程池对数据进行异步处理,上线后系统正常运行了两天多突然收到了一波Full GC的告警,赶紧dump了堆信息并回滚了服务。分析dump文件后发现了一个LinkedBlockingQueue类型的大对象,就想到是上次改的线程池的问题了,因为对线程池使用的不熟悉,导致了线上问题。当时错误的写法如下:
代码语言:javascript复制private ThreadPoolExecutor executorService =
new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQeque<>());
上述代码核心线程数设置为0,默认会创建一个线程进行任务的处理,但是BlockingQueue使用的是LinkedBlockingQeque是一个无界的队列。
代码语言:javascript复制public LinkedBlockingQeque() {
this(Integer.MAX_VALUE);
}
当核心线程数满了的时候后续的任务会优先插入队列中,只有当队列满了才会在最大线程数的范围内新增线程,然而因为是无界队列,所以此时设置的最大线程数就无效了。
这个出问题的业务,由于需要异步执行的任务耗时比较久而且任务量较大,只有一个线程根本消费不完,队列就持续地在增长,最终使得BlockingQueue成了一个大对象导致频繁的Full GC。
解决方案
重新调整了线程池的核心线程数与最大线程数,并将无界队列改为了有界队列防止大对象的生成。
代码语言:javascript复制private ThreadPoolExecutor executorService =
new ThreadPoolExecutor(4, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
反思-线程池深入理解
在处理好这个bug后,赶紧借这个机会完整地学习一下线程池原理与使用。
ThreadPoolExecutor的核心参数解释
ThreadPoolExecutor是线程池中最核心的一个类,我们详细地来了解一下这个类的实现。下述代码基于jdk1.8
构造方法
ThreadPoolExecutor继承自AbstractExecutorService,并提供了四个构造方法,不过其余几个都是下面这个的重载且最终都会调用这个构造方法。
代码语言:javascript复制public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
核心参数
•corePoolSize:核心线程数,默认情况下在创建线程池后,线程池中并没有线程,直到到来时才会创建新线程,不过可以通过调用prestartCoreThread()或者prestartAllCoreThread()方法来进行初始化。•maximumPoolSize:最大线程数,即线程池中最多能创建的线程•keepAliveTime:存活时间,默认情况下,线程池中的线程数大于corePoolSize时,线程空闲时间达到keepAliveTime则会终止,直到线程池中的线程数不超过corePoolSize,如果线程池设置了allowCoreThreadTimeOut的话,线程池中的线程数不大于corePoolSize时keepAliveTime也会生效,直到线程池中的线程数为0。•unit:keepAliveTime的时间单位。•workQueue:保存等待执行的任务的阻塞队列,当线程池中的线程数大于corePoolSize时,会把待执行的任务封装成Worker对象放入队列。不同的BlockingQueue决定了线程池的排队策略。 a)SynchronousQueue 并不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将会创建一个新的线程,否则会根据饱和策略拒绝掉这个任务。 b)LinkedBlockingQueue 基于链表的FIFO队列,默认队列大小为Integer.MAX_VALUE,因此是无界队列。当活跃线程等于corePoolSize时,新添加的任务都会被放入队列等待,因此maximumPoolSize就无效了。无界队列可以用来处理瞬时的高并发情况。
c)ArrayBlockingQueue 基于数组的FIFO队列,创建时必须指定大小。使用有界队列有助于防止资源耗尽。
•threadFactory:线程工厂,每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将会创建一个新的、非守护的线程,并且不包含特殊的配置信息。在ThreadFactory中只定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。通常情况下我们都需要去使用自定义的线程工厂方法,自定义的线程工厂可以提供如下功能:
a)为线程取一个有意义的名字,便于后续排查问题。
b)为线程指定UncaughtExceptionHandler来处理线程执行过程中未被捕获的异常。
c)修改线程优先级或者守护状态。
•handler:它是RejectedExecutionHandler类型的变量,表示线程池的拒绝策略。如果阻塞队列满了且线程数达到了maximumPoolSize,此时继续提交任务就会触发线程池的拒绝策略,JUC提供了四种不同的策略
a)CallerRunsPolicy 由调用方线程执行b)AbortPolicy 直接丢出一个RejectedExecutionException,默认策略c)DiscardPolicy 直接丢弃当前任务d)DiscardOldestPolicy 利用FIFO的特性,丢弃队列中最靠前的任务,并再次尝试执行execute
在构造方法中还对入参做了一些校验,校验失败则直接抛出异常。
ThreadPoolExecutor的几个重要字段
代码语言:javascript复制private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
ctl是线程池的核心状态控制字段,本身是一个AtomicInteger,用来保证对ctl的操作都是线程安全的。这里利用位运算巧妙地将一个int(一个int 4个字节 即32位)拆成了两部分,高3位用来表示线程的状态,剩下的29位则表示工作线程数。这里就可以得知工作线程的数量上限即CAPACITY,大约有5亿。
这五种状态转换成二进制后如下所示:
• RUNNING: 0b11100000_00000000_00000000_00000000
能接受新提交的任务,并且也能处理阻塞队列中的任务
• SHUTDOWN: 0b00000000_00000000_00000000_00000000 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态)
• STOP: 0b00100000_00000000_00000000_00000000
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
• TIDYING: 0b01000000_00000000_00000000_00000000
如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
• TERMINATED: 0b01100000_00000000_00000000_00000000
在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
这五种状态中,只有RUNNING的最高位是1,为负数,所以只需要判断ctl是否大于0就能得知线程是否处于该状态。
代码语言:javascript复制 private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
至于为什么ThreadPoolExecutor要利用位运算用一个AtomicInteger来表示状态跟工作线程数量,是为了在多线程环境下保证运行状态与线程数量的统一,利用AtomicInteger的原子性操作保证一致性
ThreadPoolExecutor提供了两个方法通过位运算来判断状态与工作线程数
代码语言:javascript复制private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
ThreadPoolExecutor执行流程分析
execute方法
ThreadPoolExecutor的顶级父类是Executor接口,它只有一个方法就是execute(),我们也就是通过它来向线程池提交任务去执行的。
代码语言:javascript复制 public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
通过源码分析可以得知,当向线程池提交任务时处理流程如下
• 若当前线程数小于corePoolSize,则创建一个新的线程来执行任务
• 若当前线程数大于等于corePoolSize,且阻塞队列未满,则将任务添加到队列中
• 若当前线程数大于等于corePoolSize且小于maximumPoolSize,同时阻塞队列已满,则创建一个“临时”线程来执行任务 • 若当前线程数大于等于maximumPoolSize,且阻塞队列已满,此时就会执行拒绝策略
这里有两点需要注意的是
1.在往队列中添加任务后会对线程池进行double check,这是因为在并发情况下,从上次判断线程池状态到现在线程池可能会被关闭,由于线程池关闭后不能再继续添加任务了,此时就需要回滚刚才的添加任务到队列中的操作并通过拒绝策略拒绝该任务。2.addWorker(null, false),这个方法执行时只是创建了一个新的线程,但是没有传入任务,这是因为前面已经将任务添加到队列中了
addWorker方法
addWorker方法主要是在线程池中创建一个线程并封装成Worker对象来执行任务。该方法接收两个参数firstTask和core,firstTask参数用于指定新增的线程执行的第一个任务,如果firstTask为空的话只创建线程,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
代码语言:javascript复制 private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
注意一下这里的t.start()
这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
runWorker方法
上面提到了调用addWorker方法的时候如果添加成功的话就会调用Worker的run方法
代码语言:javascript复制class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
...
public void run() {
runWorker(this);
}
...
}
Worker对象实现了Runnable,所以线程启动的时候执行run方法并最终调用runWorker来执行任务,下面看一下runWorker的源码
代码语言:javascript复制 final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法是工作线程的核心循环,分析源码可以得知runWorker方法的执行过程:
1.while循环不断地通过getTask()方法获取任务;2.getTask()方法从阻塞队列中取任务;3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;4.调用task.run()
执行任务;5.如果task为null则跳出循环,执行processWorkerExit()方法;6.runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
ThreadPoolExecutor拓展方法
这里有两个方法的实现是空的
代码语言:javascript复制protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
主要是用于ThreadPoolExecutor的子类自己实现一些自定义功能,比如线程监控,ThreadPoolExecutor提供了以下方法可以用来监控线程状态
代码语言:javascript复制long getTaskCount()//线程池已经执行的和未执行的任务总数;
long getCompletedTaskCount()//线程池已完成的任务数量,该值小于等于taskCount;
int getLargestPoolSize()//线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
int getPoolSize()//线程池当前的线程数量;
int getActiveCount()//当前线程池中正在执行任务的线程数量。
getTask方法
getTask方法用于从阻塞队列中获取任务,源码如下
代码语言:javascript复制 private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask方法首先对线程池状态进行判断,如果线程池为非RUNNING状态且满足以下条件
1.rs >= STOP,线程池是否正在stop2.阻塞队列是否为空
则将workerCount减1并返回null,这是因为当线程池状态为SHUTDOWN或以上时,不允许再往队列中添加任务。
timed变量用来判断是否进行超时控制,allowCoreThreadTimeOut默认是false即核心线程不会因为超时被回收。
代码语言:javascript复制try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
这部分代码为获取任务的核心逻辑,当timed为true时通过poll进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null,否则会通过take方法阻塞获取任务。
总结
在这次线上事故的教训下,深入了解了一下线程池的工作流程,从线程池的核心参数到执行流程分析,对今后能够正确地使用线程池有很大的帮助,希望今后不要再犯这种低级错误,对于不太理解原理的功能不能太想当然。