技术专家带你彻底掌握线程池

2022-06-01 12:40:35 浏览数 (1)

◆ 1. 导读

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。说到线程池,几乎是项目中必备的、面试中必问的,但是很多人实际并没有彻底掌握这项技能。如生产如何设置核心线程与最大线程配比、线程池的拒绝策略取舍等等。

本文包含以下内容:

  • 什么是线程池?
  • 线程池相关类讲解
  • JDK 定义的四类线程池
  • 线程池的 7 大参数详解
  • Spring/Spring Boot 使用线程池
  • 根据设备 CPU 动态配置线程池
  • 常见面试题精讲

◆ 2. 什么是线程池?

◆ 2.1 基本概念

线程池,顾名思义,就是存放预先创建好的线程的池子,需要使用的时候直接从池子里拿即可。池化技术,可以类比数据库连接池,存放预先创建好的数据库连接的池子。

◆ 2.2 线程池优点

我们主张项目中,用线程池代替自己创建的线程,那么为什么这样建议呢?下面就来说一说,线程池的优点,为什么选择使用线程池。

合理分配

设想一下这样的情景,项目中使用到线程的地方,都是 new Thread 的方式,也就是说每次执行方法时都创建线程。那么当大量请求涌入,方法被疯狂调用,那么线程是不是也在疯狂地递增,这样,用不了多久服务器 CPU 就会被挤爆,而从导致宕机、瘫痪等问题。而这,显然不是我们愿意看到的,线程池的出现,很好的解决了这个问题。

线程池可以指定核心线程数和最大线程数,以及任务队列,限制了线程不能被无限创建,集中由线程池进行分配,避免了可能由线程引发的资源耗尽问题。

线程预热

项目启动,线程池就会预先创建一部分线程以供使用。需要使用时,直接使用即可,减少了创建线程所需要的时间。

资源复用

线程的创建到销毁是比较消耗 CPU 资源的,使用线程池,线程可以重复使用,提高了资源利用率。

◆ 2.3 进程和线程

本来想省略此节,但是由于面试中经常会提问,我们还是拿出来说一说。

  • 进程:一个正在执行的计算机程序就是一个进程
  • 线程:CPU 调度的最小单位,一个进程由一个或多个线程组成

◆ 2.4 线程的状态

此小节为高频面试点,最好做到倒背如流。线程拥有生命周期,生命周期的各个阶段就是线程的状态。

线程有以下状态:

  • 新建
  • 就绪
  • 阻塞
  • 等待(等待/等待超时)
  • 终止

线程状态源码

源码位置:java.lang.Thread

代码语言:javascript复制
public enum State {
        /**
         * 线程被创建但还未启动
         */
        NEW,

        /**
         * 线程为就绪(可运行)状态,在 jvm 中执行,但是可能需要等待其他操作系统资源执行
         */
        RUNNABLE,

        /**
         * 线程被监控器锁阻塞
         */
        BLOCKED,

        /**
         * 线程处于等待状态,需要被唤醒才能继续执行
         */
        WAITING,

        /**
         * 等待超时,正在等待的线程超过了指定的等待时间。
         */
        TIMED_WAITING,

        /**
         * 线程终止,线程执行完成
         */
        TERMINATED;
    }

◆ 2.5 并发和并行

记得有一次面试问到过这个问题,在这里也给大家分享一下,并发和并行。

  • 并发:多个线程访问同一个资源
  • 并行:同一时间执行多个任务

◆ 2.6 创建线程的几种方式

  • new Thread 类
  • 实现 Runnable 接口(无返回值)
  • 实现 Callable 接口(有返回值)
  • 使用线程池

◆ 3. 线程池相关类讲解

◆ 3.1 简单但有设计的 Executor 接口

线程池顶层接口是 Executor,它提供了一个 execute 执行方法。Executor 顶层接口的设计,用户只需要提供实现 Runnable 接口的实现类即可,不需要关心线程的创建和具体的执行。任务提交与创建和执行进行了解耦。

代码语言:javascript复制
public interface Executor {

    void execute(Runnable command);
}

