BlockingQueue

2022-06-28 18:41:10 浏览数 (2)

BlockingQueue 介绍

BlockingQueue 是一个先进先出的队列(Queue), 并且当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。

BlockingQueue 对插入、删除、获取元素在不同场景下提供了不同的操作:

抛异常

返回特殊值(成功或失败)

阻塞等待

阻塞等待直至超时

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

删除

remove()

poll()

take()

poll(time, unit)

获取

element()

peek()

我们重点关注 puttake 这两个阻塞操作, BlockingQueue 主要是于消费者-生产者场景的一个线程安全容器

ArrayBlockingQueue

  • ArrayBlockingQueue 是 BlockingQueue 的一个有界队列实现,底层采取数组
  • 并发控制采取可重入锁, 插入和读取操作都需要获取锁
  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程
主要属性
代码语言:javascript复制
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
// 队列为空的条件队列
private final Condition notEmpty;
// 队列满的条件队列
private final Condition notFull;
阻塞操作 take & put

put 操作流程:

代码语言:javascript复制
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 插入前先获取锁
    lock.lockInterruptibly();
    try {
        // 如果队列满了, 写操作线程进入条件队列等待
        while (count == items.length)
            notFull.await();
        // 元素入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
// 入队操作
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // 入队
    items[putIndex] = x;
    // 循环使用 index
    if (  putIndex == items.length)
        putIndex = 0;
    count  ;
    // 队列已经有数据了,唤醒等待队消费者
    notEmpty.signal();
}

take 操作流程:

代码语言:javascript复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 如果队列为空, 读操作线程挂起等待
        while (count == 0)
            notEmpty.await();
        // 出队操作
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (  takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒队列满时等待的写线程
    notFull.signal();
    return x;
}

LinkedBlockingQueue

  • LinkedBlockingQueue 底层基于单向链表,可以作为无界队列也可作为有界队列
  • 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)
  • 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)
主要属性

构造方法:

代码语言:javascript复制
// 无界队列构造方法
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// 有界队列构造方法
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

类属性:

代码语言:javascript复制
// 队列容量
private final int capacity;

// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);

// 队头
private transient Node<E> head;

// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();

// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
阻塞操作 put & take

put 操作流程:

代码语言:javascript复制
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 标识成功、失败的标志
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 必须要获取到 putLock 才可以进行插入操作
    putLock.lockInterruptibly();
    try {
        // 如果队列满,等待 notFull 的条件满足。
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入队
        enqueue(node);
        // count 原子加 1,c 还是加 1 前的值
        c = count.getAndIncrement();
        // 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
        if (c   1 < capacity)
            notFull.signal();
    } finally {
        // 入队后,释放掉 putLock
        putLock.unlock();
    }
    // 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
    // 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
    if (c == 0)
        signalNotEmpty();
}

// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 元素入队后,如果需要,调用这个方法唤醒读线程来读
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

take 操作流程:

代码语言:javascript复制
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 首先,需要获取到 takeLock 才能进行出队操作
    takeLock.lockInterruptibly();
    try {
        // 如果队列为空,等待 notEmpty 这个条件满足再继续执行
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出队
        x = dequeue();
        // count 进行原子减 1
        c = count.getAndDecrement();
        // 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 出队后释放掉 takeLock
        takeLock.unlock();
    }
    // 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
    // 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
    if (c == capacity)
        signalNotFull();
    return x;
}
// 取队头,出队
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 之前说了,头结点是空的
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    // 设置这个为新的头结点
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
// 元素出队后,如果需要,调用这个方法唤醒写线程来写
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

SynchronousQueue

  • 同步队列: 当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作
  • SynchronousQueue 不提供任何空间来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费
  • 没有 peek 方法(直接返回null)
构造方法
代码语言:javascript复制
// 构造时,我们可以指定公平模式还是非公平模式
// queue 先入先出, stack 先入后出
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}

