Java中关于线程池的几道面试题

2023-10-18 16:30:51 浏览数 (1)

Java中关于线程池的几道面试题

一、介绍

以前就讲过线程池的使用,本文中介绍深挖线程池中的几道面试题

Java线程池 | 半月无霜 (banmoon.top)

在上面可以找到

  • 线程池的核心参数都有什么,代表什么含义?
  • 线程池的拒绝策略有哪些?

二、线程池

1)任务添加流程

当一个线程池在添加一个任务时,它是怎么分配线程去执行这个任务的

代码语言:javascript复制
public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
        // 判断是否为空
        if (command == null)
            throw new NullPointerException();
        
        // 判断当前正在运行的线程数是否小于核心线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 添加任务至线程执行,成功添加则结束
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果核心线程都有在运行,将任务放至队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
            int recheck = ctl.get();
           	// 检查线程状态是不是RUNNING,如果不是将会拒绝此任务
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // 检查当前的工作线程数是否为0
            else if (workerCountOf(recheck) == 0)
                // 添加一个null的任务
                addWorker(null, false);
        }
        // 如果队列推入任务失败了,那将直接添加至线程执行
        else if (!addWorker(command, false))
            // 如果任务添加至线程失败,则将进行拒绝策略
            reject(command);
    }
    
    /**
     * 会从线程工厂获取线程,并添加执行任务
     * @param firstTask 执行的任务
     * @param core 是否可以添加至核心线程
     * @return true:成功添加至线程执行
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // ...
    }
}

2)线程池的状态有哪些

线程池的状态有哪些,状态是如何进行转换的?

注意是在提问线程池的状态,而不是线程的状态


这是ThreadPoolExecutor.java中的源码

代码语言:javascript复制
   /**
    * 主池控制状态ctl是一个原子整数,包含两个概念性字段:
    * workerCount表示实际线程数,runState表示是运行中、正在关闭等状态。
    * 为了将它们打包成一个整数,我们将workerCount限制为(2^29)-1(大约5亿)个线程,而不是(2^31)-1(可表示20亿)。
    * 如果将来出现了问题,该变量可以改为AtomicLong,并且下面的移位/掩码常量需要调整。但在需要之前,使用int类型会更快,更简单。
    * 
    * workerCount是已被允许启动且未被允许停止的工作线程数。
    * 该值暂时可能与实际的活动线程数不同,例如当ThreadFactory无法按要求创建线程时,或者退出线程在终止之前仍在执行簿记操作。用户可见的池大小报告为工作线程集合的当前大小。
    * 
    * runState提供了主要的生命周期控制,接受以下值:
    * RUNNING:接受新任务并处理队列中的任务;
    * SHUTDOWN:不接受新任务,但处理队列中的任务;
    * STOP:不接受新任务,不处理队列中的任务,并中断正在处理的任务;
    * TIDYING:所有任务都已终止,workerCount为零,转换到TIDYING状态的线程将运行terminated()钩子方法;
    * TERMINATED:terminated()已完成。这些值之间的数值顺序很重要,才能允许有序比较。
    * 
    * runState随时间单调递增,但不一定达到每个状态。转换如下:
    * RUNNING->SHUTDOWN:调用shutdown()时,可能是隐式的(RUNNING或SHUTDOWN状态);
    * RUNNING或SHUTDOWN->STOP:调用shutdownNow()时;
    * SHUTDOWN->TIDYING:当队列和池都为空时;
    * STOP->TIDYING:当池为空时;
    * TIDYING -> TERMINATED:当terminated()钩子方法完成时。等待在awaitTermination()中的线程将在状态到达TERMINATED时返回。
    * 由于在SHUTDOWN状态下队列可能在非空和空之间变化,因此检测从SHUTDOWN到TIDYING的转换不是很直观,但我们只有在看到它为空时,才能看到workerCount为0而终止(有时需要重新检查-见下文)。
    */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   private static final int COUNT_BITS = Integer.SIZE - 3;
   private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState的状态,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

// 运行状态
private static int runStateOf(int c) { 
       return c & ~CAPACITY;
   }
// 实际线程数
   private static int workerCountOf(int c) { 
   	return c & CAPACITY;
   }
// ctl控制数
private static int ctlOf(int rs, int wc) { 
       return rs | wc; 
   }