◆ 3.2 进一步增强的 ExecutorService 接口

ExecutorService 在 Executor 的基础上,增加了一些能力:

  • 停止和关闭任务线程
  • 批量执行或指定执行用户提交的任务
  • 提交一个用户执行的 Runnable 的任务

常用方法

代码语言:javascript复制
// 提交 Runnable 任务
submit(Callable<T> task);
submit(Runnable task);
submit(Runnable task, T result);
// 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行
awaitTermination(long timeout, TimeUnit unit);
// 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
shutdown();
// 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表
shutdownNow();
// 批量执行给定的任务
invokeAll(Collection<? extends Callable<T>> tasks);
// 执行单个指定的任务
invokeAny(Collection<? extends Callable<T>> tasks);

◆ 3.3 AbstractExecutorService 抽象类

AbstractExecutorService 抽象类比较简单,其大部分方法都继承于 ExecutorService,在此基础上增加了两个 protected 方法,供子类重写。

代码语言:javascript复制
// 为给定可运行任务和默认值返回一个 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value);
// 为给定可运行任务返回一个 RunnableFuture。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)

从 3.1~3.3 都是一些接口和抽象类的设计,并没有具体实现,可见设计在前、实现在后的重要性。

◆ 3.4 主角 ThreadPoolExecutor 类

从图 1 中,我们看到了 ThreadPoolExecutor 的继承关系图,即 ThreadPoolExecutor 实现了以上所有的接口和抽象类所具备的能力。我们平时说的 Java 线程池的真身,其实就是 ThreadPoolExecutor。

◆ 3.4.1 ThreadPoolExecutor 的运行原理图

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

◆ 3.4.2 任务提交执行流程

用户提交任务,ThreadPoolExecutor 进行任务分配,分以下四种种情况:

  • 存在空闲核心线程,线程分配核心线程直接执行
  • 无空闲核心线程,阻塞队列未满,则缓冲执行。此时如果核心线程有空闲了,线程分配核心线程从阻塞队列中获取任务执行
  • 无空闲核心线程,阻塞队列队满,则线程分配新的线程执行任务。新线程的数量上限即最大线程数
  • 无空闲核心线程,阻塞队列队满,已到达最大线程数,则会执行饱和策略(任务拒绝)

看图更好理解:

◆ 3.4.3 线程池生命周期

线程的生命周期和线程池的生命周期是有区别的,ThreadPoolExecutor 的运行状态有 5 种,分别为:

状态的转换流程图如下:

线程池的状态,不是用户显示设置的,而是由线程池内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量(workerCount)两个关键参数的维护放在了一起。

代码如下:

代码语言:javascript复制
// 原子整形, 底层采用 CAS 原理控制并发
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 变量是用于控制线程池状态和有效线程数量的一个字段,它包含两部分信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。用一个变量存储两个值的设计,可以避免在做出相关决策时出现不一致的情况,不必为了维护两者的一致,而占用锁资源。

ctl 变量的相关计算是使用位运算来完成的,相比于基础运算,位运算速度较快。

代码语言:javascript复制
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 计算当前运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 计算当前线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过状态和线程数生成 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

◆ 3.4.4 添加线程源码讲解

代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
       //相当于 goto
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果线程池的状态到了 SHUTDOWN 或者之上的状态时候,只有一种情况还需要继续添加线程,
            // 那就是线程池已经 SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(firstTask 为 null)
            // 这里还继续添加线程的原因是加快执行等待队列中的任务,尽快让线程池关闭
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
               // 传入的 core 的参数,唯一用到的地方,如果线程数超过理论最大容量,如果 core 是 true 跟最大核心线程数比较,否则跟最大线程数比较
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过 CAS 自旋,增加线程数 1,增加成功跳出双层循环,继续往下执行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               // 检测当前线程状态如果发生了变化,则继续回到 retry,重新开始循环
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 走到这里,说明我们已经成功的将线程数 1 了,但是真正的线程还没有被添加
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 添加线程,Worker 是继承了 AQS,实现了 Runnable 接口的包装类
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
               // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 检查线程状态, 逻辑和之前一样
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                       // 线程只能被 start 一次
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      // workers 是一个 HashSet,添加我们新增的 Worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  // 启动 Worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

