深入浅出CyclicBarrier
CyclicBarrier的基本使用
基本概念
CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程达到一个屏障(也可以叫同步点)时被阻塞,知道最后一个线程打到屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。
数据导入案例
代码语言:javascript复制public class Main extends Thread {
@Override
public void run() {
System.out.println("开始执行数据汇总操作");
}
public static void main(String[] args) {
// parties: 3 必须要有 3 个线程参与进来才能继续
CyclicBarrier cb = new CyclicBarrier(3, new Main());
new CyclicBarrierDemo("/src/store/", cb).start();
new CyclicBarrierDemo("/src/image/", cb).start();
// 如果不满足 3 个则 cb 会一直阻塞直到第三个加入
new CyclicBarrierDemo("/src/frame/", cb).start();
// TODO 希望三个线程执行结束之后,再做一个汇总处理
}
}
class CyclicBarrierDemo extends Thread {
String path;
CyclicBarrier cyclicBarrier;
public CyclicBarrierDemo(String path, CyclicBarrier cyclicBarrier) {
this.path = path;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("开始导入数据: " path);
try {
cyclicBarrier.await(); // 阻塞
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
得到如下的运行结果:
代码语言:javascript复制开始导入数据: /src/frame/
开始导入数据: /src/store/
开始导入数据: /src/image/
开始执行数据汇总操作
使用场景
CyclicBarrier 会在前提任务完成后再向下执行,所以当需要所有的子线程完成任务再执行主线程时,就可以选择使用 CyclicBarrier。例如:需要成功获取到所有图片,再进行展示;需要成功加载所有文件,再进行文件内容分析等。CyclicBarrier 具有以下性质:
- 如果指定了 parties 但又没有足够的线程来调用
await()
那就会导致所有线程都被阻塞住 await(timeout, unit)
为了避免出现所有线程阻塞可以对 await 设置一个超时等待时间- 同样可以使用
reset()
来重置计数,但会抛出BrokenBarrierException
的异常
CyclicBarrier的原理分析
原理图
await()方法
代码语言:javascript复制private final Runnable barrierCommand; // 传入的第二个线程参数
private int count; // 对 parties 进行备份操作
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock; // 获得一个重入锁
lock.lock();
try {
// 构造一个周期
final Generation g = generation;
// 判断栅栏是否被打破
if (g.broken)
throw new BrokenBarrierException();
// 判断线程是否被中断
if (Thread.interrupted()) {
breakBarrier(); // 将栅栏打破并抛出异常
throw new InterruptedException();
}
int index = --count; // parties 副本进行递减
if (index == 0) { // tripped
Runnable command = barrierCommand;
if (command != null) {
try {
// 执行构造时传递过来的第二个参数的run方法 线程对象
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
// 进入下一个周期来唤醒所有线程 notifyAll()
// 并恢复 generation 和 count
nextGeneration();
return 0;
}
for (;;) {
try {
if (!timed)
// 不带有超时的阻塞
trip.await(); // condition 条件阻塞
else if (nanos > 0L)
// await 带有超时的阻塞
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 线程被中断且在同一个周期内还未被打破栅栏
if (g == generation && ! g.broken) {
breakBarrier(); // 打破栅栏并返回异常
throw ie;
} else {
Thread.currentThread().interrupt(); // 否则继续中断
}
}
// broken 状态抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 不是同一个周期内返回周期
if (g != generation)
return index;
// 超时抛出 toe 异常
if (timed && nanos <= 0L) {
breakBarrier(); // 打破栅栏
throw new TimeoutException();
}
}
} finally {
lock.unlock(); // 释放锁
}
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
CyclicBarrier 需要两个参数,第一个参数是 parties
是参与循环屏障的线程个数,第二个参数是 barrierCommand
是参与被屏障的线程实例。CyclicBarrier 的作用就是若要执行 barrierCommand.run()
则必须先完成 parties
个线程的任务,主要实现了一种设定前提条件的作用。
深入浅出Condition
基本概念
Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
启动和唤醒案例
代码语言:javascript复制public class ConditionDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionDemoWait cdw = new ConditionDemoWait(lock, condition);
ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition);
new Thread(cdw).start();
new Thread(cdn).start();
}
}
record ConditionDemoWait(Lock lock, Condition condition) implements Runnable {
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
record ConditionDemoNotify(Lock lock, Condition condition) implements Runnable {
@Override
public void run() {
System.out.println("begin - ConditionDemoNotify");
try {
lock.lock();
condition.signal(); // 条件释放
System.out.println("end - ConditionDemoNotify");
} finally {
lock.unlock();
}
}
}
得到如下的运行结果:
代码语言:javascript复制begin - ConditionDemoWait
begin - ConditionDemoNotify
end - ConditionDemoNotify
end - ConditionDemoWait
原理分析
await()方法
代码语言:javascript复制// 条件类型节点
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
// 指向下一个 Node
ConditionNode nextWaiter;
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
// ConditionObject 是 AQS 的一个内部类实现了 Condition
public class ConditionObject implements Condition, java.io.Serializable {
// Condition 的本质是一个单向链表只有 next 没有 prev
private transient ConditionNode firstWaiter;
private transient ConditionNode lastWaiter;
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) {
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
int savedState = getState();
// 尝试释放锁 简介调用重入锁的 tryRelease()
if (release(savedState))
// 返回 state
return savedState;
}
node.status = CANCELLED;
throw new IllegalMonitorStateException();
}
// 阻塞使得后续的方法无法运行
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个 Node 并把当前的线程放进去
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this);
boolean interrupted = false, cancelled = false, rejected = false;
// 判断节点是否在 AQS 队列上
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait()
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
}
signal()方法
代码语言:javascript复制public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
// 调用 doSignal()
doSignal(first, false);
}
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
// 说明 condition 节点已经没有了 下一个节点需要被释放掉
if ((firstWaiter = next) == null)
lastWaiter = null;
// CAS 替换
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 转移 AQS 队列
enqueue(first);
if (!all)
break;
}
first = next;
}
}
}
流程图
Condition 实质上就是两个队列不断地挂起、转移,且必须是基于 AQS 通信管道进行通信的。