由上面源码可知,线程的状态一共有5

  1. RUNNING:运行状态,线程池正在接受、处理任务
  2. SHUTDOWN:当RUNNING状态调用shutdown()方法时,进入此状态;
代码语言:txt复制
1. 不再接受新的任务
2. 正在运行中的任务和队列中的任务会等待其执行完毕
代码语言:txt复制
1. 不再接受新的任务
2. 中断运行中的任务,销毁队列中的任务
  1. TERMINATED:由TIDYING状态进入,terminated()方法执行完毕

如果是画图的话,是下面这个样子的


terminated()方法默认什么都不做,线程池提供这个方法,交给子类来进行扩展

代码语言:javascript复制
protected void terminated() { }

3)线程池如何去执行任务的

添加任务的流程我们已经讲述完毕,那么线程池是如何分配线程去执行任务的呢?

在第一节中,有一段...addWorker()方法,这里面就是执行任务的逻辑

代码语言:javascript复制
  private boolean addWorker(Runnable firstTask, boolean core) {
      // 外部循环标识retry
      retry:
      for (;;) {
          // 当前的状态值ctl及runState,第二节有讲过
          int c = ctl.get();
          int rs = runStateOf(c);

          // 检查运行状态是不是大于等于SHUTDOWN
          if (rs >= SHUTDOWN &&
              // 且 (运行状态不等于SHUTDOWN 或 任务不为空 或 队列中是否有值)
              !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
              return false;
          // 上面这段,主要就是为了检测线程池的状态,队列中是否有任务
          // 如果线程处于RUNNING,就会跳过此处的return false
          // 如果处于SHUTDOWN,还要额外判断当前任务是否为有值,有值也会return false
          // 如果处于SHUTDOWN,且当前任务为null,还要判断当前队列是否有值;队列中没有值的话,也会return false

          // 内部循环
          for (;;) {
              // 当前运行的线程数
              int wc = workerCountOf(c);
              // 判断运行的线程数是否大于(容量最大),根据是否核心,判断是否大于核心线程数 或者 最大线程数
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  // 返回false
                  return false;
              // 使用CAS自旋锁,添加ctl的运行线程数
              // 成功添加则会,跳出外层的循环
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              // 如果添加没有成功,重新获取ctl
              c = ctl.get();
              // 得到当前线程池状态,与外部循环的线程池做一个对比;如果不一致,则退回到外部循环,重新进行loop
              if (runStateOf(c) != rs)
                  continue retry;
		// 如果状态是相等的,则在内部循环进行loop即可
          }
      }
// 当上面的自旋锁添加运行线程数成功后,才会进入此处
      
      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 构建一个Worker对象,里面就默认分配了一个线程
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
              // 加锁
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  // 再次获取线程池的运行状态
                  int rs = runStateOf(ctl.get());
			
                  // 运行状态是RUNNING 或者 (运行状态是SHUTDOWN 且 当前任务为null)
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      // 预先检查线程是否为启动状态
                      if (t.isAlive())
                          // 如果是启动状态,就有问题了,要抛出异常
                          throw new IllegalThreadStateException();
                      // 将任务添加至workers容器中,这个容器包括了运行线程的状态
                      // largestPoolSize 最大线程池数量
                      workers.add(w);
                      int s = workers.size();
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      // 标志位改为true,代表worker已经添加至workers
                      workerAdded = true;
                  }
              } finally {
                  // 解锁
                  mainLock.unlock();
              }
              // 判断标志位
              if (workerAdded) {
                  // 启动线程,执行任务
                  t.start();
                  // 启动线程标志位,设置为true
                  workerStarted = true;
              }
          }
      } finally {
          // 是否启动线程的标志位
          if (! workerStarted)
              // 添加一个worker失败的处理
              addWorkerFailed(w);
      }
      // 返回是否启动线程的标志位
      return workerStarted;
  }

4)为什么线程池中会把null作为任务添加

看下图,很多调用addWorker()方法都传递了一个null,这是为什么呢,有什么用?

首先说结论,这是为了更快的启动队列中的任务

大家通过上面第一节的任务添加流程就会发现,有一些任务在添加进入任务阻塞队列后就没有声音了。

那么就要看看,如果添加一个为null的任务会出现什么情况把。