◆ 3.4.5 Worker 的工作流程

◆ 3.5 线程工具类 Executors

Executors 是线程的工具类,用于帮助用户快速创建线程池。此类包含所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。此类支持以下各种方法:

  • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

具体的应用我们将在下一节详细讲解,剩下相关的线程池相关类,我们将在后续逐步讲解。

◆ 4. JDK 定义的四类线程池

小建议:建议先阅读第五节——线程池 7 大参数详解,这样有助于大家阅读理解。

JDK 中定义了四类线程池:

  • 固定数量线程池
  • 单线程线程池
  • 带缓存的线程池
  • 定时任务线程池

下面我们将来一步步解析这四类线程池,这四类线程池可直接使用 3.5 节中的 Executors 创建。

◆ 4.1 固定数量线程池

◆ 4.1.1 创建固定数量线程池

代码语言:javascript复制
    /** 使用 Executors 工具类创建固定数量线程池 */
    private ExecutorService executorService = Executors.newFixedThreadPool(3);

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

◆ 4.1.2 固定数量线程池源码解读

代码语言:javascript复制
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定数量线程池的底层就是我们第 3 节讲解的 ThreadPoolExecutor 类,通过构造方法设置参数。特点:核心线程数和最大线程数相等,存活时间为 0,即始终活跃,阻塞队列使用的是 LinkedBlockingQueue。

◆ 4.1.3 阻塞队列 LinkedBlockingQueue

上一小节提到,固定数量线程池使用的是 LinkedBlockingQueue 作为阻塞队列,那么 LinkedBlockingQueue 队列有什么特点呢?为什么选择它作为阻塞队列呢?

  • 由链表结构组成的队列,队列中的元素按 FIFO(先进先出的原则对元素进行排序)
  • 排在队列头部的元素是时间最长的元素,排在队尾的元素是时间最短的元素
  • 链接队列的吞吐量通常要高于基于数组的队列

缺点:

  • 如果指定容量,则可以在一定程度上防止队列过度拓展,队满时无法插入。如果不指定容量,则使用 Integer.MAX_VALUE 作为默认容量。

由于设定了固定数量的线程,那么用户提交的任务很可能就超出了核心线程数,此时任务队列对插入和取出的要求就比较高,链表结构在插入和删除的效率较高,故选择此队列。

◆ 4.2 单线程线程池

◆ 4.2.1 创建单线程线程池

代码语言:javascript复制
    /** 使用 Executors 工具类创建 */
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

◆ 4.2.2 单线程线程池源码

代码语言:javascript复制
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

单线程线程池指定核心线程和最大线程均为一,即从始至终线程池中只会存在一个线程,线程始终活跃,阻塞队列为 LinkedBlockingQueue

◆ 4.3 带缓存的线程池

◆ 4.3.1 创建带缓存的线程池

