全方位解析-Android中的线程池

2022-03-29 15:19:39 浏览数 (1)

笔记文章,没有废话,句句关键

线程池的优点

  1. 重用线程池里的线程,避免创建和销毁线程所带来的性能开销
  2. 有效控制最大并发数,避免造成线程间抢占系统资源而造成阻塞
  3. 提高线程可管理性,可以统一进行分配,调优和监控的能力

Android中的线程池

复用Java中的Executor接口,具体实现类为ThreadPoolExecutor,它有以下几个参数

参数

说明

注释

corePoolSize

线程池中核心线程数量

一直存活,即使处于闲置状态

maximumPoolSize

最大能创建的线程数量(非核心线程,包含核心线程个数)

达到这个值后,后续任务会阻塞

keepAliveTime

非核心线程最大存活时间

当设置allowCoreThreadTimeOut=true 同样会做用于核心线程,但通常不会这么做

unit

keepAliveTime的时间单位

TimeUnit中的枚举(时间单位)

workQueue

等待队列。execute 方法提交的Runnable存储在其中

如果线程池中的线程数量大于等于corePoolSize的时候,把该任务放入等待队列

threadFactory

线程创建工程厂(用来创建线程的)

默认使用Executors.defaultThreadFactory() 来创建线程,线程具有相同的NORM_PRIORITY优先级并且是非守护线程

handler

线程池的饱和拒绝策略(不常用)

阻塞队列已且没有空闲的线程,此时继续提交任务,就需要采取一种策略处理该任务,默认会抛出异常

常用与关键方法
  • void execute(Runnable run)//提交任务,交由线程池调度
  • void shutdown()//关闭线程池,等待任务执行完成
  • void shutdownNow()//关闭线程池,不等待任务执行完成
  • int getTaskCount()//返回线程池找中所有任务的数量 (已完成的任务 阻塞队列中的任务)
  • int getCompletedTaskCount()//返回线程池中已执行完成的任务数量 (已完成的任务)
  • int getPoolSize()//返回线程池中已创建线程数量
  • int getActiveCount()//返回当前正在运行的线程数量
  • void terminated() 线程池终止时执行的策略
线程池的运行状态

线程存在运行状态,如下:

状态

说明

NEW

初始状态,线程被新建,还没调用start方法

RUNNABLE

运行状态,把"运行中"和"就绪"统称为运行状态

BLOCKED

阻塞状态,表示线程阻塞于锁

WAITING

等待状态,需要其他线程通知唤醒

TIME_WAITING

超时等待状态,表示可以在指定的时间超时后自行返回

TERMINATED

终止状态,表示当前线程已经执行完毕

在这里插入图片描述

根据线程的运行状态思想,我们来展开线程池的运行状态:

状态

说明

RUNNABLE

表示当前线程池,存在正在运行的线程

SHUTDOWN

关闭线程池,不在执行新的任务,但会执行完线程池正在运行的任务,和添加到队列中的任务,对应shutDown()方法

STOP

立即关闭线程池,打断正在运行的任务,且不再处理等待队列中已添加的任务,对应shutDownNow()方法

TIDYING

shutDown() / shutDownNow()后进入此状态,表示队列和线程池为空,之后进入TERMINATED状态

TERMINATED

终止状态,表示当前线程已经执行完毕,并调用 terminated() 通知外界

在这里插入图片描述

安卓中常用的四种线程池

Executors.newFixedThreadPool()

代码语言:javascript复制
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  1. 线程数量固定的线程池
  2. 核心线程与非核心线程数量相等,意味着只有核心线程。
  3. keepAliveTime = 0L,即使线程池中的线程空闲,也不会被回收。除非调用shutDown()或shutDownNow()去关闭线程池。
  4. 可以更快响应外界的请求,且任务阻塞队列为无边界队列(LinkedBlockingQueue()链表结构的阻塞队列),意味着任务可以无限加入线程池

Executors.newScheduledThreadPool()

代码语言:javascript复制
   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

  public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              10L, MILLISECONDS,
              new DelayedWorkQueue());
    }
  1. 核心线程数量固定,非核心线程是无限的,但非核心线程存活时间非常短(10毫秒),闲置后会被回收
  2. 适合执行定时任务和固定周期的重复任务
  3. DelayedWorkQueue()优先级队列(堆结构),会根据定时/延时时间进行排序,延时少,先执行

