ThreadPoolExecutor 使用说明

2020-09-11 14:56:37 浏览数 (1)

它是一个ExecutorService,使用线程池中的线程执行提交的任务。通常我们使用Executors框架,定义使用。

一、线程池主要用来解决两类问题:

1、通过缓存一定数量的可用线程,避免频繁的线程创建,销毁,来提升执行大量异步任务的性能。

2、资源管理,包括线程及任务。同时也维护一些诸如完成任务数等统计数据。

为了能够获得更广泛的适用性,此类提供了多个可调整的参数及扩展性钩子,但是,开发者通常更倾向于使用更为便捷的Executors框架,通过它预定义了各种类型线程池,如newCachedThreadPool(缓存类型的无界线程池),newFixedThreadPool(固定数量的线程池),newSingleThreadExecutor(单个线程的线程池)。

二、核心及最大线程池数量

ThreadPoolExecutor 会根据核心及最大线程数设定自动的调整线程池内线程的数量。

三、线程池何时创建新的线程:

当一个任务提交后,如果当前线程池内的可运行的线程(无论是否处于闲置状态)数量小于核心数量,那么就会创建新的线程;如果当前线程池内的线程数量处于核心数和最大数之间,则,只有在队列(排队等待处理的任务队列)满的时候会创建新的线程。

四、固定容量及无界线程池:

如果把核心数和最大数设置为相同的值,那么线程池就会变为固定容量的线程池;可以通过把最大数设置为一个极大的值,如Integer.MAX_VALUE,来创建一个可以自适应高并发任务的场景的线程池。核心数和最大值通常在线程池初始定义的时候通过构造参数传入,也可以通过相应的方法动态的进行变更配置。

五、自定义构造

默认情况下,线程池中的线程会在初始化时进行创建,然后在任务到达时运行。我们也可以通过方法prestartCoreThread或者prestartAllCoreThreads来预先运行线程。

六、新线程创建:

线程是通过ThreadFactory来创建的,如果没有明确定义,则会使用Executors默认的defaultThreadFactory来创建,先创建的线程会归在同一个分组,并且拥有相同的NORM_PRIORITY优先级别及非守护特性。我们可以通过定义相应的ThreadFactory来,自定义线程的名称,群组,优先级及守护特性。如果ThreadFactory的方法newThread()创建线程失败,返回null,那么Executor会继续运行,但是无法在处理新添加的任务。

七、生存时间:

如果线程池当前拥有的线程数量多于核心数,那么多于的线程会在空闲生存时间后被终止销毁。减少非活跃线程池的资源耗费,当需求增多时,再重新创建需要的线程放入线程池以供使用。生存时间可以通过getKeepAliveTime() 方法进行动态设置,如果设置为LONG.MAX_VALUE,那么闲置线程将不会被回收。通常keep-alive只适用于超过核心数的线程,我们也可以通过设置allowCoreThreadTimeOut(boolean)来应用到所有的线程。keepAliveTime不为0时起作用。

八、队列:

可以使用任意形式的阻塞队列来存放提交到线程池的任务。队列和线程池大小相关。

如果线程数小于核心数,那么线程池会优先创建新的线程,而不进行任务入队。

如果等于或者超过核心数,那么线程池会优先将任务入队,而不创建新的线程。

如果等于或者超过核心数且任务队列已满,在不超过最大数情况下,会进行新线程的创建,超过最大线程数,则会执行相应的丢弃策略。

三种排队策略:

直接传递:通常使用SynchronousQueue队列,直接将任务转交给线程,而不进行任务的存储。如果没有线程接收任务,那么任务入队会失败,所以此种情况,会直接创建新的线程。这种策略,对于处理任务间有依赖关系的情况,避免了队列任务遍历。直接传递策略需要相应的无界线程池策略支持,以避免任务被丢弃,拒绝。者反过来也可能引发当处理速度小于任务到达速度时,线程池的无限增长。

无界队列:通常使用LinkedBlockingQueue,不预定义容量。因为队列无界,所以,核心线程都忙时,不会创建新的线程,可能导致任务的无限堆积,无界队列情况下,线程池最大值配置将不起作用。无界队列情况适用于任务间没有联系,各自肚独自运行的场景。例如,对于web系统,可以用来处理突发请求平滑处理。

