它是一个ExecutorService,使用线程池中的线程执行提交的任务。通常我们使用Executors框架,定义使用。
一、线程池主要用来解决两类问题:
1、通过缓存一定数量的可用线程,避免频繁的线程创建,销毁,来提升执行大量异步任务的性能。
2、资源管理,包括线程及任务。同时也维护一些诸如完成任务数等统计数据。
为了能够获得更广泛的适用性,此类提供了多个可调整的参数及扩展性钩子,但是,开发者通常更倾向于使用更为便捷的Executors框架,通过它预定义了各种类型线程池,如newCachedThreadPool(缓存类型的无界线程池),newFixedThreadPool(固定数量的线程池),newSingleThreadExecutor(单个线程的线程池)。
二、核心及最大线程池数量
ThreadPoolExecutor 会根据核心及最大线程数设定自动的调整线程池内线程的数量。
三、线程池何时创建新的线程:
当一个任务提交后,如果当前线程池内的可运行的线程(无论是否处于闲置状态)数量小于核心数量,那么就会创建新的线程;如果当前线程池内的线程数量处于核心数和最大数之间,则,只有在队列(排队等待处理的任务队列)满的时候会创建新的线程。
四、固定容量及无界线程池:
如果把核心数和最大数设置为相同的值,那么线程池就会变为固定容量的线程池;可以通过把最大数设置为一个极大的值,如Integer.MAX_VALUE,来创建一个可以自适应高并发任务的场景的线程池。核心数和最大值通常在线程池初始定义的时候通过构造参数传入,也可以通过相应的方法动态的进行变更配置。
五、自定义构造
默认情况下,线程池中的线程会在初始化时进行创建,然后在任务到达时运行。我们也可以通过方法prestartCoreThread或者prestartAllCoreThreads来预先运行线程。
六、新线程创建:
线程是通过ThreadFactory来创建的,如果没有明确定义,则会使用Executors默认的defaultThreadFactory来创建,先创建的线程会归在同一个分组,并且拥有相同的NORM_PRIORITY优先级别及非守护特性。我们可以通过定义相应的ThreadFactory来,自定义线程的名称,群组,优先级及守护特性。如果ThreadFactory的方法newThread()创建线程失败,返回null,那么Executor会继续运行,但是无法在处理新添加的任务。
七、生存时间:
如果线程池当前拥有的线程数量多于核心数,那么多于的线程会在空闲生存时间后被终止销毁。减少非活跃线程池的资源耗费,当需求增多时,再重新创建需要的线程放入线程池以供使用。生存时间可以通过getKeepAliveTime() 方法进行动态设置,如果设置为LONG.MAX_VALUE,那么闲置线程将不会被回收。通常keep-alive只适用于超过核心数的线程,我们也可以通过设置allowCoreThreadTimeOut(boolean)来应用到所有的线程。keepAliveTime不为0时起作用。
八、队列:
可以使用任意形式的阻塞队列来存放提交到线程池的任务。队列和线程池大小相关。
如果线程数小于核心数,那么线程池会优先创建新的线程,而不进行任务入队。
如果等于或者超过核心数,那么线程池会优先将任务入队,而不创建新的线程。
如果等于或者超过核心数且任务队列已满,在不超过最大数情况下,会进行新线程的创建,超过最大线程数,则会执行相应的丢弃策略。
三种排队策略:
直接传递:通常使用SynchronousQueue队列,直接将任务转交给线程,而不进行任务的存储。如果没有线程接收任务,那么任务入队会失败,所以此种情况,会直接创建新的线程。这种策略,对于处理任务间有依赖关系的情况,避免了队列任务遍历。直接传递策略需要相应的无界线程池策略支持,以避免任务被丢弃,拒绝。者反过来也可能引发当处理速度小于任务到达速度时,线程池的无限增长。
无界队列:通常使用LinkedBlockingQueue,不预定义容量。因为队列无界,所以,核心线程都忙时,不会创建新的线程,可能导致任务的无限堆积,无界队列情况下,线程池最大值配置将不起作用。无界队列情况适用于任务间没有联系,各自肚独自运行的场景。例如,对于web系统,可以用来处理突发请求平滑处理。
有界队列:例如ArrayBlockingQueue,可以避免使用无界线程池时的的资源耗尽。但是,却不利于对请求处理的协调和控制。队列大小和线程池最大值参数之间会进行平衡处理。如:大容量队列和小容量线程池的使用可以最小化cpu使用,系统资源耗费及线程间上下文切换带来的负担。但是会造成吞吐量的降低,也无法充分使用系统的调度能力;相应的使用小队列则需要较大的线程池容量,可能造成cpu繁忙,进而造成系统无法调度现象,也会造成系统吞吐量的降低。
九、任务丢弃:
Executor关闭时,提交的任务会被丢弃;使用有界队列及有界线程池时,并且都已达到边界值时,提交的任务会被丢弃。任务丢弃会通过 RejectedExecutionHandler执行相应的丢弃策略。
默认AbortPolicy:抛出运行时异常
CallerRunsPolicy:使用提交任务的线程执行任务,这种策略提供了一种反馈机制,减缓了新任务的提交速度。
DiscardPolicy:直接丢弃任务。
DiscardOldestPolicy:丢弃任务队列头部的任务。
也可以自定义相应的丢弃策略,特别需要注意针对特定线程池容量及队列容量情况下的策略。
ThreadPoolExecutor提供了相应的beforeExecute及afterExecute方法用于在任务执行的前后执行相应的操作,如,调整任务执行的环境,重新初始化ThreadLocals,收集统计信息,添加日志等。另外,可以重写terminated方法,以在Executor完全关闭后做相应的处理。
如果钩子或者回调函数抛出异常,那么内部工作线程也会失败。
getQueue通常用于调试,其它情况不建议使用。
remove和purge可以用于大量任务取消情况下的任务队列处理。
当线程池不再被应用系统使用,并且,内部的也没有线程的时候,会被自动关闭。如果存在被闲置线程池重新声明的情况,那么需要确保对内部线程设置了合适的生存时间,及使用0核心数,设置线程可超时策略等。
如下示例:通过继承ThreadPoolExecutor实现一个可暂停,继续特性的线程池:
代码语言:javascript复制class PausableThreadPoolExecutor extends ThreadPoolExecutor {
代码语言:javascript复制 private boolean isPaused;
代码语言:javascript复制 private ReentrantLock pauseLock = new ReentrantLock();
代码语言:javascript复制 private Condition unpaused = pauseLock.newCondition();
代码语言:javascript复制
代码语言:javascript复制 public PausableThreadPoolExecutor(...) { super(...); }
代码语言:javascript复制
代码语言:javascript复制 protected void beforeExecute(Thread t, Runnable r) {
代码语言:javascript复制 super.beforeExecute(t, r);
代码语言:javascript复制 pauseLock.lock();
代码语言:javascript复制 try {
代码语言:javascript复制 while (isPaused) unpaused.await();
代码语言:javascript复制 } catch (InterruptedException ie) {
代码语言:javascript复制 t.interrupt();
代码语言:javascript复制 } finally {
代码语言:javascript复制 pauseLock.unlock();
代码语言:javascript复制 }
代码语言:javascript复制 }
代码语言:javascript复制
代码语言:javascript复制 public void pause() {
代码语言:javascript复制 pauseLock.lock();
代码语言:javascript复制 try {
代码语言:javascript复制 isPaused = true;
代码语言:javascript复制 } finally {
代码语言:javascript复制 pauseLock.unlock();
代码语言:javascript复制 }
代码语言:javascript复制 }
代码语言:javascript复制
代码语言:javascript复制 public void resume() {
代码语言:javascript复制 pauseLock.lock();
代码语言:javascript复制 try {
代码语言:javascript复制 isPaused = false;
代码语言:javascript复制 unpaused.signalAll();
代码语言:javascript复制 } finally {
代码语言:javascript复制 pauseLock.unlock();
代码语言:javascript复制 }
代码语言:javascript复制 }
代码语言:javascript复制 }}