Java高并发:线程池源码解析

2023-03-03 22:13:17 浏览数 (2)

一、基本概念

1 作用

线程池(Thread Pool)是一种基于池化思想管理线程的工具,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

2 线程池状态

线程池状态线程池状态

3 线程池类关系

类关系类关系

二、线程池

1 数据结构

代码语言:java复制
public class ThreadPoolExecutor extends AbstractExecutorService {

    //ctl是原子类型,32位,高3位是线程池状态,低29位是线程数量。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //任务队列,不受mainlock保护,需要使用一个线程安全的容器
    private final BlockingQueue<Runnable> workQueue;
	
    private final ReentrantLock mainLock = new ReentrantLock();

    //线程
    private final HashSet<Worker> workers = new HashSet<Worker>();

    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

   
    private volatile ThreadFactory threadFactory;

    //当任务队列已满时采取的拒绝策略
    private volatile RejectedExecutionHandler handler;

    //非核心线程最大空闲时间,超过就终止
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;


    //核心线程数量,allowCoreThreadTimeOut默认为false,核心线程常驻内存
    private volatile int corePoolSize;

    //最大线程数量
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

}

2 execute

该方法继承自Execute接口,用于向线程池提交任务。

代码语言:java复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    //1 判断线程数量是否达到核心线程数,如果未达到就添加线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) //添加线程
            return;
        c = ctl.get();
    }
	//2 判断任务队列是否已满,如果未满则入队,如果已满则尝试添加线程来处理
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //线程池此时非运行态,不接受任务,移除任务并拒绝
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //任务队列未满,
        reject(command);//3 线程达到最大线程数,此时执行拒绝策略
}
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

添加线程需要提供该线程执行的task,并指示是添加核心线程还是普通线程,这决定了是与核心线程数比较还是与最大线程数比较。添加核心线程addWorker(command, true),添加普通线程addWorker(command, false)

代码语言:java复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            //如果线程数量达到上限,或者达到核心线程数/最大线程数,则返回false,
            //由上层执行入队或者拒绝策略
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //线程数加1并跳出双层for循环,往后执行创建worker流程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //线程池状态发生改变,回到方法开头的"retry:"处往下重新执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
            	//线程池正常运行时,将新线程添加到workers中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //此时thread还未执行start,不应该是alive状态
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;//largestPoolSize不影响线程池大小,只是记录
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //启动新线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //线程启动失败时收尾
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
//线程启动失败,需要将ctl中线程数减1,从workers中移除该线程
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

这个方法中的"retry:"是标签,类似于goto,可以无条件跳转,慎用?。 一般Java中的标签用于多层循环,可以从内层循环一次跳出所有循环。 continue retry表示跳到标签位置处,重新执行。 break retry表示跳出多层循环,然后继续往下执行,不需要回到"retry:"位置。

3 线程池关闭

3.1 shutdown

代码语言:java复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //检测线程方法权限
        checkShutdownAccess();
        //将ctl修改为SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        //将现有的线程中断
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //先与Worker线程抢锁,确保Worker线程没有处于某个任务执行中的状态
            //将Worker线程中断
            //worker线程每次从workQueue拿取任务后都会加锁,处理完任务再释放锁
            //所以Worker线程没抢到锁就认为它是空闲的,疑惑???
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

3.2 shutdownNow

代码语言:java复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //将ctl设置为STOP状态
        advanceRunState(STOP);
        //中断所有Worker线程,不管是否处于正在执行task中
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

三、Worker

1 数据结构

工作者,封装了线程,当没有空闲核心线程且未达到最大线程数时就会创建新的worker。

Worker实现了Runnable接口,是thread执行的主任务,这个任务的主要逻辑是不断从任务队列拿task执行,并检测线程池状态,如果线程池为STOP/TIDYING状态,就调用interrupt中断自己。

代码语言:java复制
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    }
//构造器,初始化时state=-1,禁止interrupt
Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

2 run

代码语言:java复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
	//Worker继承了AQS
    w.unlock(); // 构造器中初始时将state=-1,防止interrupt
    boolean completedAbruptly = true;
    try {
        //不断从任务队列取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //线程池状态>=STOP,不得再处理任务,包括进队的任务,此时线程主动中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//线程主动设置中断标志
            try {
                beforeExecute(wt, task);//钩子方法,未实现
                Throwable thrown = null;
                try {
                    task.run();//执行具体任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks  ;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3 为什么继承AQS

Worker继承了AQS,实现了一个非阻塞、不可中断的锁。当外部缩小线程池核心线程数时,线程池会interrupt现有workers列表所有线程。为了安全,应该等线程处理完当前正在执行的task后才能interrupt,所以外部线程需要和Worker内线程抢锁,获取锁后再中断Worker内线程。

代码语言:java复制
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
	//如果核心线程数变小,则中断空闲线程
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        //如果核心线程数变大,则提前启动新线程
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

四、拒绝策略

当任务队列已满,线程数量达到最大时,对后续到达的任务采用预定的策略处理。

代码语言:java复制
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

官方实现了四种常见策略,如下:

代码语言:java复制
//抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task "   r.toString()  
                                             " rejected from "  
                                             e.toString());
    }
}
//直接丢弃,置之不理
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
//丢弃最旧的任务空出位置,然后将该任务重新投入线程池
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
//由线程池外的调用者线程直接执行该任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

0 人点赞