Java线程池

2021-04-23 13:02:31 浏览数 (1)

摘要

  1. 线程池任务执行机制
  2. 任务调度
  3. 任务缓冲
  4. 任务申请
  5. 拒绝策略
  6. Worker线程为什么要采用AQS实现
  7. Worker线程初始化
  8. Worker线程如何工作
  9. Worker线程获取任务
  10. Worker线程如何销毁
  11. 线程池关闭

1. 线程池任务执行机制

作为一个开发初始化线程池通常会使用Executors类,然后调用newFixedThreadPool或者其他方法来初始化一个线程池,方法如下:

代码语言:javascript复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Executors中其实最终是初始了ThreadPoolExecutor类,上一篇Java线程池前传

已经讲了ThreadPoolExecutor线程池的实现类。

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor的构造方法中需要指定一些参数,并且这些参数会被线程池的一些属性所使用,这些我们会在后续的剖析线程池中都会提到。

1.1 任务调度

任务调度是整个线程池的入口,当客户端提交了一个任务以后便进入整个阶段,整个任务的调度过程由execute方法完成,如下:

代码语言:javascript复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 1 & 2. addWorker方法会检查线程池是否是RUNNING状态
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 1 & 3
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

上述代码的执行过程大致如下:

  1. 首先检查线程池是否是RUNNING状态,如果不是RUNNING状态,则拒绝任务
  2. 如果工作线程数(workerCount)小于核心线程数(corePoolSize),直接开启新的线程去执行任务
  3. 如果工作线程数(workerCount)大于等于核心线程数 (corePoolSize),并且阻塞任务队列为满时,将任务放入阻塞队列
  4. 如果工作线程数(workerCount)大于等于核心线程数 (corePoolSize),并且阻塞任务队列已满,但工作线程数小于最大线程数(maximumPoolSize),则创建并启动一个新的线程来执行
  5. 如果工作线程数(workerCount)大于等于最大线程数(maximumPoolSize),并且阻塞任务队列已满,则执行具体的RejectedExecutionHandler策略。

其中workerCountOf(recheck) == 0这一步也很关键,这一步主要是为了确保线程池中至少有一个线程去执行任务。

在上述流程中我们提到了阻塞任务队列(用于任务缓冲)、addWorker方法(任务申请)、以及reject方法(任务拒绝策略),下面我们再来分析一下这三个关键点。

1.2 任务缓冲

线程池的本质是对线程和任务的管理,为了做到这一点必须要将线程和任务解耦,不再直接关联,通过缓冲队列恰好可以解决这一点。线程池中的缓冲队列类似于生产者消费者模式,客户端线程往缓冲队列里提交任务,线程池中的线程则从缓冲队列中获得任务去执行。

目前Java线程中的默认缓冲队列是阻塞队列模式,主要有以下几种,这些缓冲队列必须要实现BlockingQueue接口:

队列

描述

ArrayBlockingQueue

使用数组实现的有界阻塞队列,先进先出,支持公平锁和非公平锁

LinkedBlockingQueue

使用链表实现的有界阻塞队列,先进先出,默认链表长度Integer.MAX_VALUE

PriorityBlockingQueue

一个使用数组实现支持线程优先级排序的无界队列,默认自然排序,也可以自定义实现Comparator来进行排序

DelayQueue

一个采用PriorityBlockingQueue实现的延迟队列(组合的方式),在创建该对象中,可以指定任务添加至队列后才能被获取

SynchronousQueue

一个不存储元素的阻塞队列,每一个任务入队操作必须等待一个任务的出队,否则不能添加新的任务

LinkedTransferQueue

一个使用链表实现的无解阻塞队列,该队列支持将任务立即传递给消费者

LinkedBlockingDeque

一个由双向链表实现的有界阻塞队列,队头队尾都可以添加任务消费任务

1.3 任务拒绝

当工作线程数(workerCount)大于等于最大线程数(maximumPoolSize),并且阻塞任务队列已满,线程池会执行具体的RejectedExecutionHandler策略。目前Java默认的拒绝策略主要有以下几种:

策略

描述

AbortPolicy

丢弃任务并抛出RejectedExecutionException异常

DiscardPolicy

直接丢弃任务

DiscardOldestPolicy

丢弃阻塞队列队头的任务,并重新提交被拒绝的任务

CallerRunsPolicy

直接由调用线程处理被拒绝的任务

1.4 任务申请

在工作线程池数未达到最大线程数并且阻塞队列未满时,我们可以将任务提交至线程池(有可能是开启新的线程,也有可能是将任务提交至阻塞队列)等待执行。其中addWorker方法便是开启新的线程执行任务。下面我们来看一下addWorker方法:

