线程池源码解读

2023-10-17 15:28:23 浏览数 (1)

线程池的在 Java并发中使用最多的一种手段,也是性能和易用性相对来说比较均衡的方式,下面我们就一起探索先线程池的原理。

线程池分配线程流程

对于线程池的使用,在这篇文章中就不过多的赘述,首先我们先看下线程池的分配线程的逻辑。

我们知道,在创建线程池的有 7 个核心的参数:

corePoolSize:核心线程数

maximumPoolSize:最大线程数

keepAliveTime:空闲线程存活时间

TimeUnit: 单位

workQueue:阻塞队列

ThreadFactory: 线程工厂

RejectedExecutionHandler: 拒绝策略

在这 7 个参数中,其中我们最重要的几个参数是 corePoolSize,maximumPoolSize,workQueue ,这三个参数来决定线程池主要的线程数和任务队列长度。

具体的流程图如下(图片来自网上,侵删):

构造函数的理解

构造函数的是我们创建线程池的第一步,可以简单的看下,搞清楚内部的变量是如何赋值的。

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
       if (corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.acc = System.getSecurityManager() == null ?
               null :
               AccessController.getContext();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       // 这个是重点
       // Timeout in nanoseconds for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize present or if allowCoreThreadTimeOut. Otherwise they wait forever for new work.
      // 等待工作的线程池的超时 NS 时间,当线程多于核心线程数据数时候或者 allowCoreThreadTimeOut==true,现成会用此次的线程超时时间。 否则 他们会永远等待
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
   }

几个特殊变量的含义

在阅读代码时候,会有几个变量的障碍,因为设计的过于巧妙,所以看起来稍微有点晦涩。在下面的代码里,已经注释了相关代码的含义:

代码语言:javascript复制
// ctl= 11100000 00000000 00000000 00000000|0= RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Interger.SIZE 值是 32 这里的值是 32-3=29 
private static final int COUNT_BITS = Integer.SIZE - 3;
// 实际是 1<<29-1 
// 00100000 00000000 00000000 00000000 - 1
// 00011111 11111111 11111111 11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// -1 << 29 
// 11111111 11111111 11111111 11111111 << 29
// 11100000 00000000 00000000 00000000
// 取前 3 位 111
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 010
private static final int STOP       =  1 << COUNT_BITS;
// 100
private static final int TIDYING    =  2 << COUNT_BITS;
// 110
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// CAPACITY = 00011111 11111111 11111111 11111111
// ~CAPACITY= 11100000 00000000 00000000 00000000
// c & ~CAPACITY
// 因为~CAPACITY 头三位为 111 ,所以&运算都是它本身
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 同理 CAPACITY 也都是本身
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

核心分配线程逻辑

代码语言:javascript复制
public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       int c = ctl.get();
       // 1. 如果小于核心线程数,直接addWork,第二个参数 true 代表是核心线程
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       // 2. 
       // 如果当前线程池是 RUNNING 状态,且能够 offer 进队列则进行 recheck(为什么要进行 recheck)
       // 如果 recheck 线程池不是 RUNNING 状态,且能移除当前 command对象成功,则直接 reject 
       // - 为了防止 add任务后,线程池调用了 shutdown 方法。
       // 否则 判断当前数量为 0 直接 addWorker 一个空的任务。 
       // isRuning c<SHUTDOWN 只有 runing
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           //如果出现刚入队列,线程池就被 shutdown了,任务就会被移除。
           if (! isRunning(recheck) && remove(command))
               reject(command);
           //这里是什么场景?核心线程数为0了?
           // 这就是前面说的我们可以设置核心线程数完成任务后就被销毁,那么核心线程数就为0了,
           //那么刚刚队列中的任务怎么执行呢,就需要使用使用创建非核心线程数来执行任务了(可以忽略,因为不会这么设置)
           //addWorker 会同时创建任务和线程,这个 addWorker(null,false) ,代表只开线程处理任务,不添加新任务。
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       //3. 如果添加任务失败,则直接 reject 
       else if (!addWorker(command, false))
           reject(command);
   }

addWork 方法

这个是线程池添加任务的核心线程

