线程池实现原理-2

2019-08-13 10:13:40 浏览数 (1)

前言

线程池实现原理-1

addWorker实现

在看addWorker方法之前,我们先看一个例子,了解一下retry的使用

  1. break retry 跳到retry处,且不再进入循环
  2. continue retry 跳到retry处,且再次进入循环
代码语言:javascript复制
public static void main(String[] args) {
    breakRetry();
    continueRetry();
}

private static void breakRetry() {
    int i = 0;
    retry:
    for (; ; ) {
        System.out.println("start");
        for (; ; ) {
            i  ;
            if (i == 4)
                break retry;
        }
    }
    //start 进入外层循环
    //4
    System.out.println(i);
}

private static void continueRetry() {
    int i = 0;
    retry:
    for(;;) {
        System.out.println("start");
        for(;;) {
            i  ;
            if (i == 3)
                continue retry;
            System.out.println("end");
            if (i == 4)
                break retry;
        }
    }
    //start 第一次进入外层循环
    //end i=1输出
    //end i=2输出
    //start 再次进入外层循环
    //end i=4输出
    //4 最后输出
    System.out.println(i);
}

这里说一下Runnable 参数的含义

  1. firstTask != null 说明任务被添加了,我们需要启动一个线程去执行它
  2. fistTask == null 说明我只想启动一个线程去消费阻塞队列中的任务
代码语言:javascript复制
// core为ture表示是核心线程,否则非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 将条件改为如下形式,方便理解
         * rs >= SHUTDOWN && (rs != SHUTDOWN || fistTask != null || workQueue.isEmpty)
         * 1.如果当前线程池的状态>SHUTDOWN,addWorker返回false,添加任务失败
         * 2.如果当前线程池的状态=SHUTDOWN,分为如下2种情况
         * (1)workQueue为空,fistTask == null 和fistTask != null的任务都不能
         * (2)workQueue不为空,可以添加fistTask != null的任务
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.是核心线程 >= corePoolSize
                // 2.非核心线程 >= maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 成功将线程数 1,跳到retry处,并且不再进入死循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 否则重新读取ctl
            c = ctl.get();  // Re-read ctl
            // 线程状态发生改变,跳到retry处,并且进入死循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 线程是否启动的标志位
    boolean workerStarted = false;
    // 线程封装成Worker对象,是否添加到线程池中的标志位
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1.rs < SHUTDOWN 即 rs = RUNNING
                // 2.rs == SHUTDOWN && firstTask == null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 刷新了largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 留心一下这里,后面会从这里开始讲起
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

仔细理解一下这段代码,其实就能理解,当线程池处于RUNNING 接受新任务,并且处理进入队列的任务,处于SHUTDOWN 不接受新任务,处理进入队列的任务,剩余状态都不会处理任务,上面代码中的注释有详细解释

代码语言:javascript复制
if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;

线程池在执行任务的时候,会把任务对象包装成一个Worker对象,Worker对象是ThreadPoolExecutor的一个内部类,继承了AbstractQueuedSynchronizer,实现了一个独占锁,status值为0表示未锁定状态,status值为1表示锁定状态,实现了Runnable接口,在执行run方法的时候,它执行完初始化的firstTask后,还会从workQueue中取出任务执行,这样就不用新建一个线程执行任务,而是在一个线程中执行了好几个任务

Worker内部类

代码语言:javascript复制
// 省略了一部分对锁的操作,简单的对AQS的一个实现
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

}

runWorker实现

当t.start()被执行后,run方法会执行runWorker方法,来看runWorker方法

代码语言:javascript复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许中断
    w.unlock(); // allow interrupts
    // 标识线程是不是异常终止的
    boolean completedAbruptly = true;
    try {
        // 先执行初始化的fistTask,执行完成后还会无限循环获取workQueue里的任务来执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 配合shutdownNow 方法
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 线程开始开始执行之前执行此方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 线程执行后执行
                    afterExecute(task, thrown);
                }
            } finally {
                // 运行过的置为null
                task = null;
                w.completedTasks  ;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这个方法需要注意的就是

  1. getTask()从阻塞队列中获取任务,如果队列中没有任务会被阻塞,并不会占用CPU资源
  2. 可以根据业务需要自定义beforeExecute和afterExecute方法

getTask实现

代码语言:javascript复制
private Runnable getTask() {
    // 标记获取头结点并且移除头结点的方法是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1.rs >= STOP
        // 2.rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 用CAS将线程池中的数量-1,直到成功才会退出
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 1.核心线程允许被销毁
        // 2.核心线程数 > corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1.timeOut为true,表示超时获取,workQueue没有任务,说明线程应该被销毁,但是还是要 && timed
        // 2.wc > maximumPoolSize肯定要删除线程了
        // 3.workQueue为空可以销毁线程,此时有可能所有线程都被销毁了
        // 4.workQueue不为空,只有wc > 1才能被删除
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // timed为true,超过keepAliveTime还是没有任务,返回null
            // timed为false,则一直阻塞等待任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit实现

线程执行完毕执行的方法

代码语言:javascript复制
// processWorkerExit在runWorker结束之后被调用
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是异常终止,或者被中断,减少workerCount
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount  = w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Transitions to TERMINATED state if either (SHUTDOWN and pool
    // and queue empty) or (STOP and pool empty)
    tryTerminate();

    int c = ctl.get();
    // 状态为RUNNING或者SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 目前核心线程已经够用了,不用再创建
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 增加一个消费的线程
        addWorker(null, false);
    }
}

shutdown实现

代码语言:javascript复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查能否操作线程
        checkShutdownAccess();
        // 确保状态 >= SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断所有的空闲线程
        interruptIdleWorkers();
        // ScheduledThreadPoolExecutor会重写这个方法,做一些其他的运算
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
代码语言:javascript复制
// 中断空闲线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
代码语言:javascript复制
// onlyOne为true则只中断一个空闲线程,否则全部中断
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 遍历Worker并执行中断操作,w.tryLock()保证了正在执行的Worker不会被中断
            // 因为正在运行的Worker再次获取锁会失败
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

这里需要注意的是不会中断正在运行的线程,因为正在运行的线程w.tryLock()会返回false

shutdownNow实现

代码语言:javascript复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 确保状态 >= STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 获取所有没有执行完成的task
        // 即将阻塞队列中的任务放到tasks中 
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
代码语言:javascript复制
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
代码语言:javascript复制
// 这个是Worker内部类的方法
void interruptIfStarted() {
    Thread t;
  // state的初始值为-1,运行到runWorker才允许中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

shutdownNow会中断所有的线程,因为和shutdown相比在中断之前,不用获取锁

tryTerminate实现

代码语言:javascript复制
// 将状态转换到TERMINATED
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 以下几种状态不能转换为TERMINATED
        // 1.RUNNING状态
        // 2.TIDYING或TERMINATED
        // 3.SHUTDOWN状态,但是workQueue不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 让子类去实现,做一些操作
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

从上面可看出状态转换的条件

  1. SHUTDOWN想转化为TIDYING,需要workQueue为空,同时workerCount为0
  2. STOP转化为TIDYING,需要workerCount为0

0 人点赞