Java并发——线程池(八)

2024-05-15 23:57:28 浏览数 (2)

一、概述

什么是线程池

线程池(Thread Pool)是一种基于“池化”思想管理线程的工具,经常出现在多线程服务器中。通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。

参考:

Java线程池实现原理及其在美团业务中的实践

合理使用线程池以及线程变量

为什么使用线程池

如果没有线程池的时候,每发布一个任务就需要创建一个新的线程,带来问题有:

  1. 反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大
  2. 过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定

线程池优势:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 统筹内存和CPU使用,避免资源使用不当。
  4. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

二、线程池核心设计

2.1 线程池的创建

线程池的实现类是ThreadPoolExecutor

ThreadPoolExecutor的构造函数如下:

代码语言:java复制
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

2.2 线程池核心参数

参数

含义

corePoolSize

核心线程数

maximumPoolSize

最大线程数

keepAliveTime 时间单位

空闲线程存活时间

ThreadFactory

线程工厂,为线程池创建新线程

workQueue

任务队列

handler

处理被拒绝的任务

corePoolSize参数

corePoolSize为线程池核心线数,线程池初始化默认线程数是0,当有新任务提交时,会创建新线程执行任务。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。

maximumPoolSize参数

在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。

可以设置maximumPoolSize = corePoolSize

keepAliveTime参数

线程池中线程数>corePoolSize 且没有任务可做,则会检测线程的 keepAliveTime,如果超过规定的时间,无事可做的非线程就会被销毁,以便减少内存的占用和资源消耗。

如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。

timeUnit参数

在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

workQueue参数

阻塞队列,线程池任务队列,常用的有:

详细见 Java并发——阻塞队列(八)

ArrayBlockingQueue :一个数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序,支持公平访问队列。

LinkedBlockingQueue :一个由链表结构组成的可选有界阻塞队列,如果不指定大小,则使用Integer.MAX_VALUE作为队列大小,按照FIFO的原则对元素进行排序。

PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,默认情况下采用自然顺序排列,也可以指定Comparator。

DelayQueue:一个支持延时获取元素的无界阻塞队列,创建元素时可以指定多久以后才能从队列中获取当前元素,常用于缓存系统设计与定时任务调度等。

SynchronousQueue:一个不存储元素的阻塞队列。存入操作必须等待获取操作,反之亦然。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者。

LinkedBlockingDeque:一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素

ThreadFactory参数

threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。

Handler参数

任务拒绝策略,当达到最大线程数且队列任务已满时需要执行的拒绝策略

拒绝时机

首先,拒绝策略是在新建线程池的时候制定的,拒绝时机如下:

  • 第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。
  • 第二种情况是线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。也就是说线程池核心线程满了-> 然后任务队列满了->然后非核心线程也满了,这个时候再提交任务就会触发拒绝策略。
拒绝策略
  • AbortPolicy

拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略

  • DiscardPolicy

当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失

  • DiscardOldestPolicy

如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险

  • CallerRunsPolicy

当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。好处是提交任务不会被丢弃且提交任务速度会被减缓(因为主线程也用于执行任务了),坏处是容易把主线程拖死。

2.3 线程池生命周期

RUNNING:;

SHUTDOWN:不再接受新任务,但要处理任务队列里的任务;

STOP:不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务;

TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程,当线程池执行terminated()方法时进入TIDYING状态;

TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕,terminated()方法执行完成;

运行状态

状态说明

RUNNING

运行状态,接受新任务,持续处理任务队列里的任务

SHUTDOWN

不再接受新任务,但要处理任务队列里的任务

STOP

不再接受新任务,不再处理任务队列里的任务,中断正在进行中的任务

TIDYING

表示线程池正在停止运作,中止所有任务,销毁所有工作线程,当线程池执行terminated()方法时进入TIDYING状态

TERMINATED

表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕,terminated()方法执行完成

2.4 线程池调度机制

从线程池调度机制可以看出来:

① 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程;当线程池里存活的核心线程数小于corePoolSize核心线程数参数的值时,线程池会创建一个核心线程去处理提交的任务

② 如果线程池核心线程数已满,即线程数已经等于corePoolSize,新提交的任务会被尝试放进任务队列workQueue中等待执行

③ 当线程池里面存活的线程数已经等于corePoolSize了,且任务队列workQueue填满时才创建多于 corePoolSize 的非核心线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize。

④通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池。

⑤如果当前的线程数已达到了maximumPoolSize,还有新的任务提交过来时,执行拒绝策略进行处理

核心代码

代码语言:java复制
//ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

//线程池执行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

2.5 常用线程池

线程池可以使用Executors类来创建 (有弊端)

2.5.1 FixedThreadPool

核心线程数=最大线程数

任务队列无限长

代码语言:java复制
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

      public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();**加粗**
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

2.5.2 CachedThreadPool

线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。

代码语言:java复制
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.5.3 ScheduledThreadPool

支持定时或周期性执行任务

代码语言:java复制
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
代码语言:java复制
 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
代码语言:java复制
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

2.5.4 SingleThreadExecutor

使用唯一线程执行任务。由于只有一个线程,所以任务执行是顺序的

代码语言:java复制
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

2.5.5 SingleThreadScheduledExecutor

将 ScheduledThreadPool 的核心线程数设置为了 1

代码语言:java复制
  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

###2.5.6 ForkJoinPool

三、线程池最佳实践

3.1 Executors类的弊端

1、FiexedThreadPool和SingleThreadPool:

任务队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;

2、CachedThreadPool和ScheduledThreadPool:

允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM;

3、默认工厂局限性

Executors使用的默认线程工厂创建的线程都是非守护线程,且没有设置线程名称和优先级。这在某些应用场景中可能不是最佳选择

4、缺乏灵活性和透明度

使用Executors快捷方法创建的线程池隐藏了许多重要的配置细节,比如线程数量和任务队列类型,这降低了配置的灵活性和透明度

使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。

3.2 如何正确创建线程池

使用ThreadPoolExecutor

代码语言:java复制
        // 核心线程数
        int corePoolSize = 5;
        // 最大线程数
        int maximumPoolSize = 10;
        // 当线程数大于核心线程数时,多余空闲线程的存活时间
        long keepAliveTime = 5000;
        // 时间单位,这里使用毫秒
        TimeUnit unit = TimeUnit.MILLISECONDS;
 
        // 任务队列,使用有界队列可以避免资源耗尽的问题
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
 
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue
        );

3.3 利用Spring创建线程池

spring 的ThreadPoolTaskExecutor 本质是对java.util.concurrent.ThreadPoolExecutor的包装

0 人点赞