代码语言:javascript复制
// 这中间代码省略了,第一部分是判断状态和添加任务数,第二部分是判断状态和启动任务
// 如果线程池的状态是RUNNING,那么一个任务是大概率都是可以添加成功的
private boolean addWorker(Runnable firstTask, boolean core) {
	// ... 省略了,关心下面t.start();做了什么即可
       if (workerAdded) {
           t.start();
           workerStarted = true;
       }
   }

// 主要还是要看启动Worker做了什么
   private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
       
       // 首先构造方法
       Worker(Runnable firstTask) {
           setState(-1);
           // 传入一个任务后,作为自己的属性
           this.firstTask = firstTask;
           // 将自己作为任务构建了一个线程作为自己的属性。他自己也实现了Runnable接口
           this.thread = getThreadFactory().newThread(this);
       }
       
       // 当上面t.start();启动的是Worker的run方法
       public void run() {
           runWorker(this);
       }

       // 上面run();方法调用过来的
       final void runWorker(Worker w) {
           // 线程池里面的,当前的线程
           Thread wt = Thread.currentThread();
           // 当前真正要执行的任务,可能为null,本小节直接定义null
           Runnable task = w.firstTask;
           // 将属性变为null
           w.firstTask = null;
           w.unlock();
           boolean completedAbruptly = true;
           try {
               // 重点在这里,当task==null时,它会去getTask();方法中去获取task进行判断
               // 如果getTask();方法返回的是null,那么说明本次循环结束,任务运行完成
               // 如果getTask();方法返回的是队列中的任务,那么进入循环体,执行任务
               while (task != null || (task = getTask()) != null) {
                   w.lock();
				// 下面就是一些判断状态和执行任务的代码了
                   if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                         runStateAtLeast(ctl.get(), STOP))) &&
                       !wt.isInterrupted())
                       wt.interrupt();
                   try {
                       beforeExecute(wt, task);
                       Throwable thrown = null;
                       try {
                           // 这里才是真正执行我们任务的地方
                           task.run();
                       } catch (RuntimeException x) {
                           thrown = x; throw x;
                       } catch (Error x) {
                           thrown = x; throw x;
                       } catch (Throwable x) {
                           thrown = x; throw new Error(x);
                       } finally {
                           afterExecute(task, thrown);
                       }
                   } finally {
                       task = null;
                       w.completedTasks  ;
                       w.unlock();
                   }
               }
               completedAbruptly = false;
           } finally {
               processWorkerExit(w, completedAbruptly);
           }
       }

       // 这是从上面runWorker();方法调用而来
       private Runnable getTask() {
           // 最后poll()方法是否超时
           boolean timedOut = false;

           for (;;) {
               // 获取线程池状态
               int c = ctl.get();
               int rs = runStateOf(c);

               // 检查阻塞队列是否为空
               // 当状态是RUNNING时,false
               // 当状态是SHUTDOWN时,判断队列是否为空,如果有值,false
               // 如果状态是后面几种状态时,无论队列是否有值,true
               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                   decrementWorkerCount();
                   // 这里返回null,就代表task=null了
                   return null;
               }

               // 当前线程池运行的线程数
               int wc = workerCountOf(c);

               // allowCoreThreadTimeOut,这个讲一个,这个布尔值代表,核心线程数是否也可以被回收
               // 如果为true,空闲时会保证keepAliveTime的时候,过期销毁
               // 如果为false(默认),那么在空闲时也会保持活动
               // 这里主要判断是否允许超时保留核心线程,用来确定下面阻塞队列的阻塞时间
               boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

               // 加这个判断,主要是想留一个线程在这循环阻塞,加快从队列中取任务的流程步骤
               if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                   if (compareAndDecrementWorkerCount(c))
                       return null;
                   continue;
               }

               try {
                   // 获取任务
                   Runnable r = timed ?
                       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                       workQueue.take();
                   // 返回
                   if (r != null)
                       return r;
                   // 没有就一直处在循环之中,并配合上面的107行判断使用
                   timedOut = true;
               } catch (InterruptedException retry) {
                   timedOut = false;
               }
           }
       }

   }

三、最后

当前线程池的解读就到这里了,如果不是为了造火箭,谁会看这么底层的东西。

有一说一,这线程池写起来真的很优雅!!!

我是半月,你我一同共勉!

0 人点赞