JDK中ThreadPoolExecutor有coreSize、maxSize,只有当线程数到coreSize且队列满后才会增加线程数到maxSize.
想要达到的效果是线程数到maxSize后再放入队列。
方案一
覆写ThreadPoolExecutor的execute()
List-1
代码语言:javascript复制public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//workerCountOf()获取线程数,线程worker数只是int类型二进制位的前几位
int c = ctl.get();
//1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3
else if (!addWorker(command, false))
reject(command);
}
线程数的表示,是用int类型的二进制的左边几位标示的,所以才需要调用workCountOf()方法来获取
- 如果线程数小于coreSize,那么增加worker线程
- 将任务放入到队列中,如果成功表示队列没有满
- 队列满了,此时尝试去增加worker线程数,让worker线程数达到maxSize
如果我们自己定义一个CustomThreadPoolExecutor,然后覆写execute(),那么你会发现ctl、workerCountOf、addWorker都是private的,子类上根本访问不了,所以这个方案是不行的
方案二
自己定义一个CustomThreadPoolExecutor,之后将JDK中ThreadPoolExecutor中的内容全部拷贝过来,之后再改写execute()的实现,但是这个成本很大。
最重要的,连拒绝策略也不能使用JDK里面的了,因为如下List-2所示第二个参数是ThreadPoolExecutor
List-2
代码语言:javascript复制public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
此方案不行
方案三
- 自定义queue,改写offer逻辑
- 覆写ThreadPoolExecutor的execute()
List-3
代码语言:javascript复制public class TaskQueue<R extends Runnable> extends ArrayBlockingQueue<Runnable> {
private EagerThreadPoolExecutor executor;
public TaskQueue(int size, boolean fair) {
super(size, fair);
}
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
//1
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
//2
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
//3
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
public void setExecutor(EagerThreadPoolExecutor executor){
this.executor = executor;
}
}
TaskQueue的offer方法是,放入队列时被调用
- 如果当前提交的task数少于线程池中线程是数量,那么直接调用父类的offer,将task放入队列,不新建线程,因此此时肯定有空闲的线程
- 此时线程池中没有空闲的线程,而且线程数量少于设置的maxSize,此时返回false,让线程池去创建新的线程
- 此时线程数量大于等于maxSize,将task放入任务队列中
EagerThreadPoolExecutor继承ThreadPoolExecutor,改写execute()的实现:
List-4
代码语言:javascript复制public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int coreSize, int maxSize, long l, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
super(coreSize, maxSize, l, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler);
}
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
//可以结合afterExecute统计执行耗时
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
//1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
//2
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
//3
submittedTaskCount.decrementAndGet();
throw t;
}
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
//ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
//afterExecute和beforeExecute是在runWorker中调用,即使有异常,也不会抛出RejectedExecutionException异常
submittedTaskCount.decrementAndGet();
if (throwable!=null) {
LOG.error("线程池中线程执行出错", throwable);
}
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
}
- execute()开始时,将提交的任务数加1,之后调用父类的execute()方法,如List-1中,当线程数达到coreSize后,就会调用queue.offer(),即List-3中的offer(),我们会判断线程数是否少于maxSize,如果少于那么返回false,之后ThreadPoolExecutor.execute()方法会去新增线程
- 2处如果被拒绝了,说明队列满了而且线程数达到了maxSize,此时我们再重试一次,将task放入队列中,为什么还要重试一次?List-3中offer中做了一些操作,有可能这期间队列就有空了,所以要重试下。
- 3处捕获到任何的异常,之后将task数减去1,3处的Throwable是捕获不到2处抛出的RejectedExecutionException的
为什么afterExecute()方法中还要将task数减去1呢?
ThreadPoolExecutor中,beforeExecute()和afterExecute()是在runWorker的run()中被调用的,分别在Runnable.run()的前后被调用,而且线程池中抛出异常,在线程池外面是捕获不到的,所以外面需要的afterExecute()中将task数减去1
改进:我们可以将List-4中使用的AtomicInteger改为JDK8的LongAddr以提高性能
Reference
- https://github.com/apache/dubbo/blob/fb5761683729f63c715e39c78a8b7b75278050c9/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java