Executors.newSingleThreadExecutor() 常用

代码语言:javascript复制
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. 核心线程数为1的线程池。确保所有任务在同一线程执行,因此不用考虑线程同步问题
  2. 阻塞队列是无界的,可以一直接收任务 、

Executors.newCachedThreadPool()

代码语言:javascript复制
  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  1. 没有核心线程,非核心线程无限,但执行完任务后,线程存活60秒
  2. SynchronousQueue() 是个特殊的队列,可以理解为无法存储元素的队列,因此线程池接收到任务就会创建线程去执行。当整个线程都处于空闲状态,60秒后,线程池中的线程都会因超时而被停止,不占用系统资源。
  3. 根据以上两点,此线程池适合执行耗时较少但频繁的任务执行
线程池源码分析

在分析源码前,先把线程池的工作流程图放出来。这样再去阅读源码会更加深刻

在这里插入图片描述

阅读源码的目的:

  • 了解线程池如何复用
  • 如何巧妙的使用线程池,根据需求去自定义线程池,以达到暂停/恢复,任务具有优先级,监控,分配,调优等
  • 学习线程池优秀的设计思想
  • 面试吹牛逼

源码分析

分析源码前看下ThreadPoolExecutor类的头注释:

代码语言:javascript复制
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */

这里面有几段比较重要 :

代码语言:javascript复制
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.

线程池控制状态:clt是一个int类型的原子数,它表示两个概念,workerCount表示有效线程,笼统来讲,相当于线程池运行的线程个数,runState,表示线程池运行的状态,如RUNNING SHUTDOWN等等。 将这两个字段打包成一个int,int为4字节,有32位,后29位表示线程池的数量,最大可为5个亿,如果不够可以使用AtomicLong代替。当然,运行线程池根本达不到这个数量。

代码语言:javascript复制
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed

五种状态的含义,上面提到过。

代码语言:javascript复制
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed

线程池状态间的变化。等于上面我们画的图:

在这里插入图片描述

结论:阅读源码可以适当看下类头说明,尤其是Android源码的类头。可以帮我们更好的理解源码

从线程池入口进入,理解前面提到ctl是什么?

代码语言:javascript复制
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
                                
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

Integer.SIZE为32,因此 COUNT_BITS = Integer.SIZE - 3 = 29 CAPACITY表示线程池的容量:

(1 << COUNT_BITS) - 1 ---> 100000000000000000000000000000(30位) -1 -->11111111111111111111111111111(29位)

这代表int32位中,前3位表示线程池状态后面29位表示容量 计算结果的值如下:

在这里插入图片描述

总结来说,状态值自上而下,数值越来越大

AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 初始状态为RUNNING,线程数为0

ctlOf(int rs, int wc) 用来获取int中的值,用来调用下面两个方法 :

private static int runStateOf(int c) { return c & ~CAPACITY; } 获取线程池状态 private static int workerCountOf(int c) { return c & CAPACITY; } 获取线程池中线程的有效线程数量

计算方式如下:

在这里插入图片描述

理解上面的计算方式后,开始真正进入源码阅读

==入口函数:== Executor.execute()

代码语言:javascript复制
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        int num=workerCountOf(c);
        //第一步
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //第二步
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }//第三步
        else if (!addWorker(command, false))
            reject(command);
    }
  // firstTask 待执行的任务 core 是否要启动核心线程
 private boolean addWorker(Runnable firstTask, boolean core) 

注释翻译: 线程池的执行分3步

代码语言:javascript复制
 if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
  • 如果小于corePoolSize核心线程数,直接创先新的线程,并将任务作为该线程的第一个任务(firstTask),因为是往新线程执行任务,肯定是第一个任务addWorker()方法的返回值,表示是否添加任务成功,core = true 启动核心线程 ,并直接return返回,不再向下执行