代码语言:javascript复制
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public void start() {
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

◆ 4.3.2 带缓存线程池源码

代码语言:javascript复制
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

带缓存的线程池,这里的“缓存”不是指的数据缓存,而是指按需创建线程,并设置了存活时间,在存活时间内线程还可以处理其他任务。我们可以看到,它设置的核心线程数是 0,最大线程数是 Integer.MAX_VALUE。也就是说,线程池创建时,不初始化存放线程,当用户提交任务时,只要任务数小于 Integer.MAX_VALUE,则直接创建线程执行。线程执行完成后并不会立即销毁,而会缓存存活 60 秒,在 60 秒内,如果还有用户任务提交,且任务数小于等于存活的线程数,则由存活的线程执行。如果大于存活线程数,且小于 Integer.MAX_VALUE,则创建 任务数 - 存活线程数 的差值个线程,进行处理。

◆ 4.3.3 阻塞队列 SynchronousQueue

我们发现,带缓存的线程池没有使用 LinkedBlockingQueue 阻塞队列,而是使用的 SynchronousQueue 队列。

特点:队列中的元素插入和移出必须是同时操作的,也就是说一个任务被取出的同时,也要有一个任务被插入。二者同时进行,是一个同步队列。

同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。

支持公平和非公平,看源码。

代码语言:javascript复制
    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

当指定为公平队列时,会创建一个 FIFO 的有序队列,否则顺序是未指定的。

◆ 4.4 定时任务线程池

◆ 4.4.1 创建一个定时任务线程池

代码语言:javascript复制
    // 创建一个定时任务线程池, 并指定核心线程数
    private ExecutorService executorService = Executors.newScheduledThreadPool(10);

    public void start() {
        // 提交一个 Runnable 任务
        executorService.submit(() -> {
            System.out.println("hello word");
        });
    }

◆ 4.4.2 定时任务线程池源码

代码语言:javascript复制
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

底层使用的时 ScheduledThreadPoolExecutor, 我们追踪进去看一下,发现它是 super 调用父类的构造方法。

代码语言:javascript复制
    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

其父类就是 ThreadPoolExecutor。创建的是一个指定核心线程数,最大线程数为 Integer.MAX_VALUE,阻塞队列为 DelayedWorkQueue 的线程池。

DelayedWorkQueue 基于堆的数据结构 类似于 DelayQueue 和 PriorityQueue,每个 ScheduledFutureTask 将其索引记录到 堆数组。这弥补了查找任务的损失的效率 ,大大加快删除速度(从 O(n) 到 O(log n))。

◆ 5. 线程池的 7 大参数详解

从上一节我们知道,JDK 自带的四类线程池都是根据配置 ThreadPoolExecutor 而得到的。不同的参数组合诞生不同线程池,这 7 大参数几乎是面试中的必考题,也是实际生产中必须要使用到的。掌握它,让你的线程池使用游刃有余。

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

七大参数分别是:

  • 核心线程数
  • 最大线程数
  • 线程存活时间
  • 存活时间单位
  • 阻塞队列
  • 线程工厂
  • 拒绝策略

◆ 5.1 核心线程数

核心线程数指的是初始化时就需要创建的线程,核心线程始终活跃,不管有没有需要执行的任务,核心线程都不会销毁。可以理解为,随时待命!

◆ 5.2 最大线程数

顾名思义,线程池中最多允许存在多少个线程。当核心线程繁忙,队列队满的情况下,如果“最大线程数 - 核心线程数 > 0”,线程池则会新建线程执行任务。

◆ 5.3 线程存活时间

当线程数大于核心数时,这是多余的空闲线程(即存活于蓝色区域的线程)在终止前等待新任务的最长时间。和时间单位参数连用。

◆ 5.4 存活时间单位

和线程存活时间一起使用,指定的是一段时间。常用单位有:

  • TimeUnit.NANOSECONDS 纳秒
  • TimeUnit.MILLISECONDS 毫秒,1 秒 = 1000 毫秒
  • TimeUnit.SECONDS 秒
  • TimeUnit.MINUTES 分

举例:30,TimeUnit.SECONDS ==> 存活时间:30 秒

◆ 5.5 阻塞队列

线程池中的阻塞队列类型也挺多的,特性也不尽相同,这也提升了线程池的灵活及多样性。参数类型是 BlockingQueue,BlockingQueue 是一个接口,它的实现类都可以使用。

实现类有:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue。

它们之间的特性:

BlockingQueue 的四类方法:

这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。

◆ 5.6 线程工厂

线程工厂可以用户自定义,也可以使用默认的线程工程。线程工厂就是用来创建线程的。

使用默认的线程工厂: Executors.defaultThreadFactory()

源码如下:

代码语言:javascript复制
   static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            // 从安全管理器中拿到线程组
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 指定线程的名字
            namePrefix = "pool-"  
                          poolNumber.getAndIncrement()  
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix   threadNumber.getAndIncrement(),
                                  0);
            // 设置用户进程
            if (t.isDaemon())
                t.setDaemon(false);
            // 设置优先级
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

默认的线程工厂主要是设置了一些线程名称规则,用户线程,以及线程默认的优先级。