代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池如果不是运行状态,线程池根据以下条件来决定是否增加Work线程
        // 1. 如果线程池不是SHUTDOWN状态,那么不允许在增加任何线程,返回false
        // 2. 如果线程池是SHUTDOWN状态(不允许接受新的任务),如果firstTask不为空表明是新的任务,不应该接受,所以返回false
        // 3. 如果线程池是SHUTDOWN状态,并且队列已空,此时也不需要增加线程所以返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取工作线程的数量
            int wc = workerCountOf(c);
            // 工作线程数与线程池容量比较,超过不允许增加线程
            // core为true,工作与核心线程数比较,超过不允许增加线程
            // core为false,工作与最大线程数比较,超过不允许增加线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS增加工作线程数,增加成功跳出retry循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS增加工作线程数失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 判断线程池状态是否改变,如果线程池状态已经改变,则重新执行retry循环,否则执行内部循环,尝试增加线程数
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 重新获取线程池的状态,防止在mainLock的lock方法执行之前,线程池状态改变
                int rs = runStateOf(ctl.get());

                // 只有线程池是运行状态或者线程池是shutdown并且任务是来自阻塞队列中(firstTask==null)才可以向线程池中增加线程
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 增加worker,workers是一个HashSet
                    workers.add(w);
                    // 更新largestPoolSize,largestPoolSize代表了线程池中曾经出现过的最大线程数
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果worker创建或启动失败,修正线程池中的worker和ctl值
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

上述代码的核心逻辑就是根据线程池当前状态来决定是否开启新的线程来执行任务,线程具体的实现方式是采用一个Worker类来进行封装。

2. Worker线程管理

Worker实现了Runnable接口,并继承了AbstractQueuedSynchronizer(AQS)。

不熟悉AQS的读者可以戳这里

2.1 Worker线程的基本属性

代码语言:javascript复制
final Thread thread;

Runnable firstTask;

volatile long completedTasks;

Worker中存储了真实的线程(Thread)、该线程需要执行的第一个任务(firstTask)以及线程执行的任务数(completedTasks)。

2.2 Worker线程为什么要采用AQS实现

代码语言:javascript复制
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 使用ThreadFactory创建线程
    this.thread = getThreadFactory().newThread(this);
}


protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

Worker线程采用AQS实现,使用AQS的独占锁功能,通过其tryAcquire方法可以看出Worker线程是不允许重入的,Worker线程有以下特点:

  • 通过lock方法成功获取锁以后,则表示Worker线程正在执行任务
  • 如果正在执行任务,则不应该中断线程
  • 如果该线程现在不是独占锁状态(也就是空闲状态),说明该线程没有任务处理,可以对线程进行中断

构造方法中为什么要执行setState(-1)方法 ?

setState是AQS中的方法,默认值为0,tryAcquire方法是根据state是否是0来判断的,所以将state设置为-1是为了禁止在执行任务前对线程进行中断,不明白的读者可以看一下AQS的acquire(int arg)方法,如下:

代码语言:javascript复制
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

构造方法中的getThreadFactory().newThread(this)作用是什么?

ThreadFactory是在我们构造ThreadPoolExecutor时传入的,通过ThreadFactory我们可以设置线程的分组、线程的名字、线程的优先级、以及线程是否是daemon线程等相关信息。

2.3 Worker线程工作

代码语言:javascript复制
public void run() {
    runWorker(this);
}

Worker线程获取任务工作是通过调用ThreadPoolExecutor中的runWorker方法,该方法的参数是Worker本身,下面我们看一下Worker线程的具体工作原理:

代码语言:javascript复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 将AQS中的state修改为0
    w.unlock(); // allow interrupts
    // 线程在执行任务中是否异常
    boolean completedAbruptly = true;
    try {
        // 获取立即执行的任务,或者从阻塞队列中获取
        while (task != null || (task = getTask()) != null) {
            // 获取独占锁
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前的操作
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后的操作
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks  ;
                // 释放独占锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

线程工作的大致流程是:

  1. 通过getTask方法获取任务执行
  2. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  3. 通过task.run()方法执行任务
  4. 如果阻塞队列中没有任务,则跳出循环执行processWorkerExit方法
  5. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。

completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

此部分代码的流程图如下:

2.4 Worker线程获取任务(getTask方法)

代码语言:javascript复制
private Runnable getTask() {
    // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 如果线程池状态是非RUNNING状态,需要进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件有一个满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * 重点
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

什么时候会销毁?runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

获取任务的流程图如下:

2.5 Worker退出(processWorkerExit方法)

代码语言:javascript复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是Worker线程正常结束,工作数量-1
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 线程池的任务完成数量增加该Worker线程的任务完成数量
        completedTaskCount  = w.completedTasks;
        // 从线程池维护的线程中移除该线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();

    int c = ctl.get();
    /*
     * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

3. 线程池关闭(shutdown)

代码语言:javascript复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁(lock)呢?

  • 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法;
  • shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock;
  • 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用workQueue.take()进行阻塞;
  • 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了workQueue.take()后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;

由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞态条件。

  • 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用workQueue.take()时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;
  • 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;

所以Worker继承自AQS,在工作线程处理任务时会进行lock,interruptIdleWorkers在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断。

本期的Java 线程池介绍到这,我是shysh95,我们下期再见!!!

0 人点赞