代码语言:javascript复制
//  if (workerCountOf(c) < corePoolSize)不满足,也就是运行线程大于等于核心线程,
//  此时可能为核心线程达到最大,或核心与非核心共存
//  那么继续判断,线程池为运行状态,同时加入队列是否成功,
  if (isRunning(c) && workQueue.offer(command)) {
            // 成功! 进行二次校验,因为此时线程池中的线程可能销毁
            // (比如非核心到达keepAliveTime指定存活时间而销毁)或某个线程调用shutDown()方法
            int recheck = ctl.get();
            // 如果不是RUNNING状态 ,删除刚才offer进队列的任务
            if (!isRunning(recheck) && remove(command))
                // 执行拒绝策略 调用创建时的handle,执行 handler.rejectedExecution(command, this); ,默认抛出异常
                reject(command);
                // 如果线程池 没有正在运行的线程,那么这个线程可能调用了shutDown()方法,要关闭了,那么就加入个null任务,
                //也就是不执行 新任务,去创建非核心线程 core = false 执行等待队列中的任务
            else if (workerCountOf(recheck) == 0)
                // 添加个null任务
                addWorker(null, false);
        }//(isRunning(c) && workQueue.offer(command))不满足,表示等待队列已满,添加任务失败,那么就创建非核心线程
        //如果创建失败。执行拒绝策略
         else if (!addWorker(command, false))
            reject(command);

boolean addWorker(Runnable firstTask, boolean core) 方法逻辑:

代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //双层for循环 retry外层循环终止标识
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果rs >= SHUTDOWN不再添加新执行 后面的判断条件可以不关心
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取线程池中,正在执行任务的线程
                int wc = workerCountOf(c);
                // 大于容量,不可能,不用看, wc >= (core ? corePoolSize : maximumPoolSize 这个判断是
                // core = true wc >= corePoolSize 核心线程到达上限 还要创建核心线程,不可以 返回false
                // core = false wc >= maximumPoolSize 达到非核心(包含核心)线程上限 还要创建线程 返回false
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    //workerCount  1 
                if (compareAndIncrementWorkerCount(c))
                   //返回外层循环 ,循环结束
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建 Worker包装我们的任务 firstTask  
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                     // rs < SHUTDOWN 表示是RUNNING   状态 
                     // (rs == SHUTDOWN && firstTask == null) 表示SHUTDOWN状态 且没有不再执行新的任务
                    //2种情况都会继续执行
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            //上面2中情况 都会加入workers集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                            // 设置 workerAdded = true;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //workerAdded = true; 上面设置过了
                if (workerAdded) {
                // 启动线程 
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
代码语言:javascript复制
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
       
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            //赋值 firstTask 并用getThreadFactory()得到我们创建时传入的线程工厂调用newThread(this);创建线程,并将自己作为Runnable传入
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
}

Worker是个Runnable,上面的w = new Worker(firstTask); 方法其实就是对任务的包装 在构造方法中赋值 firstTask 并用getThreadFactory()得到我们传入的线程工厂调用newThread(this);创建线程,并将自己作为Runnable传入。当addWorker()方法调用 t.start() 就会执行Worker类中的run()方法。

Worker类

代码语言:javascript复制
 /** Delegates main run loop to outer runWorker. */
public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //赋值firstTask给task
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //要执行的任务不为空直接执行,如果等于空调用getTask()去等待队列取出任务
            //对应了最开始 (workerCountOf(recheck) == 0) addWorker(null, false); 这段判断条件
            //之前提过 添加空任务的目的就是不再执行新任务,去执行等待队列的任务。因此getTask()得以执行 
            
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行任务前调用此时已经在线程中运行 ,因为这是在Worker中的run方法中
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //执行我们传入的Runnable
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //执行任务后调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks  ;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()

代码语言:javascript复制
 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //allowCoreThreadTimeOut 表示非核心线程的超时时间keepAliveTime是否作用于核心线程,默认是false
            // 所以只看后面的判断。  wc > corePoolSize;表示大于核心线程 true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //wc > corePoolSize;表示大于核心线程 true 那么就是创建非核心线程,非核心线程取等待队列的任务有超时时间 
            //keepAliveTime时间内阻塞。取不到 继续向下执行 任务完成 非核心线程执行完销毁
            // wc <= corePoolSize; 小于或等于核心线程 workQueue.take();一直阻塞,当队列有任务时立马执行,这也是为什么核心线程一直存在的原因
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