当然,你也可以自定义线程工厂,参照默认的线程工厂的实现就可以,这样你自己创建的线程的名称,优先级等等都是可以按照你自己的规范来。

◆ 5.7 拒绝策略(饱和策略)

任务的拒绝策略,也可以叫饱和策略,就是当阻塞队列队满时,剩下提交的任务的处理策略。

JDK 中提供了四种拒绝策略,默认使用的是饱和丢弃策略。

代码语言:javascript复制
    /**
     * 源码中默认使用的是 AbortPolicy 策略 
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

饱和策略详解:

具体选择哪种策略,需要根据实际的业务场景来考量

◆ 6. Spring/Spring Boot 使用线程池

如果大家已经很熟悉能够使用线程池,则可以直接跳过本节。

◆ 6.1 Spring 使用线程池

◆ 6.1.1 创建 maven 工程,导入相关依赖

代码语言:javascript复制
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.12.RELEASE</version>
        </dependency>

◆ 6.1.2 创建并配置线程池

Spring 有两种方式,一种是使用配置类的形式,一种是在 bean.xml 中配置。我们演示使用配置类的。

创建一个线程池配置类,并配置好 7 大参数。

代码语言:javascript复制
/**
 * @author 九月长安
 * @version $Id: MyThreadPoolConfig.java, v 0.1 2021-08-03 18:41 九月长安 Exp $$
 */
@Configuration
public class MyThreadPoolConfig {

    // 指定注入的 bean 名称
    @Bean(name = "executorService")
    public ExecutorService getThreadPool() {
        return new ThreadPoolExecutor(2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue(),
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    }
}

◆ 6.1.3 使用线程池

代码语言:javascript复制
/**
 * @author 九月长安
 * @version $Id: UserService.java, v 0.1 2021-08-03 11:16 九月长安 Exp $$
 */
@Service
public class UserService {

    @Autowired
    private UserDao userDao;

    // 注入线程池
    @Autowired
    private ExecutorService executorService;

    public void getUserInfo() throws ExecutionException, InterruptedException {
        // 提交有返回值的任务
        Future<Person> future = executorService.submit(new Callable<Person>() {
            @Override
            public Person call() throws Exception {
                return userDao.getPerson();
            }
        });
        // 获取返回结果
        Person p = future.get();
        // 打印
        System.out.println(p);
    }

}

打印结果:

至此,您已掌握 Spring 线程池的基本使用。实际开发中,很多任务是可以异步执行的,这些任务使用线程池能够大大地提升速度。例如向用户推送消息,我们没必要去等待全部推送完再返回,我们只需要将执行结果记录一下,过段时间去查询一下执行情况即可。

◆ 6.2 Spring Boot 使用线程池

◆ 6.2.1 创建一个 Spring Boot 项目

大家可以使用 IDE 创建,也可以使用 Spring 官网提供的初始化向导 地址:https://start.spring.io/。

◆ 6.2.2 创建并配置线程池

Spring Boot 线程池创建与配置和 Spring 几乎一样。

代码语言:javascript复制
@Configuration
public class MyThreadPoolConfig {

    // 指定注入的 bean 名称
    @Bean(name = "executorService")
    public ExecutorService getThreadPool() {
        return new ThreadPoolExecutor(2,
            20,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue(),
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    }
}

◆ 6.2.3 线程池的使用

Spring Boot 线程池使用和 Spring 没有太大的区别,一样是注入然后使用。可参考 6.1 节。详细操作方法可查阅 Java API,地址:

https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

◆ 7. 根据设备 CPU 动态配置线程池

追求线程池配置的最佳合理参数,是大家共同的夙愿,我们先来看一看由于配置不合理导致出现问题的实际案例。

◆ 7.1 实际案例

案例 1:页面大量产生接口服务降级

原因:没有预估好调用的流量,导致最大核心数设置偏小,大量抛出 RejectedExecutionException,导致队满而抛出异常,从而产生降级。

案例 2:自身作为上游服务,执行时间过长,导致整体服务超时,影响下游服务大量调用失败

原因:阻塞队列设置过长,最大线程数设置太小,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致调用超时失败。

7.2 追求最佳参数配置

那么有没有一个参数是最佳参数配置呢?这个还在不断地讨论和实践中,因为实际的服务器环境和业务要求复杂且多样,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,但是追求完美依然是我们要做的。

美团技术团队针对以上方案,也没有得出一个最佳通用的配置,没有一个通用的公式可以解决这一问题。

◆ 7.3 较为常用的配比

其实我们最难确定的就是核心线程和最大线程的配比,那么有没有一些配比是较为常用的呢?其实是有的。

