CyclicBarrier和Condtion

2023-01-12 11:36:00 浏览数 (1)

深入浅出CyclicBarrier

CyclicBarrier的基本使用

基本概念

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程达到一个屏障(也可以叫同步点)时被阻塞,知道最后一个线程打到屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。

数据导入案例

代码语言:javascript复制
public class Main extends Thread {
    @Override
    public void run() {
        System.out.println("开始执行数据汇总操作");
    }
	
    public static void main(String[] args) {
	    // parties: 3 必须要有 3 个线程参与进来才能继续
        CyclicBarrier cb = new CyclicBarrier(3, new Main());
        new CyclicBarrierDemo("/src/store/", cb).start();
        new CyclicBarrierDemo("/src/image/", cb).start();
        // 如果不满足 3 个则 cb 会一直阻塞直到第三个加入
        new CyclicBarrierDemo("/src/frame/", cb).start();
        // TODO 希望三个线程执行结束之后,再做一个汇总处理
    }
}
  
class CyclicBarrierDemo extends Thread {
    String path;  
    CyclicBarrier cyclicBarrier;  
	
    public CyclicBarrierDemo(String path, CyclicBarrier cyclicBarrier) {  
        this.path = path;  
        this.cyclicBarrier = cyclicBarrier;  
    }  
	
    @Override  
    public void run() {  
        System.out.println("开始导入数据: "   path);  
        try {  
            cyclicBarrier.await(); // 阻塞  
        } catch (InterruptedException | BrokenBarrierException e) {  
            throw new RuntimeException(e);  
        }  
    }  
}

得到如下的运行结果:

代码语言:javascript复制
开始导入数据: /src/frame/
开始导入数据: /src/store/
开始导入数据: /src/image/
开始执行数据汇总操作

使用场景

CyclicBarrier 会在前提任务完成后再向下执行,所以当需要所有的子线程完成任务再执行主线程时,就可以选择使用 CyclicBarrier。例如:需要成功获取到所有图片,再进行展示;需要成功加载所有文件,再进行文件内容分析等。CyclicBarrier 具有以下性质:

  1. 如果指定了 parties 但又没有足够的线程来调用 await() 那就会导致所有线程都被阻塞住
  2. await(timeout, unit) 为了避免出现所有线程阻塞可以对 await 设置一个超时等待时间
  3. 同样可以使用 reset() 来重置计数,但会抛出 BrokenBarrierException 的异常

CyclicBarrier的原理分析

原理图

await()方法

代码语言:javascript复制
private final Runnable barrierCommand; // 传入的第二个线程参数
private int count; // 对 parties 进行备份操作

private int dowait(boolean timed, long nanos)  
    throws InterruptedException, BrokenBarrierException,  
           TimeoutException {  
    final ReentrantLock lock = this.lock; // 获得一个重入锁 
    lock.lock();
    try {
	    // 构造一个周期
        final Generation g = generation;
		// 判断栅栏是否被打破
        if (g.broken)
            throw new BrokenBarrierException();
		// 判断线程是否被中断
        if (Thread.interrupted()) {
            breakBarrier(); // 将栅栏打破并抛出异常
            throw new InterruptedException();
        }
        int index = --count; // parties 副本进行递减
        if (index == 0) { // tripped
            Runnable command = barrierCommand;
            if (command != null) {  
                try {
	                // 执行构造时传递过来的第二个参数的run方法 线程对象
                    command.run();  
                } catch (Throwable ex) {  
                    breakBarrier();  
                    throw ex;  
                }  
            }
            // 进入下一个周期来唤醒所有线程 notifyAll()
            // 并恢复 generation 和 count
            nextGeneration();  
            return 0;  
        }  
		
        for (;;) {
            try {
                if (!timed)
	            // 不带有超时的阻塞
                    trip.await(); // condition 条件阻塞
                else if (nanos > 0L)
                // await 带有超时的阻塞
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
	            // 线程被中断且在同一个周期内还未被打破栅栏
                if (g == generation && ! g.broken) {
                    breakBarrier(); // 打破栅栏并返回异常
                    throw ie;
                } else {
                    Thread.currentThread().interrupt(); // 否则继续中断
                }
            }
			// broken 状态抛出异常
            if (g.broken)  
                throw new BrokenBarrierException();  
			// 不是同一个周期内返回周期
            if (g != generation)  
                return index;  
			// 超时抛出 toe 异常
            if (timed && nanos <= 0L) {
                breakBarrier(); // 打破栅栏
                throw new TimeoutException();  
            }  
        }  
    } finally {
        lock.unlock(); // 释放锁
    }  
}

public int await() throws InterruptedException, BrokenBarrierException {  
    try {  
        return dowait(false, 0L);  
    } catch (TimeoutException toe) {  
        throw new Error(toe); // cannot happen  
    }  
}