总结:

  • 核心线程一直存在的原因的是workQueue.take();,非核心线程keepAliveTime时间后销毁的原因是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
  • 从以下代码可以看出。线程池中的任务达到corePoolSize线程数时,所有任务都会先添加到队列,当调用workQueue.offer(command)时,被 workQueue.take();阻塞的核心线程就会拿到任务,然后去执行,这很关键
  • if (isRunning(c) && workQueue.offer(command)) 里的条件当都都不满足时,什么都不会做,之后会往队列中添加元素。当核心线程有空间是getTask中take()阻塞的核心线程会立马取队列取出个任务执行
  • beforeExecute(wt, task);afterExecute(task, thrown);会在任务执行前后执行
  • 核心线程与非核心线程没有本质上的区别。只是在获取任务时,通过此判断boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;,非核心超时会结束,而非核心则会take()阻塞住
代码语言:javascript复制
int c = ctl.get();
        int num=workerCountOf(c);
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

通过源码 得出前面提到的结论

在这里插入图片描述

实战

根据学习到的源码,封装自己的线程池工具类,它有一下几个特点:

代码语言:javascript复制
/**
 * 1. 支持按任务的优先级去执行,
 * 2. 支持线程池暂停.恢复(批量文件下载,上传) ,
 * 3. 异步结果主动回调主线程
 * todo 线程池能力监控,耗时任务检测,定时,延迟,
 *
 * 根据源码分析 :当工作线程小于核心线程是不会进入队列
 * 当核心线程启动后 每次都会进入队列
 */
object MyExecutor {
    private var hiExecutor: ThreadPoolExecutor

    //暂停线程表示:hint:这里为啥不用volatile修饰?
    private  var  isPause: Boolean = false

    private val lock: ReentrantLock = ReentrantLock()

    private val condition: Condition = lock.newCondition()

    //主线程回调能力的Handler
    private val handler: Handler = Handler(Looper.getMainLooper())
    private val seq = AtomicLong()

    /**
     * 初始化构建参数
     */
    init {
        //获取cpu数
        val cpuCount = Runtime.getRuntime().availableProcessors()
        //核心线程数为 cpu 1
        val corePoolSize = cpuCount   1
        //非核心线程数
        val maxPoolSize = corePoolSize * 2   1
        //非核心线程超时销毁时间
        val keepAliveTime = 30L
        //超时时间单位 s
        val unit = TimeUnit.SECONDS
        //相当于默认实现 创建一个线程
        val threadFactory = ThreadFactory() {
            val thread = Thread(it)
            //hi-executor-0
            thread.name = "hi-executor-"   seq.getAndIncrement()
            return@ThreadFactory Thread(it)
        }



        val priorityBlockingQueue: PriorityBlockingQueue<out Runnable> = PriorityBlockingQueue()
        hiExecutor = object : ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            unit,
            priorityBlockingQueue as BlockingQueue<Runnable>,
            threadFactory
        ) {
            override fun beforeExecute(t: Thread?, r: Runnable?) {
                priorityBlockingQueue.size
                if (isPause) {
                    try {
                        //条件唤醒线程标准写法
                        lock.lock()
                        condition.await()
                    } finally {
                        lock.unlock()
                    }

                }
            }

            override fun afterExecute(r: Runnable?, t: Throwable?) {
                //监控线程池耗时任务,线程创建数量,正在运行的数量
                HiLog.e("已执行完的任务的优先级是:"   (r as PriorityRunnable).priority.toString()   Thread.currentThread().name)
                priorityBlockingQueue.size
            }
        }
    }

    /**
     * priority越小 优先级越高
     * @IntRange 优先级范围
     *  @JvmOverloads 重载方法 生成多种重载方法
     *  无返回结果 与正常线程池一样
     */
    @JvmOverloads
    fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, runnable: Runnable) {
        hiExecutor.execute(PriorityRunnable(priority, runnable))
    }


    @JvmOverloads
    fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, callable: Callable<*>) {
        val newFixedThreadPool = Executors.newCachedThreadPool()

        hiExecutor.execute(PriorityRunnable(priority, callable))
    }

    /**
     * 提供任务执行后返回结果
     * 并回调主线程的能力
     *
     * <T> 返回结果类型
     */
    abstract class Callable<T> : Runnable {

        override fun run() {
            handler.post {
                onPrepare()
            }
            //真正在线程中执行
            val t = onBackground()

            handler.post {
                onCompleted(t)
            }
        }

        /**
         * UI线程执行
         */
        open fun onPrepare() {
            //预处理 加载动画
        }

        /**
         * 线程池中执行
         */
        abstract fun onBackground(): T

        /**
         * UI线程执行
         * 执行完成
         */
        abstract fun onCompleted(t: T)
    }

    /**
     * 重写的意义在于 PriorityBlockingQueue 这个具有排序的堆结构 每个元素要有比较大小的能力
     * priority 线程优先级 runnable执行任务的
     */
    class PriorityRunnable(val priority: Int, private val runnable: Runnable) : Runnable,
        Comparable<PriorityRunnable> {
        override fun run() {
            runnable.run()
        }

        override fun compareTo(other: PriorityRunnable): Int {
            //倒序 priority越小 优先级越高
            return if (this.priority < other.priority) 1 else if (this.priority > other.priority) -1 else 0
        }

    }

    //可能多个线程调用此方法
    @Synchronized
    fun pause() {
        isPause = true
        HiLog.e("hiexecutor is paused")

    }

    @Synchronized
    fun resume() {
        isPause = false
        lock.lock()
        try {
            condition.signalAll()
        } finally {
            lock.unlock()
        }
        HiLog.e("hiexecutor is resume")
    }
}