有界队列:例如ArrayBlockingQueue,可以避免使用无界线程池时的的资源耗尽。但是,却不利于对请求处理的协调和控制。队列大小和线程池最大值参数之间会进行平衡处理。如:大容量队列和小容量线程池的使用可以最小化cpu使用,系统资源耗费及线程间上下文切换带来的负担。但是会造成吞吐量的降低,也无法充分使用系统的调度能力;相应的使用小队列则需要较大的线程池容量,可能造成cpu繁忙,进而造成系统无法调度现象,也会造成系统吞吐量的降低。

九、任务丢弃:

Executor关闭时,提交的任务会被丢弃;使用有界队列及有界线程池时,并且都已达到边界值时,提交的任务会被丢弃。任务丢弃会通过 RejectedExecutionHandler执行相应的丢弃策略。

默认AbortPolicy:抛出运行时异常

CallerRunsPolicy:使用提交任务的线程执行任务,这种策略提供了一种反馈机制,减缓了新任务的提交速度。

DiscardPolicy:直接丢弃任务。

DiscardOldestPolicy:丢弃任务队列头部的任务。

也可以自定义相应的丢弃策略,特别需要注意针对特定线程池容量及队列容量情况下的策略。

ThreadPoolExecutor提供了相应的beforeExecute及afterExecute方法用于在任务执行的前后执行相应的操作,如,调整任务执行的环境,重新初始化ThreadLocals,收集统计信息,添加日志等。另外,可以重写terminated方法,以在Executor完全关闭后做相应的处理。

如果钩子或者回调函数抛出异常,那么内部工作线程也会失败。

getQueue通常用于调试,其它情况不建议使用。

remove和purge可以用于大量任务取消情况下的任务队列处理。

当线程池不再被应用系统使用,并且,内部的也没有线程的时候,会被自动关闭。如果存在被闲置线程池重新声明的情况,那么需要确保对内部线程设置了合适的生存时间,及使用0核心数,设置线程可超时策略等。

如下示例:通过继承ThreadPoolExecutor实现一个可暂停,继续特性的线程池:

代码语言:javascript复制
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
代码语言:javascript复制
        private boolean isPaused;
代码语言:javascript复制
        private ReentrantLock pauseLock = new ReentrantLock();
代码语言:javascript复制
        private Condition unpaused = pauseLock.newCondition();
代码语言:javascript复制
代码语言:javascript复制
        public PausableThreadPoolExecutor(...) { super(...); }
代码语言:javascript复制
代码语言:javascript复制
        protected void beforeExecute(Thread t, Runnable r) {
代码语言:javascript复制
            super.beforeExecute(t, r);
代码语言:javascript复制
            pauseLock.lock();
代码语言:javascript复制
            try {
代码语言:javascript复制
                while (isPaused) unpaused.await();
代码语言:javascript复制
            } catch (InterruptedException ie) {
代码语言:javascript复制
                t.interrupt();
代码语言:javascript复制
            } finally {
代码语言:javascript复制
                pauseLock.unlock();
代码语言:javascript复制
            }
代码语言:javascript复制
        }
代码语言:javascript复制
代码语言:javascript复制
        public void pause() {
代码语言:javascript复制
            pauseLock.lock();
代码语言:javascript复制
            try {
代码语言:javascript复制
                isPaused = true;
代码语言:javascript复制
            } finally {
代码语言:javascript复制
                pauseLock.unlock();
代码语言:javascript复制
            }
代码语言:javascript复制
        }
代码语言:javascript复制
代码语言:javascript复制
        public void resume() {
代码语言:javascript复制
            pauseLock.lock();
代码语言:javascript复制
            try {
代码语言:javascript复制
                isPaused = false;
代码语言:javascript复制
                unpaused.signalAll();
代码语言:javascript复制
            } finally {
代码语言:javascript复制
                pauseLock.unlock();
代码语言:javascript复制
            }
代码语言:javascript复制
        }
代码语言:javascript复制
    }}

0 人点赞