// TransferQueue 和 TransferStack 都实现了抽象类 Transfer
abstract static class Transferer {
    // 从方法名上大概就知道,这个方法用于转移元素,从生产者手上转到消费者手上
    // 也可以被动地,消费者调用这个方法来从生产者手上取元素
    // 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者
    // 如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素
    // 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值
    // 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。
    abstract Object transfer(Object e, boolean timed, long nanos);
}
put & take
代码语言:javascript复制
// 写入值
public void put(E o) throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) { // 1
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 读取值并移除
public E take() throws InterruptedException {
    Object e = transferer.transfer(null, false, 0); // 2
    if (e != null)
        return (E)e;
    Thread.interrupted();
    throw new InterruptedException();
}
transfer 分析(queue 公平模式)

put(E o)take() 都调用了 transferer.transfer(), 区别是 take 操作的第一个参数为 null(则该操作为读操作)

transfer 整体的设计思路如下:

  1. 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列
  2. 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据

等待队列节点 QNode 的结构如下:

代码语言:javascript复制
static final class QNode {
    volatile QNode next;          // 可以看出来,等待队列是单向链表
    volatile Object item;        
    volatile Thread waiter;       // 将线程对象保存在这里,用于挂起和唤醒
    final boolean isData;         // 用于判断是写线程节点(isData == true),还是读线程节点

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
    ......

整个transfer流程:

代码语言:javascript复制
Object transfer(Object e, boolean timed, long nanos) {

    QNode s = null;
    // 标记是否为写操作
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        
        // 未初始化, 自旋
        if (t == null || h == null)
            continue;                       

        // 队列空,或队列中节点类型和当前节点一致, 直接入队
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // t != tail 说明刚刚有节点入队,continue 重试
            if (t != tail)                 
                continue;
            // 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可
            if (tn != null) {
                // 这个方法就是:如果 tail 此时为 t 的话,设置为 tn 为 tail
                advanceTail(t, tn);
                continue;
            }
            // 超时  can't wait
            if (timed && nanos <= 0)        
                return null;
            // 构造新节点    
            if (s == null)
                s = new QNode(e, isData);
            // 将当前节点,插入到 tail 的后面
            if (!t.casNext(null, s))        // failed to link in
                continue;

            // 将当前节点设置为新的 tail
            advanceTail(t, s);              // swing tail and wait
            
            // 自旋或阻塞,直到满足条件, 进入该方法看详情
            Object x = awaitFulfill(s, e, timed, nanos);
            // 到这里,说明之前入队的线程被唤醒了,准备往下执行
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? x : e;

        // 有相应的读或写相匹配的情况
        } else {                           
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                  

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? x : e;
        }
    }
}

// nt cas 设置为 新的 tail
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}


// 自旋或阻塞,直到满足条件,这个方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {

    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    // 判断需要自旋的次数,
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果被中断了,那么取消这个节点
        if (w.isInterrupted())
            // 就是将当前节点 s 中的 item 属性设置为 this
            s.tryCancel(e);
        Object x = s.item;
        // 这里是这个方法的唯一的出口
        if (x != e)
            return x;
        // 如果需要,检测是否超时
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        // 如果自旋达到了最大的次数,那么检测
        else if (s.waiter == null)
            s.waiter = w;
        // 如果自旋到了最大的次数,那么线程挂起,等待唤醒
        else if (!timed)
            LockSupport.park(this);
        // spinForTimeoutThreshold 这个之前讲 AQS 的时候其实也说过,剩余时间小于这个阈值的时候,就
        // 不要进行挂起了,自旋的性能会比较好
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
非公平模式 TransferStack

上面分析了公平模式 TransferQueue, TransferStack 流程类似:

  1. 如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而栈中的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对
  2. 如果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节点进行匹配,然后将这两个节点出栈
  3. 如果栈顶是进行匹配而入栈的节点,帮助其进行匹配并出栈,然后再继续操作

PriorityBlockingQueue

  • PriorityQueue 的线程安全版本
  • 插入值不可为null, 并且是 comparable 的(否则会抛出 ClassCastException)
  • 相较于其他 BlockingQueue,PriorityBlockingQueue 的 put 操作不会阻塞, 因为它是无界队列
主要属性

前面说了 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本, 所以基本的存储结构也与 PriorityQueue 一样.

  • 使用一个基于数组的二叉堆来存储, 采取同一个 lock 来控制并发
  • 二叉堆(小顶堆), 每个节点的值都小于其左右子节点的值, 二叉堆中最小的值就是根节点
  • 对于数组中的元素 a[i],其左子节点为 a[2*i 1],其右子节点为 a[2*i 2],其父节点为 a[(i-1)/2]
代码语言:javascript复制
// 构造方法中,如果不指定大小的话,默认大小为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 这个就是存放数据的数组
private transient Object[] queue;

// 队列当前大小
private transient int size;

// 大小比较器,如果按照自然序排序,那么此属性可设置为 null
private transient Comparator<? super E> comparator;

// 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
private final ReentrantLock lock;

// 由上面的 lock 属性创建
private final Condition notEmpty;

// 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;

// 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
private PriorityQueue q;
自动扩容

PriorityBlockingQueue 实现了并发安全的自动扩容, tryGrow:

代码语言:javascript复制
private void tryGrow(Object[] array, int oldCap) {
    // 先释放锁, 后面重新获取锁
    // 这里先释放独占锁, 这样读操作和扩容操作就可以同时进行了
    lock.unlock(); 
    Object[] newArray = null;
    // 用 CAS 操作将 allocationSpinLock 由 0 变为 1,也算是获取锁
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 如果节点个数小于 64,那么增加的 oldCap   2 的容量
            // 如果节点数大于等于 64,那么增加 oldCap 的一半
            // 所以节点数较小时,增长得快一些
            int newCap = oldCap   ((oldCap < 64) ?
                                   (oldCap   2) :
                                   (oldCap >> 1));
            // 这里有可能溢出
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap   1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 如果 queue != array,那么说明有其他线程给 queue 分配了其他的空间
            if (newCap > oldCap && queue == array)
                // 分配一个新的大数组
                newArray = new Object[newCap];
        } finally {
            // 重置,也就是释放锁
            allocationSpinLock = 0;
        }
    }
    // 如果有其他的线程也在做扩容的操作
    if (newArray == null) 
        Thread.yield();
    // 重新获取锁
    lock.lock();
    // 将原来数组中的元素复制到新分配的大数组中
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
put & take 操作

put 流程与 PriorityQueue 类似, 都是先插入到最后,然后与父节点比较,直到父节点小于插入元素,不过加了一个 lock,

代码语言:javascript复制
public void put(E e) {
    // 直接调用 offer 方法,因为前面我们也说了,在这里,put 方法不会阻塞
    offer(e); 
}
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // 首先获取到独占锁
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // 节点添加到二叉堆中
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 更新 size
        size = n   1;
        // 唤醒等待的读线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

take 流程:

代码语言:javascript复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 独占锁
    lock.lockInterruptibly();
    E result;
    try {
        // dequeue 出队, 如果没有元素就阻塞在这里
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 队头,用于返回
        E result = (E) array[0];
        // 队尾元素先取出
        E x = (E) array[n];
        // 队尾置空
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}
// 出队后调整树的结构, 使其符合小顶堆
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        // 这里得到的 half 肯定是非叶节点
        // a[n] 是最后一个元素,其父节点是 a[(n-1)/2]。所以 n >>> 1 代表的节点肯定不是叶子节点
        int half = n >>> 1; // 得到 half = 4
        while (k < half) {
            // 先取左子节点
            int child = (k << 1)   1; // 得到 child = 1
            Object c = array[child];  // c = 12
            int right = child   1;  // right = 2
            // 如果右子节点存在,而且比左子节点小
            // 此时 array[right] = 20,所以条件不满足
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key = 17, c = 12,所以条件不满足
            if (key.compareTo((T) c) <= 0)
                break;
            // 把 12 填充到根节点
            array[k] = c;
            // k 赋值后为 1
            k = child;
            // 一轮过后,我们发现,12 左边的子树和刚刚的差不多,都是缺少根节点,接下来处理就简单了
        }
        array[k] = key;
    }
}

DelayQueue

DelayQueue 是用优先队列实现的无界阻塞队列

主要属性
代码语言:javascript复制
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 持有内部重入锁。
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先级队列,存放工作任务。
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 当前等待获取到期元素的线程
    private Thread leader = null;
    // 依赖于重入锁的 condition。
    private final Condition available = lock.newCondition();
}
put & take 操作

put 操作如下

代码语言:javascript复制
public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 往优先队列添加元素
        q.offer(e);
        // 添加成功
        if (q.peek() == e) {
            // 添加元素成功后将当前等待的leader线程移除, 重新唤醒一个线程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take 操作:

代码语言:javascript复制
public E take() throws InterruptedException {
    // 同样先获取锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 队头元素
            E first = q.peek();
            // 如果队头为空, 则挂起当前线程
            if (first == null)
                available.await();
            else {
                // 通过延迟任务的 getDelay 获取延迟时间
                long delay = first.getDelay(NANOSECONDS);
                // 时间到期, 删除并返回队头
                if (delay <= 0)
                    return q.poll();
                // 时间未到, 线程进入等待,此时不能持有头节点元素引用, 防止内存泄漏
                first = null; // don't retain ref while waiting
                // 如果 leader 线程已存在, 当前线程直接等待
                if (leader != null)
                    available.await();
                else {
                    // 当前线程赋值给 leader 并且限时等待获取元素
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 限时等待
                        available.awaitNanos(delay);
                    } finally {
                        // 限时等待完成, 进入下一个循环获取元素返回
                        // 如果 leader != thisThread, 代表等待期间有新元素添加, 重新选择 leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader线程为null且队头任务不为空,唤醒其中一个等待线程,使之能成为新leader
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

0 人点赞