代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
      retry:
      for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);

          // Check if queue empty only if necessary.
         // 1. rs >= SHUTDOWN 代表不是 running 状态
         // 转换 rs >= SHUTDOWN &&  ( rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
         // 表示当前线程池处于SHUTDOWN 状态后,新增的任务不为空的,直接返回,代表添加任务失败
         // 表示当前线程池处于SHUTDOWN 状态后,核心线程数为0,新增非核心线程数来处理任务,但是队列为空,直接返回,代表添加任务失败.

         // 这里返回 false 有可能会触发 reject 方法
          if (rs >= SHUTDOWN &&
              ! ( rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) )
              return false;

         
          for (;;) {
             // 此处保证,如果是核心线程和非核心线程都会返回 false ,但是如果是核心线程则不需要校验返回值
              int wc = workerCountOf(c);
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  return false;
              // CAS 更新线程池的数量 ,更新成功完后会跳出 retry 
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              // 这里是 CAS 更新失败逻辑
              c = ctl.get();  // Re-read ctl
              if (runStateOf(c) != rs)
                  continue retry;
              // else CAS failed due to workerCount change; retry inner loop
          }
      }

      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 创建一个 Workder 对象,Worker 继承自 Runabler 对象
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
              //这里为什么需要一个 CAS 锁?
              //1. 避免 HashSet 不安全 ,锁为什么不加到  workers.add(w); 这里?
              //2. 
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  //再次校验线程状态
                  // 这里为什么再次校验,防止调用了 addWorker 未完成,就直接调用了 shutdown()
                  int rs = runStateOf(ctl.get());
  
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      // 如果线程刚分配就已经在运行了,说明是不合法的流程
                      if (t.isAlive()) // precheck that t is startable
                          throw new IllegalThreadStateException();
                      workers.add(w);
                      int s = workers.size();
                      // largestPoolSize 标记当前线程池最大的线程数
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      workerAdded = true;
                  }
              } finally {
                  mainLock.unlock();
              }
              if (workerAdded) {
                 // 启动线程,调用 worker 中的 run 方法
                  t.start();
                  workerStarted = true;
              }
          }
      } finally {
          // 哪些场景会添加失败?
          //1. 上面 recheck 的时候。
          if (! workerStarted)
              addWorkerFailed(w);
      }
      return workerStarted;
  }

Woker 源码解析

worker 继承自 AbstractQueuedSynchronizerRunnable,本质还是一个线程对象

代码语言:javascript复制
final class Worker extends AbstractQueuedSynchronizer implements Runnable  

Worker(Runnable firstTask) {
          setState(-1); // inhibit interrupts until runWorker
          this.firstTask = firstTask;
          this.thread = getThreadFactory().newThread(this);
      }

      /** Delegates main run loop to outer runWorker  */
      public void run() {
          runWorker(this);
      }

runWorker代码

这个是线程调用了 start 方法,start 方法会调用 run 方法,run 方法会调用 task 中的 run 方法,进而间接的开线程调用了业务方法。

代码语言:javascript复制
final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
      // 这里为什么要先释放锁??
       w.unlock(); // allow interrupts
       boolean completedAbruptly = true;
       try {
           // 如果获取任务不为空,getTask()方法
           while (task != null || (task = getTask()) != null) {
               w.lock();
               // If pool is stopping, ensure thread is interrupted;
               // if not, ensure thread is not interrupted.  This
               // requires a recheck in second case to deal with
               // shutdownNow race while clearing interrupt
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                     &&!wt.isInterrupted())
                   wt.interrupt();
               try {
                   // 线程池 目前没有用,WorkerPoolExecutor会记录当前哪些 worker 列表
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       //调用 run 方法,只是方法的调用,不是线程的启动
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       afterExecute(task, thrown);
                   }
               } finally {
                   // 执行完成 ,任务赋值为空,下次循环就直接从队列中拿任务了
                   task = null;
                   w.completedTasks  ;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {
           //异常跳出 while 循环,处理动作
           processWorkerExit(w, completedAbruptly);
       }
   }

getTask() 获取任务方法

这个getTask()方法是获取任务的方法,也是线程池线程能够复用的逻辑,在一个 while 循环中,一直拉取队列任务。

代码语言:javascript复制
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池不是 RUNNING 且 状态> STOP 或者 队列为空
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            
             //如果允许核心线程数超时,或者是非核心线程 timed 才为 true.
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //如果 (线程数大于最大线程 或者 已经超时)并且(线程数>1||队列为空)
           //如果允许核心线程超时且已经超时且队列中任务为空  则直接减少线程数据退出死循环返回空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
               // 释放一个线程
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 调用对应的 aqs 方法,拉取对应的 task
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit

代码语言:javascript复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
      // 如果出现异常,则将线程池中线程数量-1
      if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
           decrementWorkerCount();

       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           //完成数  ,workers 移除
           completedTaskCount  = w.completedTasks;
           workers.remove(w);
       } finally {
           mainLock.unlock();
       }

       tryTerminate();

       int c = ctl.get();
      //如果线程池是RUNNING SHUTDOWN .
       if (runStateLessThan(c, STOP)) {
           //非用户任务异常,也就是手动执行的中断操作
           if (!completedAbruptly) {
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               //如果队列中还有待执行的任务,那么必须要保证线程池中至少有一个线程,否则就创新一个新的非核心线程
               if (min == 0 && ! workQueue.isEmpty())
                   min = 1;
               if (workerCountOf(c) >= min)
                   return; // replacement not needed
           }
            // 开线程拉取任务处理
           addWorker(null, false);
       }
   }

整体的流程图

(图来自网上,侵删)

0 人点赞