1、学习切入点
Executors 框架是整个JUC 包中类/接口关系中最为复杂的框架,真正理解Executors框架的前提是理清楚各个模块之间的关系,高屋建瓴,从整体到局部才能透彻理解各个模块的功能和背后设计的思路!
本文将从 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable这些核心模块展开分析。
2、从Executor谈起
Executor 是JDK 1.5 时,随着JUC引入的一个接口,引入该接口的主要目的是 解耦任务本身和任务执行。我们之前通过线程执行一个任务时,往往需要先创建一个线程.satrt()去执行任务,
而Executor 接口解耦了任务和任务的执行,该接口只有一个方法,入参为待执行的任务。
代码语言:javascript复制public interface Executor {
/**
* 执行给定的Runable任务
* 根据Executor接口的实现类不同,具体执行方式也不同
*/
void execute(Runnable command);
}
我们可以像下面这样执行任务,而不必心线程的创建
代码语言:javascript复制Executor executor = anExecutor
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
由于Executor 仅仅是一个接口,所有根据其实现不同,执行具体的任务也不同,参看源码注释,我们可以知道比如:
2.1、同步执行任务
代码语言:javascript复制class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}}
2.2、异步执行任务
代码语言:javascript复制class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}}
2.3、对任务进行排队执行
代码语言:javascript复制 class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}
以上这些源码给出的示例,仅仅是给出了一些可能的 Executor实现,JUC包中提供了很多 Executor的具体实现类,这里关键是理解Executor的设计思想——对任务和任务的执行解耦。
3、增强的ExecutorService
因为Executor 接口的功能很简单,为了对它进行增强,JUC又提供了一个名为 ExecutorService 接口,那它到达增强了哪些东西呢?
可以看到,ExecutorService 在Executor 的基础上增加了对任务的控制,同时也包括对自己生命周期的管理,主要有四类:
1、关闭执行器,禁止任务提交
2、监视执行器状态
3、提供对异步任务支持
4、提供对处理任务的支持
4、周期任务调度——ScheduledExecutorService
在开发环境中,我们可能需要提交给执行器某些任务能够定时执行或者周期性执行,我们可以自己去实现Executor 接口来创建符合我们需要的类,Doug Lea 已经考虑好了这类需求,所有在ExecutorService 的基础之上,又提供了一个接口 ScheduledExecutorService。
案例演示
代码语言:javascript复制public class ScheduleExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduler=Executors.newScheduledThreadPool(4);
//开始执行时间1秒后,每隔1秒执行一次任务
final ScheduledFuture<?>scheduledFuture=scheduler.scheduleAtFixedRate(new BeepTask(),1,1,TimeUnit.SECONDS);
//5秒后,取消任务,关闭线程池
scheduler.schedule(()->{
scheduledFuture.cancel(true);
System.out.println("over");
scheduler.shutdownNow();
},5,TimeUnit.SECONDS);
}
private static class BeepTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() "beep!");
}
}
}
注意:scheduleAtFixedRate 方法返回一个ScheduledFuture 对象,ScheduledFuture其实就是在Future 的基础上增加了延迟功能。通过ScheduledFuture 可以取消一个任务的执行。
ScheduledExecutorService 完整接口说明如下:
代码语言:javascript复制public interface ScheduledExecutorService extends ExecutorService {
/**
* 提交一个待执行的任务, 并在给定的延迟后执行该任务.
*
* @param command 待执行的任务
* @param delay 延迟时间
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* 提交一个待执行的任务(具有返回值), 并在给定的延迟后执行该任务
*
* @param callable 待执行的任务
* @param delay 延迟时间
* @param unit 延迟时间的单位
* @param <V> 返回值类型
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* 提交一个待执行的任务
* 该任务在 initialDelay 后开始执行, 然后在initialDelay period 后执行, 接
* 着在 initialDelay 2 * period 后执行, 依此类推
*
* @param command 待执行的任务
* @param initialDelay 首次执行的延迟时间
* @param period 连续执行之间的周期
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 提交一个待执行的任务
* 该任务在 initialDelay 后开始执行, 随后在每一次执行终止和下一次执行开始之间
* 都存在给定的延迟
* 如果任务的任一执行遇到异常, 就会取消后续执行. 否则, 只能通过执行程序的取消或
* 终止方法来终止该任务
*
* @param command 待执行的任务
* @param initialDelay 首次执行的延迟时间
* @param delay 一次执行终止和下一次执行开始之间的延迟
* @param unit 延迟时间的单位
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
至此,Executors 框架中的三个最核心的接口介绍完毕,三个核心接口关系如下
5、生产Executor的工厂
通过前面的介绍,读者应该对Executors框架有了一个初步的认识,Executors 框架就是用来解耦任务本身与任务的执行,并提供了三个核心(Executor、ExecutorService、ScheduledExecutorService)接口来满足使用者的需求!
1、Executor:提交普通的可执行任务 2、ExecutorService:提供对线程池生命周期的管理、异步任务的支持 3、ScheduledExecutorService:提供对任务周期性执行的支持
既然有了接口,那么就有相应的实现类,JUC 提供了许多默认的接口实现,用户如果自己去创建这些类的实例,就需要了解这些类的细节,我们可不可以仅仅根据一些参数就能创建这些实例呢?
因此Executors类就是专门用于创建上述接口的实现类对象,Executors其实就是一个简单工厂,它的所有方法都是static的,Executors 一共提供了五类可供创建的Executors执行器实例。
1、固定线程数的线程池
代码语言:javascript复制 /**
* 创建一个具有固定线程数的Executor
*
* @param nThreads 核心线程数(银行有五个窗口,平时开三个窗口(核心线程数))
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 创建一个具有固定线程数的Executor.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*
* @param nThreads 核心线程数
* @param threadFactory 创建线程的工厂
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public interface ThreadFactory {
//创建了一个线程
Thread newThread(Runnable r);
}
ThreadFactory 作为一个线程工厂,我们可以由外部指定ThreadFactory实例,以决定线程具体的创建方式。
下面就以 DefaultThreadFactory为例
代码语言:javascript复制//默认的线程工厂
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);//原子性
private final ThreadGroup group;//线程组
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-"
poolNumber.getAndIncrement()
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
可以看到,DefaultThreadFactory 初始化的时候定义了线程组、线程名称等信息,每创建一个线程,都给线程统一分配了这些信息,避免了手动new的方式创建线程,又可以进行工厂复用。
2、单个线程的线程池
代码语言:javascript复制 /**
* 创建一个使用单个 worker 线程的 Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 创建一个使用单个 worker 线程的 Executor
* 在需要时使用提供的 ThreadFactory 创建新线程
*
* @param threadFactory 线程工厂
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
//对返回的Executor实例进行一个包装
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
我们可以看到,只有单个线程上文线程池其实就是指定好了线程数为1的固定线程池,主要区别就是返回的Executor实例用了一个 FinalizableDelegatedExecutorService对象 进行包装,它核心继承了 DelegatedExecutorService ,这是个包装类,实现了 ExecutorService的所有方法,但是内部实现其实都委托给了传入的 ExecutorService 实例。
为什么要多此一举呢?
因为返回的 ThreadPoolExecutor 包含一些设置线程池大小的方法,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要这么一个包装类,只暴露ExecutorService 本身的方法。
3、可缓存的线程池
代码语言:javascript复制/**
* 创建一个可缓存线程的Execotor
* 如果线程池中没有线程可用, 则创建一个新线程并添加到池中
* 如果有线程长时间未被使用(默认60s,可通过threadFactory配置), 则从缓存中移除
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* 创建一个可缓存线程的Execotor.
* 如果线程池中没有线程可用, 则创建一个新线程并添加到池中;
* 如果有线程长时间未被使用(默认60s, 可通过threadFactory配置), 则从缓存中移除.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
可以看到,返回的还是ThreadPoolExecutor对象,只是指定了超时时间,另外线程池中线程的数量在[0, Integer.MAX_VALUE]
之间。
4、可延时/周期性调度的线程池
代码语言:javascript复制/**
* 创建一个具有固定线程数的 可调度Executor.
* 它可安排任务在指定延迟后或周期性地执行.
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* 创建一个具有固定线程数的 可调度Executor.
* 它可安排任务在指定延迟后或周期性地执行.
* 在需要时使用提供的 ThreadFactory 创建新线程.
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
5、Fork/Join线程池
代码语言:javascript复制/**
* 创建具有指定并行级别的ForkJoin线程池.
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* 创建并行级别等于CPU核心数的ForkJoin线程池.
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
3、总结
至此,Executors框架就基本介绍完了,我们看下他的类图