【八股文Java】图解Java线程池实现原理(ThreadPoolExecutor)

2023-10-30 17:12:18 浏览数 (1)

简介

弄清楚 ThreadPoolExecutor 的原理之后,线程池的面试题都很简单。

1、线程池的参数

2、线程的创建时机及执行流程

3、线程池的状态及如何优雅关闭线程池

为了实现优雅关闭线程池:

(1)必须拒绝接收新任务但是必须保证队列中的任务也有机会被执行。

调用方法:

代码语言:javascript复制
public void shutdown()

(2)等待某段时间,让任务执行尽量完成。

调用方法:

代码语言:javascript复制
public boolean awaitTermination(long timeout, TimeUnit unit)

(3)等待某段时间后,可以判断线程池中的任务是否已经全部完成,可以根据业务特性,是否强制关闭(调用shutdownNow方法),或者重试几次尝试关闭。

看一下dubbo (版本3.0.16-SNAPSHOT)的线程池优雅关闭实现:

代码语言:javascript复制
org.apache.dubbo.common.utils.ExecutorUtil#gracefulShutdown

4、线程池有哪些坑或注意点

(1)OOM问题。

线程池大小容量不受限或队列容量不受限可能导致OOM。

线程池中使用ThreadLocal,没有清理ThreadLocal值。

Java避坑指南:确保ThreadLocal变量在线程池及跨服务使用时,重新初始化及清理 崔认知,公众号:认知科技技术团队Java避坑指南:确保ThreadLocal变量在线程池及跨服务使用时,重新初始化及清理

(2)异常丢失问题及线程由于异常退出问题。任务执行过程中发生异常可能会丢失,还可能使线程池中的线程由于异常导致退出。

Java避坑指南:ThreadPoolExecutor提交任务出现异常,异常是否吞掉,线程是否退出的不同影响 崔认知,公众号:认知科技技术团队Java避坑指南:ThreadPoolExecutor提交任务出现异常,异常是否吞掉,线程是否退出的不同影响

解决方法:

(1)显示捕获异常处理。任务的整个执行使用try、catch来捕获异常处理。

(2)使用future模式,异常封装到future对象中,使用future的get方法捕获取异常。

(3)相互依赖的任务共享线程池导致死锁。

Java避坑指南:不要在池大小有限的线程池中,执行有相互依赖的任务,防止"线程饥饿锁"导致故障 崔认知,公众号:认知科技技术团队Java避坑指南:不要在池大小有限的线程池中,执行有相互依赖的任务,防止"线程饥饿锁"导致故障

(4)线程池中使用ThreadLocal没重新设值,导致隐式传参数据混乱。

(5)ThreadLocal在线程池中,由于跨线程,导致ThreadLocal隐式传参失效。

Java并发:线程封闭手段ThreadLocal实现线程安全的使用场景及避坑场景 崔认知,公众号:认知科技技术团队Java并发:线程封闭手段ThreadLocal实现线程安全的使用场景及避坑场景

5、如何监控线程池

  • 线程池活跃度告警:活跃度 = (activeCount / maximumPoolSize) * 100

比如 threshold 阈值配置 80,表示活跃度达到 80% 时触发告警。

  • 队列容量告警:容量使用率 = (queueSize / queueCapacity) * 100

比如 threshold 阈值配置 80,表示容量使用率达到 80% 时触发告警。

  • 拒绝策略告警:可以重写RejectedExecutionHandler,或动态代理RejectedExecutionHandler,加入报警逻辑。
  • 任务排队超时告警:重写beforeExecute,记录开始执行时间,对比任务提交时间。
  • 任务执行超时告警:重写 ThreadPoolExecutor 的 beforeExecute() 和 afterExecute() 方法,记录任务执行时间。

参考:https://dynamictp.cn/guide/notice/alarm.html#告警类型

代码语言:javascript复制
代码语言:javascript复制

附dubbo优雅关闭线程池源码:

代码语言:javascript复制
 public static void gracefulShutdown(Executor executor, int timeout) {
        if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
            return;
        }
        final ExecutorService es = (ExecutorService) executor;
        try {
            // Disable new tasks from being submitted
            es.shutdown();
        } catch (SecurityException | NullPointerException ex2) {
            return;
        }
        try {
            // Wait a while for existing tasks to terminate
            if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                es.shutdownNow();
            }
        } catch (InterruptedException ex) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (!isTerminated(es)) {
            newThreadToCloseExecutor(es);
        }
    }
代码语言:javascript复制
 private static void newThreadToCloseExecutor(final ExecutorService es) {
        if (!isTerminated(es)) {
            SHUTDOWN_EXECUTOR.execute(() -> {
                try {
                    for (int i = 0; i < 1000; i  ) {
                        es.shutdownNow();
                        if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                            break;
                        }
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            });
        }
    }
代码语言:javascript复制
 public static boolean isTerminated(Executor executor) {
        if (executor instanceof ExecutorService) {
            if (((ExecutorService) executor).isTerminated()) {
                return true;
            }
        }
        return false;
    }

0 人点赞