JAVA并发编程系列(9)CyclicBarrier循环屏障原理分析

2024-09-20 15:53:41 浏览数 (1)

拼多多2面,还是模拟拼团,要求用户拼团成功后,提交订单支付金额。

之前我们在系列(8)《CountDownLatch核心原理》,实现过拼团场景。但是CountDownLatch里调用countDown()方法后,线程还是可以继续执行后面的代码,没有真正的阻塞。

1、面试真题:完善模拟拼团

这里我们应用循环屏障CyclicBarrier,可以控制一组线程到达屏障点后,再全部继续执行,而且这个屏障可以重复利用的特性来实现这个场景。

现在我们模拟2人拼团成功的场景,每满2人就允许提交订单支付,且后台发送消息给仓库发货。

代码语言:txt复制
package lading.java.mutithread;

import cn.hutool.core.date.DateTime;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
 * 模拟拼团,并通知仓库发货
 */
public class Demo010CyclicBarrier {
    public static int count = 2;//要求达到屏障数量
    public static volatile ConcurrentHashMap<String, String> customerNames = new ConcurrentHashMap<>();//记录已到达屏障客户线程名
    public static CyclicBarrier barrier = new CyclicBarrier(count, new sendMsg());//屏障数量2,目标线程数量达到后,执行sendMsg
    public static void main(String[] args) {
        //模拟5个人拼团
        for (int i = 0; i < 5; i  ) {
            new Thread(() -> {
                try {
                    Thread.sleep((new Random().nextInt(10 - 1   1)) * 1000);//客户浏览商品信息Ns
                    System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS")  " 【"  Thread.currentThread().getName()   "】,到达屏障");
                    customerNames.put(Thread.currentThread().getName(), Thread.currentThread().getName());//本批次拼团名单
                    barrier.await();//到达屏障,进入阻塞
                    System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS")  " 【"  Thread.currentThread().getName()   "】,完成支付。");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } catch (BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }, "客户00"   (i   1)).start();
        }
    }
    static class sendMsg extends Thread {
        @Override
        public void run() {
            System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss")   customerNames.keySet()   "达到屏障拼团人数,允许提交订单!");
            customerNames.clear();//本批次拼团名单清空
        }
    }
}

客户2,3到达后,任务线程就发送信息给参考发货,同时客户2、3可以支持订单。当客户1到达后,就阻塞等待客户4拼团才继续执行。最后客户5,因为没有成团,一直阻塞。

2、说说CyclicBarrier的核心原理

CyclicBarrier,顾名思义就是循环屏障。支持一组多个线程相互等待,线程调用了屏障的await()方法后,原地阻塞等待。当本组线程最后一个到达屏障后,本组其他线程全部被唤醒,继续执行屏障await()方法后面代码。

由于这个屏障在释放完本组等待线程后,可以重复使用,等待下一组线程过来阻塞排队,因此称为:循环屏障。

3、具体说说CyclicBarrier怎么使用

循环屏障也是很简单,核心方法就几个。首先第一个

1.CyclicBarrier(int parties, Runnable barrierAction)

就是实例化一个循环屏障,parties就是本组线程目标数量。barrierAction就有意思了,这个是可选参数。如果本组线程都到达屏障后,就先执行这个Runnable barrierAction,阻塞等待的线程才能继续执行。可以从模拟拼团实例运行结果看到:线程2、3到达屏障后,先执行sendMsg的方法,线程2、3才可以开始支付。

2.await()

线程调用这个方法后,表示已经到达屏障,该线程阻塞进入休眠状态,等本组其他线程都到达屏障点,才会被唤醒继续执行后面的代码。还有两个入参可选,await(long timeout, TimeUnit unit) 如果超出指定的等待时间,则抛出TimeoutException异常。

核心常用就这2个方法了,没别的!

4、CyclicBarrier源码分析

首先,看一下CyclicBarrier的成员变量,里面int 有parties、和count。这两个变量成功支持了屏障变成循环屏障。其中parties表示屏障阈值,count表示当前还差多少个线程到达屏障,每来一个线程调用await(),本组线程的屏障count就减1.count为0时候,就唤醒本组线程继续执行。

代码语言:txt复制
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable barrierCommand;
    private Generation generation = new Generation();
    private int count;

其次,看一下实例化循环屏障对象代码,重点将本组循环等待线程数量parties赋值给parties、count,以及设置屏障任务。

代码语言:txt复制
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    //不指定屏障任务
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

然后,重点看一下await()阻塞等待的方法,里面调用了dowait()方法。

源码很长,简单总结:dowait方法就是更新count-1,表示本组又报道了一个线程,还没到的名额少了一个。如果count为0,那说明自己是本组最后来屏障集合的线程,负责唤醒大家,以及执行屏障的barrierCommand任务(如果有的话)。

源码细的说:除了count-1判断是否全部到齐,如果是0,包括如何唤醒其他线程。不是0,如何陷入阻塞等待。

具体就是:

1、如果count不是0,就把自己加入到AQS的条件队列里,等待信号唤醒。

2、如果count是0,说明本线程是最后一个到达的,咱不用进入阻塞,先执行屏障的barrierCommand,然后去唤醒本组的其他线程兄弟继续执行。并重置count值为parties阈值,方便下一组线程使用,达成屏障可循环使用的目的。

其他就是在AQS里如何阻塞等待、以及唤醒其他线程具体逻辑,源码有点复杂,等后续我们出源码分析专栏,就画图分析讲解。

代码语言:txt复制
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加锁,去更新count,这里就不是CAS了
        lock.lock();
        try {
            final Generation g = generation;
            //判断屏障是否被中断
            if (g.broken)
                throw new BrokenBarrierException();
            //判断本线程是否已中断
            if (Thread.interrupted()) {
                //本组线程的屏障中断,并重置屏障
                breakBarrier();
                throw new InterruptedException();
            }
            // 对count减1
            int index = --count;
            // 本组屏障全部线程都到达屏障,接下来执行屏障任务、以及唤醒本组其他阻塞兄弟
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //实例CyclicBarrier(),如果有指定任务,本线程就代劳去执行
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 如果不为0,说明约定到本屏障的其他兄弟们还没到齐,那就自旋等待,直到被打断或者超时
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } 
      ........
    }

今天就分享这么多,明天我们继续分享并发编程里的Condition条件接口。

0 人点赞