并发编程之线程池及队列
问题:
- 线程池作用,主要实现类,并说出实现类场景以及区别。
- ThreadPoolExecutor使用场景。以及原理。
- Executor拒绝策略说的是什么?
- 无界阻塞延迟队列delayqueue原理是什么?
- CyclicBarrier和CountDownLatch的区别?
线程池作用,主要实现类,并说出实现类场景以及区别
作用
- 减少在创建和销毁线程上所花的时间以及系统资源的开销。
- 如果不使用线程池,可能造成系统创建大量线程。
线程池种类
newCachedThreadPool
用newCachedThreadPool()方法创建该线程池对象,创建之初里面一个线程都没有,当execute方法或submit方法向线程池提交任务时,会自动新建线程;如果线程池中有空余线程,则不会新建;这种线程池一般最多情况可以容纳几万个线程,里面的线程空余60s会被回收。
适用场景:执行很多短期异步的小程序。
newFixedThreadPool
固定线程数的池子,每个线程的存活时间是无限的,当池子满了就不再添加线程;若池中线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。
适用场景:执行长期的任务,性能较好。
newSingleThreadExecutor
只有一个线程的线程池,且线程的存活时间是无限的;当线程繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)。
适用:一个任务一个任务执行的场景。
NewScheduledThreadPool
创建一个固定大小的线程池,池内的线程存活时间无限,线程池支持定时及周期性的任务执行。如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue
队列。
适用场景:周期性执行任务的场景。
线程池任务执行流程:
- 当线程池小于
corePoolSize
时,新任务将创建一个新的线程,即使此时线程池种存在空闲线程。 - 当线程池达到
corePoolSize
时,新提交的任务将被放入workQueue
中,等待线程池任务调度执行。 - 当
workQueue
已满,且maximumPoolSize
>corePoolSize
时,新任务会创建新线程执行任务。 - 当提交任务数超过
maximumPoolSize
时,新提交任务由RejectedExecutionHandler
处理。 - 当线程池中超过
corePoolSize
时,空闲时间达到keepAliveTime
时,关闭空闲线程。 - 当设置了
allowCoreThreadTimeOut(true)
时,线程池中corePoolSize
线程空闲时间达到keepAliveTime
也将关闭。
ThreadPoolExecutor使用场景。以及原理
ThreadPoolExecutor类实现了ExecutorService接口和Executor接口。
ThreadPoolExecutor
参数:
参数 | 含义 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 最大线程池大小 |
keepAliveTime | 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 |
TimeUnit | keepAliveTime时间单位 |
workQueue | 阻塞任务队列 |
threadFactory | 新建线程工厂 |
RejectedExecutionHandler | 当提交任务数超过maxmumPoolSize workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
- 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
- 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
1.jpg
Executor拒绝策略说的是什么?
线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
ThreadPoolExecutor.AbortPolicy
抛出java.util.concurrent.RejectedExecutionException异常。
ThreadPoolExecutor.CallerRunsPolicy
用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
ThreadPoolExecutor.DiscardOldestPolicy
丢弃任务队列中最旧任务。
ThreadPoolExecutor.DiscardPolicy
丢弃当前将要加入队列的任务。
无界阻塞延迟队列delayqueue原理是什么?
DelayQueue
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue
来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
适用场景
缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。
定时任务调度:使用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如Timer就是使用DelayQueue实现的。
实现思路
以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。
CyclicBarrier和CountDownLatch的区别
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
CyclicBarrier
代码语言:javascript复制class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name " Go!!");
}
}
public class Race {
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}
2.png
CyclicBarrier就是一个栅栏,等待所有线程到达后再执行相关的操作。barrier 在释放等待线程后可以重用。
CountDownLatch
代码语言:javascript复制public class TestCountDownLatch {
private static final int N = 10;
public static void main(String[] args) throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
CountDownLatch startSignal = new CountDownLatch(1);//开始执行信号
for (int i = 1; i <= N; i ) {
new Thread(new Worker(i, doneSignal, startSignal)).start();//线程启动了
}
System.out.println("begin------------");
startSignal.countDown();//开始执行啦
doneSignal.await();//等待所有的线程执行完毕
System.out.println("Ok");
}
static class Worker implements Runnable {
private final CountDownLatch doneSignal;
private final CountDownLatch startSignal;
private int beginIndex;
Worker(int beginIndex, CountDownLatch doneSignal,
CountDownLatch startSignal) {
this.startSignal = startSignal;
this.beginIndex = beginIndex;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await(); //等待开始执行信号的发布
beginIndex = (beginIndex - 1) * 10 1;
for (int i = beginIndex; i <= beginIndex 10; i ) {
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doneSignal.countDown();
}
}
}
}
CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的。
而CyclicBarrier更像一个水闸, 线程执行就像水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流。