史上最全ThreadPoolExecutor梳理(下篇)

2020-08-20 15:07:56 浏览数 (1)

五、主流程

execute()方法

ThreadPoolExecutor的顶级父类是Executor接口,它只有一个方法就是execute(),我们也就是通过它来向线程池提交任务去执行的。

代码语言:javascript复制
// 提交一个任务
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果小于核心线程数,创建Worker,并启动里面的Thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 如果处于RUNNING态,将任务放入队列成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 二次状态检查,非RUNNING态,从队列移除
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
            reject(command);
            // 如果线程池为0了,重新创建一个新的线程
            // 为什么会这样?因为设置allowCoreThreadTimeOut,核心线程因空闲全部回收了
        else if (workerCountOf(recheck) == 0)
            // 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
            addWorker(null, false);
    }
    // 尝试往最大线程数创建线程
    else if (!addWorker(command, false))
        reject(command);
}
  • 若当前线程数小于corePoolSize,则创建一个新的线程来执行任务
  • 若当前线程数大于等于corePoolSize,且阻塞队列未满,则将任务添加到队列中
  • 如果阻塞队列已满,但当前线程数小于maximumPoolSize,则创建一个“临时”线程来执行任务
  • 若当前线程数大于等于maximumPoolSize,且阻塞队列已满,此时会执行拒绝策略

注意点:

  • 在往队列中添加任务后会对线程池状态 double check,这是因为在并发情况下,从上次判断线程池状态到现在线程池可能会被关闭,由于线程池关闭后不能再继续添加任务了,此时就需要回滚刚才的添加任务到队列中的操作,并执行拒绝策略
  • addWorker(null, false),只是创建一个新的Thread,但是没有传入任务,这是因为前面已经将任务添加到队列中了

addWorker()方法

addWorker 方法主要是创建一个Thread并封装到Worker中。Worker实现了Runnable接口,本身也是一个线程任务。

代码语言:javascript复制
 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    ....
        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            // 用Worker自身任务做为入参,构造Thread
            this.thread = getThreadFactory().newThread(this);    
        }

        public void run() {
            runWorker(this);
        }
    .....
}

该方法接收两个参数firstTask和core,firstTask参数用于指定新增的线程执行的第一个任务,如果firstTask为空的话只创建线程。

core参数:

  • true,表示新增线程时,判断当前线程数是否少于corePoolSize
  • false,表示新增线程时,判断当前线程数是否少于maximumPoolSize
代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果队列为空,跳出
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 线程数
            int wc = workerCountOf(c);
            // 如果线程数超过限制,跳出
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // ctl 线程计数 1 。成功,跳出最外层的for循环 
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 计数失败,判断状态是否改变,如果改变,重新执行最外层的for循环 
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;

        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 构建 Worker 对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把Worker放入HashSet集合,后面关闭时,线程中断会用到
                    workers.add(w);
                    int s = workers.size();
                    // 记录,历史上曾经创建过的最大线程数
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 将Thread 启动起来
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果添加Worker失败,把Worker从HashSet集合移除,并对线程计数减1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

重点:

Worker本身实现了Runnable接口,t.start() 这个语句启动时,会调用Worker类中的run方法。内部调用runWorker()方法,开限循环模式从阻塞队列中拉取任务来执行。

runWorker()方法

代码语言:javascript复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果有firstTask,先执行。否则从阻塞队列拉取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 二次检查,如果状态停止,确保线程是中断的
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 前置扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 最最最核心,Runnable任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置扩展
                    afterExecute(task, thrown);
                }
            } finally {
                // task复位
                task = null;
                // Worker中已完成的任务计数 1
                w.completedTasks  ;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 清理 Worker
        processWorkerExit(w, completedAbruptly);
    }
}

分析源码可以得知runWorker方法的执行过程:

  • while循环不断地通过getTask()方法,从阻塞队列拉取任务;
  • 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  • 前置扩展
  • 调用task.run()执行任务;
  • 后置扩展
  • 如果task为null,则跳出循环,执行processWorkerExit()方法;
  • runWorker方法执行完毕,代表着Worker中的run方法执行完毕,Runnable任务执行完毕,然后线程销毁

processWorkerExit()方法