CyclicBarrier 需要两个参数,第一个参数是 parties 是参与循环屏障的线程个数,第二个参数是 barrierCommand 是参与被屏障的线程实例。CyclicBarrier 的作用就是若要执行 barrierCommand.run() 则必须先完成 parties 个线程的任务,主要实现了一种设定前提条件的作用。

深入浅出Condition

基本概念

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

启动和唤醒案例

代码语言:javascript复制
public class ConditionDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        ConditionDemoWait cdw = new ConditionDemoWait(lock, condition);
        ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition);
        new Thread(cdw).start();
        new Thread(cdn).start();
    }
}

record ConditionDemoWait(Lock lock, Condition condition) implements Runnable {
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        try {
            lock.lock();
            condition.await();
            System.out.println("end - ConditionDemoWait");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

record ConditionDemoNotify(Lock lock, Condition condition) implements Runnable {
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoNotify");
        try {
            lock.lock();
            condition.signal(); // 条件释放
            System.out.println("end - ConditionDemoNotify");
        } finally {
            lock.unlock();
        }
    }
}

得到如下的运行结果:

代码语言:javascript复制
begin - ConditionDemoWait
begin - ConditionDemoNotify
end - ConditionDemoNotify
end - ConditionDemoWait

原理分析

await()方法

代码语言:javascript复制
// 条件类型节点
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
	// 指向下一个 Node
    ConditionNode nextWaiter;
	
    public final boolean isReleasable() {  
        return status <= 1 || Thread.currentThread().isInterrupted();  
    }  
	
    public final boolean block() {  
        while (!isReleasable()) LockSupport.park();  
        return true;  
    }  
}

// ConditionObject 是 AQS 的一个内部类实现了 Condition
public class ConditionObject implements Condition, java.io.Serializable {
	// Condition 的本质是一个单向链表只有 next 没有 prev
	private transient ConditionNode firstWaiter; 
	private transient ConditionNode lastWaiter;
	
	private int enableWait(ConditionNode node) {
	    if (isHeldExclusively()) {
	        node.waiter = Thread.currentThread();
	        node.setStatusRelaxed(COND | WAITING);
	        ConditionNode last = lastWaiter;
	        if (last == null)
	            firstWaiter = node;
	        else
	            last.nextWaiter = node;
	        lastWaiter = node;
	        int savedState = getState();
	        // 尝试释放锁 简介调用重入锁的 tryRelease()
	        if (release(savedState))
		        // 返回 state
	            return savedState;
	    }
	    node.status = CANCELLED;
	    throw new IllegalMonitorStateException();
	}
	// 阻塞使得后续的方法无法运行
	public final void await() throws InterruptedException {  
	    if (Thread.interrupted())  
	        throw new InterruptedException();
	    // 创建一个 Node 并把当前的线程放进去
	    ConditionNode node = new ConditionNode();
	    int savedState = enableWait(node);
	    LockSupport.setCurrentBlocker(this);
	    boolean interrupted = false, cancelled = false, rejected = false;
	    // 判断节点是否在 AQS 队列上
	    while (!canReacquire(node)) {
	        if (interrupted |= Thread.interrupted()) {
	            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
	                break;
	        } else if ((node.status & COND) != 0) {
	            try {
	                if (rejected)
	                    node.block();
	                else
	                    ForkJoinPool.managedBlock(node);
	            } catch (RejectedExecutionException ex) {
	                rejected = true;
	            } catch (InterruptedException ie) {
	                interrupted = true;
	            }
	        } else
	            Thread.onSpinWait()
	    }
	    LockSupport.setCurrentBlocker(null);
	    node.clearStatus();
	    acquire(node, savedState, false, false, false, 0L);
	    if (interrupted) {
	        if (cancelled) {
	            unlinkCancelledWaiters(node);
	            throw new InterruptedException();
	        }
	        Thread.currentThread().interrupt();
	    }
	}
}

signal()方法

代码语言:javascript复制
public class ConditionObject implements Condition, java.io.Serializable {
	public final void signal() {  
	    ConditionNode first = firstWaiter;  
	    if (!isHeldExclusively())  
	        throw new IllegalMonitorStateException();  
	    if (first != null)
		    // 调用 doSignal()
	        doSignal(first, false);  
	}
	
	private void doSignal(ConditionNode first, boolean all) {  
		while (first != null) {  
			ConditionNode next = first.nextWaiter;
			// 说明 condition 节点已经没有了 下一个节点需要被释放掉
			if ((firstWaiter = next) == null)  
				lastWaiter = null;
			// CAS 替换
			if ((first.getAndUnsetStatus(COND) & COND) != 0) {
				// 转移 AQS 队列
				enqueue(first);  
				if (!all)  
					break;  
			}  
			first = next;  
		}  
	}
}

流程图

Condition 实质上就是两个队列不断地挂起、转移,且必须是基于 AQS 通信管道进行通信的。

0 人点赞