BlockingQueue 介绍
BlockingQueue 是一个先进先出的队列(Queue), 并且当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
BlockingQueue 对插入、删除、获取元素在不同场景下提供了不同的操作:
抛异常 | 返回特殊值(成功或失败) | 阻塞等待 | 阻塞等待直至超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
获取 | element() | peek() | 无 | 无 |
我们重点关注 put
和 take
这两个阻塞操作, BlockingQueue 主要是于消费者-生产者场景的一个线程安全容器
ArrayBlockingQueue
- ArrayBlockingQueue 是 BlockingQueue 的一个有界队列实现,底层采取数组
- 并发控制采取可重入锁, 插入和读取操作都需要获取锁
- 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程
- 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程
主要属性
代码语言:javascript复制// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;
// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
// 队列为空的条件队列
private final Condition notEmpty;
// 队列满的条件队列
private final Condition notFull;
阻塞操作 take & put
put 操作流程:
代码语言:javascript复制public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 插入前先获取锁
lock.lockInterruptibly();
try {
// 如果队列满了, 写操作线程进入条件队列等待
while (count == items.length)
notFull.await();
// 元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
// 入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 入队
items[putIndex] = x;
// 循环使用 index
if ( putIndex == items.length)
putIndex = 0;
count ;
// 队列已经有数据了,唤醒等待队消费者
notEmpty.signal();
}
take 操作流程:
代码语言:javascript复制public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空, 读操作线程挂起等待
while (count == 0)
notEmpty.await();
// 出队操作
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if ( takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒队列满时等待的写线程
notFull.signal();
return x;
}
LinkedBlockingQueue
- LinkedBlockingQueue 底层基于单向链表,可以作为无界队列也可作为有界队列
- 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)
- 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)
主要属性
构造方法:
代码语言:javascript复制// 无界队列构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 有界队列构造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
类属性:
代码语言:javascript复制// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
阻塞操作 put & take
put 操作流程:
代码语言:javascript复制public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 标识成功、失败的标志
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必须要获取到 putLock 才可以进行插入操作
putLock.lockInterruptibly();
try {
// 如果队列满,等待 notFull 的条件满足。
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// count 原子加 1,c 还是加 1 前的值
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
if (c 1 < capacity)
notFull.signal();
} finally {
// 入队后,释放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 元素入队后,如果需要,调用这个方法唤醒读线程来读
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
take 操作流程:
代码语言:javascript复制public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
// 出队后释放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
// 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
if (c == capacity)
signalNotFull();
return x;
}
// 取队头,出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 之前说了,头结点是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
// 设置这个为新的头结点
head = first;
E x = first.item;
first.item = null;
return x;
}
// 元素出队后,如果需要,调用这个方法唤醒写线程来写
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
SynchronousQueue
- 同步队列: 当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作
- SynchronousQueue 不提供任何空间来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费
- 没有 peek 方法(直接返回null)
构造方法
代码语言:javascript复制// 构造时,我们可以指定公平模式还是非公平模式
// queue 先入先出, stack 先入后出
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
// TransferQueue 和 TransferStack 都实现了抽象类 Transfer
abstract static class Transferer {
// 从方法名上大概就知道,这个方法用于转移元素,从生产者手上转到消费者手上
// 也可以被动地,消费者调用这个方法来从生产者手上取元素
// 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者
// 如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素
// 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值
// 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。
abstract Object transfer(Object e, boolean timed, long nanos);
}
put & take
代码语言:javascript复制// 写入值
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { // 1
Thread.interrupted();
throw new InterruptedException();
}
}
// 读取值并移除
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); // 2
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}
transfer 分析(queue 公平模式)
put(E o)
和 take()
都调用了 transferer.transfer()
, 区别是 take 操作的第一个参数为 null(则该操作为读操作)
transfer
整体的设计思路如下:
- 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列
- 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据
等待队列节点 QNode
的结构如下:
static final class QNode {
volatile QNode next; // 可以看出来,等待队列是单向链表
volatile Object item;
volatile Thread waiter; // 将线程对象保存在这里,用于挂起和唤醒
final boolean isData; // 用于判断是写线程节点(isData == true),还是读线程节点
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
......
整个transfer流程:
代码语言:javascript复制Object transfer(Object e, boolean timed, long nanos) {
QNode s = null;
// 标记是否为写操作
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 未初始化, 自旋
if (t == null || h == null)
continue;
// 队列空,或队列中节点类型和当前节点一致, 直接入队
if (h == t || t.isData == isData) {
QNode tn = t.next;
// t != tail 说明刚刚有节点入队,continue 重试
if (t != tail)
continue;
// 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可
if (tn != null) {
// 这个方法就是:如果 tail 此时为 t 的话,设置为 tn 为 tail
advanceTail(t, tn);
continue;
}
// 超时 can't wait
if (timed && nanos <= 0)
return null;
// 构造新节点
if (s == null)
s = new QNode(e, isData);
// 将当前节点,插入到 tail 的后面
if (!t.casNext(null, s)) // failed to link in
continue;
// 将当前节点设置为新的 tail
advanceTail(t, s); // swing tail and wait
// 自旋或阻塞,直到满足条件, 进入该方法看详情
Object x = awaitFulfill(s, e, timed, nanos);
// 到这里,说明之前入队的线程被唤醒了,准备往下执行
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? x : e;
// 有相应的读或写相匹配的情况
} else {
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue;
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? x : e;
}
}
}
// nt cas 设置为 新的 tail
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 自旋或阻塞,直到满足条件,这个方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
// 判断需要自旋的次数,
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果被中断了,那么取消这个节点
if (w.isInterrupted())
// 就是将当前节点 s 中的 item 属性设置为 this
s.tryCancel(e);
Object x = s.item;
// 这里是这个方法的唯一的出口
if (x != e)
return x;
// 如果需要,检测是否超时
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
// 如果自旋达到了最大的次数,那么检测
else if (s.waiter == null)
s.waiter = w;
// 如果自旋到了最大的次数,那么线程挂起,等待唤醒
else if (!timed)
LockSupport.park(this);
// spinForTimeoutThreshold 这个之前讲 AQS 的时候其实也说过,剩余时间小于这个阈值的时候,就
// 不要进行挂起了,自旋的性能会比较好
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
非公平模式 TransferStack
上面分析了公平模式 TransferQueue, TransferStack 流程类似:
- 如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而栈中的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对
- 如果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节点进行匹配,然后将这两个节点出栈
- 如果栈顶是进行匹配而入栈的节点,帮助其进行匹配并出栈,然后再继续操作
PriorityBlockingQueue
- PriorityQueue 的线程安全版本
- 插入值不可为null, 并且是 comparable 的(否则会抛出 ClassCastException)
- 相较于其他 BlockingQueue,PriorityBlockingQueue 的 put 操作不会阻塞, 因为它是无界队列
主要属性
前面说了 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本, 所以基本的存储结构也与 PriorityQueue 一样.
- 使用一个基于数组的二叉堆来存储, 采取同一个 lock 来控制并发
- 二叉堆(小顶堆), 每个节点的值都小于其左右子节点的值, 二叉堆中最小的值就是根节点
- 对于数组中的元素
a[i]
,其左子节点为a[2*i 1]
,其右子节点为a[2*i 2]
,其父节点为a[(i-1)/2]
// 构造方法中,如果不指定大小的话,默认大小为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 这个就是存放数据的数组
private transient Object[] queue;
// 队列当前大小
private transient int size;
// 大小比较器,如果按照自然序排序,那么此属性可设置为 null
private transient Comparator<? super E> comparator;
// 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
private final ReentrantLock lock;
// 由上面的 lock 属性创建
private final Condition notEmpty;
// 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;
// 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
private PriorityQueue q;
自动扩容
PriorityBlockingQueue 实现了并发安全的自动扩容, tryGrow
:
private void tryGrow(Object[] array, int oldCap) {
// 先释放锁, 后面重新获取锁
// 这里先释放独占锁, 这样读操作和扩容操作就可以同时进行了
lock.unlock();
Object[] newArray = null;
// 用 CAS 操作将 allocationSpinLock 由 0 变为 1,也算是获取锁
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果节点个数小于 64,那么增加的 oldCap 2 的容量
// 如果节点数大于等于 64,那么增加 oldCap 的一半
// 所以节点数较小时,增长得快一些
int newCap = oldCap ((oldCap < 64) ?
(oldCap 2) :
(oldCap >> 1));
// 这里有可能溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果 queue != array,那么说明有其他线程给 queue 分配了其他的空间
if (newCap > oldCap && queue == array)
// 分配一个新的大数组
newArray = new Object[newCap];
} finally {
// 重置,也就是释放锁
allocationSpinLock = 0;
}
}
// 如果有其他的线程也在做扩容的操作
if (newArray == null)
Thread.yield();
// 重新获取锁
lock.lock();
// 将原来数组中的元素复制到新分配的大数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
put & take 操作
put 流程与 PriorityQueue 类似, 都是先插入到最后,然后与父节点比较,直到父节点小于插入元素,不过加了一个 lock,
代码语言:javascript复制public void put(E e) {
// 直接调用 offer 方法,因为前面我们也说了,在这里,put 方法不会阻塞
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 首先获取到独占锁
lock.lock();
int n, cap;
Object[] array;
// 如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 节点添加到二叉堆中
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 更新 size
size = n 1;
// 唤醒等待的读线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
take 流程:
代码语言:javascript复制public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 独占锁
lock.lockInterruptibly();
E result;
try {
// dequeue 出队, 如果没有元素就阻塞在这里
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 队头,用于返回
E result = (E) array[0];
// 队尾元素先取出
E x = (E) array[n];
// 队尾置空
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
// 出队后调整树的结构, 使其符合小顶堆
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
// 这里得到的 half 肯定是非叶节点
// a[n] 是最后一个元素,其父节点是 a[(n-1)/2]。所以 n >>> 1 代表的节点肯定不是叶子节点
int half = n >>> 1; // 得到 half = 4
while (k < half) {
// 先取左子节点
int child = (k << 1) 1; // 得到 child = 1
Object c = array[child]; // c = 12
int right = child 1; // right = 2
// 如果右子节点存在,而且比左子节点小
// 此时 array[right] = 20,所以条件不满足
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key = 17, c = 12,所以条件不满足
if (key.compareTo((T) c) <= 0)
break;
// 把 12 填充到根节点
array[k] = c;
// k 赋值后为 1
k = child;
// 一轮过后,我们发现,12 左边的子树和刚刚的差不多,都是缺少根节点,接下来处理就简单了
}
array[k] = key;
}
}
DelayQueue
DelayQueue 是用优先队列实现的无界阻塞队列
主要属性
代码语言:javascript复制public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 持有内部重入锁。
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存放工作任务。
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 当前等待获取到期元素的线程
private Thread leader = null;
// 依赖于重入锁的 condition。
private final Condition available = lock.newCondition();
}
put & take 操作
put 操作如下
代码语言:javascript复制public void put(E e) {
offer(e);
}
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 往优先队列添加元素
q.offer(e);
// 添加成功
if (q.peek() == e) {
// 添加元素成功后将当前等待的leader线程移除, 重新唤醒一个线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
take 操作:
代码语言:javascript复制public E take() throws InterruptedException {
// 同样先获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 队头元素
E first = q.peek();
// 如果队头为空, 则挂起当前线程
if (first == null)
available.await();
else {
// 通过延迟任务的 getDelay 获取延迟时间
long delay = first.getDelay(NANOSECONDS);
// 时间到期, 删除并返回队头
if (delay <= 0)
return q.poll();
// 时间未到, 线程进入等待,此时不能持有头节点元素引用, 防止内存泄漏
first = null; // don't retain ref while waiting
// 如果 leader 线程已存在, 当前线程直接等待
if (leader != null)
available.await();
else {
// 当前线程赋值给 leader 并且限时等待获取元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 限时等待
available.awaitNanos(delay);
} finally {
// 限时等待完成, 进入下一个循环获取元素返回
// 如果 leader != thisThread, 代表等待期间有新元素添加, 重新选择 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader线程为null且队头任务不为空,唤醒其中一个等待线程,使之能成为新leader
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}