代码语言:javascript复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 突然结束,completedAbruptly为true,会对线程计数减1。
    // 对于因没有任务而结束,completedAbruptly为false,getTask()方法中会执行减1操作
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将Worker中记录的已完成任务数,合并到ThreadPoolExecutor的全局字段中
        completedTaskCount  = w.completedTasks;
        // 把Worker从HashSet集合移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    // 二次检查,如果RUNNING或SHUTDOWN,非正常结束,需要重新创建线程,容错
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

getTask()方法

getTask方法用于从阻塞队列中获取任务,源码如下

代码语言:javascript复制
private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果rs >=STOP或者(rs 为 SHUTDOWN且队列为空)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程计数减1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 标记:大于核心线程数 或 允许核心线程被回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 线程计数减1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 采用超时方式来获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 采用阻塞方式来获取任务
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • getTask方法首先对线程池状态进行判断,如果线程池为非RUNNING状态且满足以下条件,则将workerCount减1并返回null
    • 1、rs >= STOP
    • 2、rs 为 SHUTDOWN且队列为空。
    • 说明:当线程池状态为SHUTDOWN或以上时,不允许再往队列中添加任务。
  • timed变量用来判断是否进行超时控制
    • allowCoreThreadTimeOut默认是false,当线程数量降到corePoolSize时,会采用阻塞方式从队列拉取任务
    • 其它情况,采用超时方式来获取任务
      • 如果达到keepAliveTime最大空闲时间,仍拿不到任务,线程计数减1,返回null
    • 如果设置allowCoreThreadTimeOut为true,空闲时,线程池数最小可能会为0

advanceRunState()方法

更改线程池状态

代码语言:javascript复制
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        // 如果当前状态在目标状态之后
        if (runStateAtLeast(c, targetState) ||
            // 将目标状态 线程数,合成一个字段
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
  • 如果当前状态在目标状态之后,跳出循环,不做任何处理
  • 否则,将目标状态 线程数,合成一个字段,更新到ctl

六、线程池关闭

关闭线程池。他们的原理是遍历线程池的工作线程,然后逐个调用线程的interrupt方法来中断线程。

shutdown()方法

代码语言:javascript复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 标记线程池状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 把所有空闲线程中断
        interruptIdleWorkers();
        // 空实现,为ScheduledThreadPoolExecutor预留的扩展
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdownNow()方法

代码语言:javascript复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 标记线程池状态为 STOP
        advanceRunState(STOP);
        // 把所有线程中断
        interruptWorkers();
        // 把阻塞队列中的所有任务提取到List集合中,并返回
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

区别:

  • shutdown方法,不再接收新的任务,已提交的任务会执行完
  • shutdownNow方法,比较粗暴,它将尝试中断所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

只要调用了这两个关闭方法中的任意一个,isShutdown()方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated()方法会返回true。

采用哪一种方法来关闭线程池,由业务特性决定,大部分是采用shutdown()方法来关闭线程池。如果任务不强求一定要执行完,可以调用shutdownNow()方法。

七、扩展

ThreadPoolExecutor提供扩展方法:通过继承ThreadPoolExecutor重写beforeExecute、afterExecute、terminated方法。在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。

代码语言:javascript复制
public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {

private ThreadLocal<Context> taskExecutionTimer = new ThreadLocal<>();

。。。

  @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        Context context = new Context();
        context.setStartTime(System.currentTimeMillis());
        taskExecutionTimer.set(context);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Context context = taskExecutionTimer.get();
        long start;
        if (context != null) {
            start = context.getStartTime();
        } else {
            start = System.currentTimeMillis();
        }
        long cost = System.currentTimeMillis() - start;
        RedAlertLogUtils.threadPoolMonitorLog(name, getCorePoolSize(), getActiveCount(), getMaximumPoolSize(), getQueue().size(), cost);
    }
。。。。

}

八、Executors 工具类方法

•newFixedThreadPool, 有固定长度(nThreads)的线程数组,忙不过来时会把任务放到无限长的队列里,这是因为LinkedBlockingQueue 默认是一个无界队列。

代码语言:javascript复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}

•newSingleThreadExecutor,创建单线程的线程池。队列长度为无限

代码语言:javascript复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

•newCachedThreadPool 的 maximumPoolSize 参数值是Integer.MAX_VALUE ,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是SynchronousQueue,表明队列长度为 0。

代码语言:javascript复制
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

•newScheduledThreadPool,创建核心线程数为corePoolSize的延时任务线程池

代码语言:javascript复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

•newSingleThreadScheduledExecutor,创建核心线程数为1的延时任务线程池

代码语言:javascript复制
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

0 人点赞