JDK中线程池满后再放入队列

2020-07-21 10:56:31 浏览数 (1)

    JDK中ThreadPoolExecutor有coreSize、maxSize,只有当线程数到coreSize且队列满后才会增加线程数到maxSize.

    想要达到的效果是线程数到maxSize后再放入队列。

方案一

    覆写ThreadPoolExecutor的execute()

List-1

代码语言:javascript复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //workerCountOf()获取线程数,线程worker数只是int类型二进制位的前几位
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3
    else if (!addWorker(command, false))
        reject(command);
}

线程数的表示,是用int类型的二进制的左边几位标示的,所以才需要调用workCountOf()方法来获取

  1. 如果线程数小于coreSize,那么增加worker线程
  2. 将任务放入到队列中,如果成功表示队列没有满
  3. 队列满了,此时尝试去增加worker线程数,让worker线程数达到maxSize

    如果我们自己定义一个CustomThreadPoolExecutor,然后覆写execute(),那么你会发现ctl、workerCountOf、addWorker都是private的,子类上根本访问不了,所以这个方案是不行的

方案二

    自己定义一个CustomThreadPoolExecutor,之后将JDK中ThreadPoolExecutor中的内容全部拷贝过来,之后再改写execute()的实现,但是这个成本很大。

    最重要的,连拒绝策略也不能使用JDK里面的了,因为如下List-2所示第二个参数是ThreadPoolExecutor

List-2

代码语言:javascript复制
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

    此方案不行

方案三

  1. 自定义queue,改写offer逻辑
  2. 覆写ThreadPoolExecutor的execute()

List-3

代码语言:javascript复制
public class TaskQueue<R extends Runnable> extends ArrayBlockingQueue<Runnable> {
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int size, boolean fair) {
        super(size, fair);
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        //1
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }
        // return false to let executor create new worker.
        //2
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // currentPoolThreadSize >= max
        //3
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }

    public void setExecutor(EagerThreadPoolExecutor executor){
        this.executor = executor;
    }
}

TaskQueue的offer方法是,放入队列时被调用

  1. 如果当前提交的task数少于线程池中线程是数量,那么直接调用父类的offer,将task放入队列,不新建线程,因此此时肯定有空闲的线程
  2. 此时线程池中没有空闲的线程,而且线程数量少于设置的maxSize,此时返回false,让线程池去创建新的线程
  3. 此时线程数量大于等于maxSize,将task放入任务队列中

EagerThreadPoolExecutor继承ThreadPoolExecutor,改写execute()的实现:

List-4

代码语言:javascript复制
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int coreSize, int maxSize, long l, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        super(coreSize, maxSize, l, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler);
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        //可以结合afterExecute统计执行耗时
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        //1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                //2
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            //3
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        //ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
        //afterExecute和beforeExecute是在runWorker中调用,即使有异常,也不会抛出RejectedExecutionException异常
        submittedTaskCount.decrementAndGet();
        if (throwable!=null) {
            LOG.error("线程池中线程执行出错", throwable);
        }
    }

    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
}
  1. execute()开始时,将提交的任务数加1,之后调用父类的execute()方法,如List-1中,当线程数达到coreSize后,就会调用queue.offer(),即List-3中的offer(),我们会判断线程数是否少于maxSize,如果少于那么返回false,之后ThreadPoolExecutor.execute()方法会去新增线程
  2. 2处如果被拒绝了,说明队列满了而且线程数达到了maxSize,此时我们再重试一次,将task放入队列中,为什么还要重试一次?List-3中offer中做了一些操作,有可能这期间队列就有空了,所以要重试下。
  3. 3处捕获到任何的异常,之后将task数减去1,3处的Throwable是捕获不到2处抛出的RejectedExecutionException的

    为什么afterExecute()方法中还要将task数减去1呢?

ThreadPoolExecutor中,beforeExecute()和afterExecute()是在runWorker的run()中被调用的,分别在Runnable.run()的前后被调用,而且线程池中抛出异常,在线程池外面是捕获不到的,所以外面需要的afterExecute()中将task数减去1

    改进:我们可以将List-4中使用的AtomicInteger改为JDK8的LongAddr以提高性能

Reference

  1. https://github.com/apache/dubbo/blob/fb5761683729f63c715e39c78a8b7b75278050c9/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java

0 人点赞