测试代码:

代码语言:javascript复制
    private void priorityExecutor() {
        for (int priority = 0; priority < 10; priority  ) {
            int finalPriority = priority;
            HiExecutor.INSTANCE.execute(priority, () -> {
                try {
                    Thread.sleep(1000 - finalPriority * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

利用源码分析后,我们可以起送封装自己的线程池,它具有以下几种能力:

  1. 支持按任务的优先级去执行,
  2. 支持线程池暂停.恢复(批量文件下载,上传) ,
  3. 异步结果主动回调主线程
  4. 线程池能力监控,耗时任务检测,定时,延迟,

但执行中有2个问题:

  • 明明是使用优先队列,但为什么第一次的时候,任务优先级却是无序的,第二次就好了?
  • 使用pause()方法,线程还在执行?

对于问题1。

在这里插入图片描述

首先线程池的调度是,系统Cpu利用时间碎片随机去调度的。我们的设置的只能尽可能的去满足,按照优先级去执行,但不能保证。更重要的是。通过源码得知。当线程池中,核心线程数未到最大值时(测试例子中是5),是不会加入到队列的,因此也就不会排序。当第二次执行任务,线程池就会先加入队列。然后被take()阻塞的队列取到任务,然后执行,此时就有了优先级。 解决:可以在应用启动白屏页启动线程。异步去初始化第三方库,或其他不需要优先级的任务。这样之后的任务就都具有优先级了

对于问题2:

在这里插入图片描述

通过源码分析:

代码语言:javascript复制
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks  ;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
我们重写的方法
override fun beforeExecute(t: Thread?, r: Runnable?) {
                priorityBlockingQueue.size
                if (isPause) {
                    try {
                        //条件唤醒线程标准写法
                        lock.lock()
                        condition.await()
                    } finally {
                        lock.unlock()
                    }

                }
            }

beforeExecute(wt, task);中的方法,只能阻塞当前线程,和后续要执行的线程,已经在线程中开始执行的任务是无法暂停的。 这其实不是问题。只是种误解

总结

通过学习源码,我们的收获:

  • 学习了int字段的多重利用,试想下,如果不采用源码中的方式,我们至少要维护这几种字段:sunStatusrunningWorkCountmaxWorkCountshutDownWorkCount,还要保证他们的线程安全。这是种值得借鉴的优秀思想。ARouter中也用到类似思想(extars)
  • 清楚线程池的状态和工作流程,理解线程池如何复用与非核心线程的销毁
  • 找到插手线程池的思路,扩展自己的线程池
  • 通过学习源码,还解决了2个看似不合理,但却在源码中有答案的问题。

0 人点赞