  • CPU 密集型:核心线程数 = CPU 核数 1
  • IO 密集型:核心线程数 = CPU 核数 * 2,最大线程数 = CPU 核数/(1- 阻塞系数),阻塞系数:0.8~0.9

例如:8 核,则 8/(1-0.9) = 80,及最大线程数为 80。

◆ 7.4 动态化线程池

线程池既然那么重要,而且参数不能最佳适配业务场景,那么能不能设计一个动态化的线程池?例如现在业务负载过大,动态的调整核心线程数,那么是不是就能完美的解决这一问题呢?我们来看一下美团技术团队的实践架构:

个人觉得已经是相当的不错,包含申请,动态调参,监控告警,让线程池始终处于最佳状态。想要设计自己的线程池架构的小伙伴,可以参考此架构设计。

◆ 8. 常见面试题精讲

此节希望大家学习完成后时常来温习,做到胸有成竹最好了。

创建线程有哪几种方式?

答:new Thread 类,实现 Runnable 接口,实现 Callable 接口,使用线程池。

使用线程池有什么好处?

答:资源合理分配,提高资源复用,提升执行效率,线程创建执行与任务提交解耦。

线程池 7 大参数有哪些?

答:核心线程数、最大线程数、存活时间、存活时间单位、阻塞队列、线程工厂、拒绝策略。

如果核心线程数满了,那么此时提交的任务怎么处理?

如果核心线程数满了,则将任务提交至阻塞队列等待执行,如果阻塞队列也满了,且最大线程数 - 核心线程数 > 0 则创建新的线程执行提交的任务。

线程池的拒绝策略有哪些?

  • AbortPolicy 丢弃任务并抛出异常。
  • DiscardPolicy 丢弃任务,不抛异常。
  • DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
  • CallerRunsPolicy 由调用线程执行该任务。例如:如果是主线程调用线程池,提交任务,则拒绝的任务由主线程执行。

如果让你来设计线程池你会怎样设计?

首先是根据业务场景,判断是 CPU 密集型还是 IO 密集型,不同的类型方案不一样,通常 IO 密集型设置的 CPU 核数较多。其次根据实际访问量,以及部署环境来设定参数。拒绝策略的话,需要看具体业务对任务不能执行的容忍程度。最好设置足够适合的队列长度、核心线程数、最大线程数,尽量避免触发拒绝策略。

来源:

https://www.toutiao.com/article/6995324544550437388/?log_from=b46a7156b27b3_1653959189348

“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com

来都来了,走啥走,留个言呗~

 IT大咖说  |  关于版权

由“IT大咖说(ID:itdakashuo)”原创的文章,转载时请注明作者、出处及微信公众号。投稿、约稿、转载请加微信:ITDKS10(备注:投稿),茉莉小姐姐会及时与您联系!

感谢您对IT大咖说的热心支持!

  • 相关推荐
  • 推荐文章
  • 基于GF的后台管理系统,完善的权限用户管理,致力于快速高效开发
  • Java 工程师相见恨晚的神兵利器和使用技巧
  • MySQL 故障诊断:MySQL 占用 CPU 过高问题定位及优化
  • 高可用架构之 Sentinel 的降级原理详解
  • .NET 6 从0到1使用Docker部署至Linux环境
  • 中高级程序员可能都不会使用spring-boot-starter-jdbc访问MySQL
  • 作为一名程序员,你还需要会画图
  • DPDK的基本原理、学习路线总结
  • 一种并行,背压的Kafka Consumer
  • DBA的福音|分享免费oracle性能监控调优工具

0 人点赞