Java使用Executor框架执行多线程任务,创建与操作系统线程一对一的映射线程,由操作系统分配CPU来执行。称为任务的两级调度模型,如下图所示:
代码语言:javascript复制public interface Executor {
void execute(Runnable command);
}
执行器Executor包含三个部分:
任务
- 无返回值任务,实现Runnable接口
- 带返回值任务,实现Callable接口
注:工具类Executors可以把一个Runnable对象封装为一个Callable对象
任务的执行
- ExecutorService
主要方法
void execute(Runnable runnable) 无返回结果执行
Future<T> submit(Callable<T> task) 带返回结果执行
shutdown()
shutdownNow()
- ScheduledExecutorService
主要方法
-无返回结果:执行任务,延迟时间、单位
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
-带返回结果:执行任务,延迟时间、单位
ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
-固定延迟开始:以上一次发起时间计算
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
-固定延迟开始:以上一次结束时间计算
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
注:相比于Timer,ScheduledExecutorService更灵活,功能更强大。
执行结果
Future
Executor的框架结构也是基于这三个方面实现,下面是各个接口的实现类和接口示意:
线程池—ThreadPoolExecutor
Java线程池应该是使用最多的并发框架,通过使用线程池可以减少系统因频繁的创建和销毁线程而带来的资源的浪费,降低资源消耗;执行的任务也可以直接从线程池获得线程执行,提高响应速度;线程创建过多也降低系统的稳定性,通过线程池也可以统一分配、监控,从而提高线程的可管理性。下面结合源码分析一下线程池原理:
主要属性
代码语言:javascript复制// 线程池控制状态ctl是包含两个概念字段的原子整数
// workerCount,表示有效的线程数
// runState,表示是否正在运行,关闭等
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 创建线程工厂
private volatile ThreadFactory threadFactory;
// 饱和策略
private volatile RejectedExecutionHandler handler;
// 空闲线程等待工作的超时时间
private volatile long keepAliveTime;
// 是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
线程池状态
RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
构造方法:主要就是设置核心线程数、最大线程数、阻塞队列、空闲线程存活时间、线程工厂、饱和策略
代码语言:javascript复制this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
执行方法
代码语言:javascript复制public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 执行任务时,先判断线程池中线程数是否小于核心线程数;
- 小于核心线程数,则将任务添加到工作线程中,并执行;
- 大于核心线程数,将任务添加到阻塞队列,判断阻塞队列是否已满,不满则添加;
- 如果阻塞队列满的话,判断线程池中的线程数是否小于最大线程数;
- 小于最大线程数,将任务添加到工作线程中,并执行;
- 大于最大线程数,使用包含策略处理任务。
Work类
代码语言:javascript复制private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 执行任务的线程
final Thread thread;
// 执行的任务
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池中的每个线程都被封装成一个Worker对象,线程池中调度执行的就是Worker对象——执行线程和执行任务。
线程池关闭
- shutdown方法
将线程池切换到SHUTDOWN状态,并且终止所以空闲的线程,最后尝试终止线程池。
- shutdownNow方法
将线程池状态切换到STOP状态,并且终止所有线程,取出阻塞队列中的所有未执行的任务,尝试终止线程池。
线程池监控
- getTaskCount:线程池已经执行的和未执行的任务总数;
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
- getPoolSize:线程池当前的线程数量;
- getActiveCount:当前线程池中正在执行任务的线程数量。
扩展方法
- beforeExecute方法:执行前
- afterExecute方法:执行后
- terminated方法:终止方法
调度线程池—ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是支持周期性任务的调度的线程池执行器,与Timer相比更加强大;
ScheduledThreadPoolExecutor实现ScheduledExecutorService接口;
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor类,主要用于执行延迟执行任务或周期性执行任务。周期性执行在执行之后将重设任务和时间,进行下次执行。
ScheduledThreadPoolExecutor中的阻塞队列是内部实现的DelayedWorkQueue——无界可延迟优先级阻塞队列,基于最小堆算法实现;
FutureTask
FutureTask是异步的且能够获取返回值的可执行任务,实现了Future接口和Runnable接口;
Future接口提供获取返回值的get方法和可以取消任务的cancel方法;
FutureTask的构造可以接收Callable对象、Runnable对象和返回值对象两种形式(注:Executors.callable(runnable, result)可以将runnable转化成callable);
FutureTask通过CAS操作state状态值,来控制整个FutureTask的生命周期,state包括:NEW(新建)、COMPLETING(执行中)、NORMAL(正常结束)、EXCEPTIONAL(异常结束)、CANCELLED(取消)、INTERRUPTING(中断中)、INTERRUPTED(中断)。
FutureTask执行任务结束后,将设置返回值,并唤醒调用get方法的线程;当任务未执行结束时,调用get方法的线程将会阻塞并且装入到waiters(链表结构)等待队列;
FutureTask使用LockSupport的park方法实现waiters队列中的线程阻塞,LockSupport的unpark方法唤醒阻塞线程;
例子:
代码语言:javascript复制// 结算展示页面关于商品价格展示的逻辑:使用线程池和FutureTask来实现
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 获取商品基础价格
FutureTask<Double> productPrice = new FutureTask<>(() -> {
// 调用商品系统接口
return 19.5;
});
// 获取商品促销
FutureTask<Double> productPromotion = new FutureTask<>(() -> {
// 调用促销系统接口
return 16.0;
});
// 获取商品运费
FutureTask<Double> productFreight = new FutureTask<>(() -> {
// 调用运费系统接口
return 6.0;
});
executor.execute(productPrice);
executor.execute(productPromotion);
executor.execute(productFreight);
// 结果的组装
System.out.println("商品原价:" productPrice.get());
System.out.println("商品促销价格:" productPromotion.get());
System.out.println("商品运费价格:" productFreight.get());
}
- 《Java并发编程艺术》
- http://www.ideabuffer.cn/categories/开发手册/J-U-C/