阻塞队列长用于生产者消费者场景,生产者添加元素,消费者获取元素。BlockingQueue是存放元素的容器,它提供了线程安全的队列访问方式,JUC下面很多高级同步类都是基于它实现的。
1 概述
阻塞队列(BlockingQueue)是一个支持两种附加操作的队列。支持附加阻塞的插入和移除操作。
- 支持阻塞的插入:当队列满时,插入操作会被阻塞,直到队列不满。
- 支持阻塞的移除:当队列空时,移除操作会被阻塞,直到队列不空。
阻塞队列不可用时,操作处理方式
方法处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 无 | 无 |
- 抛出异常:队列满时,若继续插入元素会抛出
IllegalStateException
;当队列为空时,若获取元素则会抛出NoSuchElementException
异常。 - 返回特殊值:向队列插入元素时,会返回是否插入成功true/false;获取元素时,成功则返回元素,失败则返回null。
- 一直阻塞:当阻塞队列满时,若继续使用put新增元素时会被阻塞,直到队列不为空或者响应中断退出;当阻塞队列为空时,继续使用take获取元素时会被阻塞,直到队列不为空。
- 超时退出:当阻塞队列满时,使用offer(e, time, unit)新增元素会被阻塞至超时退出;当队列为空时,使用poll(time, unit)获取元素时会被阻塞至超时退出。
注意:
- 阻塞队列中不允许插入
null
,会抛出NPE异常。 - 可以访问阻塞队列中的任意元素,调用
remove(Object o)
可以将队列之中的特定对象移除,但会遍历全部元素,并不高效。
2 阻塞队列的实现
2.1 ArrayBlockingQueue
由数组构成的有界阻塞队列,内部由数组final Object[] items
实现。默认情况下不保证线程公平的访问队列,所谓公平访问队列指阻塞的线程,可以按照阻塞的先后顺序访问队列。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 使用公平锁/非公平锁
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
队列大小初始化后不可修改。参数fair
控制内部ReentrantLock
是否采用公平锁。
2.2 LinkedBlockingQueue
链表实现的有界阻塞队列。内部结构是单链表。默认大小为Integer.MAX_VALUE
,可以指定大小。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
// 指定队列大小
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 单链表节点Node
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
2.3 PriorityBlockingQueue
支持优先级的无界阻塞队列。默认情况下采取自然顺序升序排列。也可以自定义compareTo()
方法来指定元素的排列顺序,或者初始化队列时,指定构造参数Comparator
来对元素进行排序。同优先级顺序无法保证。
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock(); // 非公平锁
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
// offer方法部分代码
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
由offer代码可以看出,Comparator
的优先级是大于Comparable.compareTo
方法的。
注意:PriorityBlockingQueue
不会阻塞数据生产者(队列无界),只会在没有数据时阻塞消费者。生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则将有可能耗尽堆空间。
2.4 DelayQueue
支持延时获取元素的无界队列。队列使用PriorityQueue
实现。队列中的元素必须实现java.util.concurrent.Delayed
接口,在创建元素时指定多久才能才能从队列中取到元素。
DelayQueue非常有用,可以将DelayQueu应用在以下应用场景。
- 缓存系统的设计:用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能获取到元素时,表示缓存有限期到了。
- 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如
TimerQueue
就是使用DelayQueue实现的。
2.5 SynchronousQueue
不存储元素的阻塞队列。每个put
操作都必须等待一个take
操作,反之亦然。
// fair为true,等待线程将以FIFO的顺序进行访问
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
将生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue
的吞吐量高于ArrayBlockingQueue
和LinkedBlockingQueue
。
3 阻塞队列的原理
利用Lock
锁的多条件(Condition)阻塞控制。下面简单分析下ArrayBlockingQueue
部分代码。
3.1 ArrayBlockingQueue属性
代码语言:javascript复制/** The queued items */
// 数据元素数组
final Object[] items;
/** items index for next take, poll, peek or remove */
// 下一个待获取元素索引
int takeIndex;
/** items index for next put, offer, or add */
// 下一个待插入元素索引
int putIndex;
/** Number of elements in the queue */
// 队列中元素个数
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
// 所有访问的主锁
final ReentrantLock lock;
/** Condition for waiting takes */
// 消费者监视器
private final Condition notEmpty;
/** Condition for waiting puts */
// 生产者监视器
private final Condition notFull;
//
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
3.2 put操作
代码语言:javascript复制// 在队列尾部插入元素,若队列已满则等待队列非满。
public void put(E e) throws InterruptedException {
// 校验插入元素,为空则抛出NPE
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1. 尝试获取锁(响应中断)
lock.lockInterruptibly();
try {
// 2. 当队列满时
while (count == items.length)
// 2.1 若队列满,则阻塞当前线程。等待`notFull.signal()`唤醒。
notFull.await();
// 3. 非满则执行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
// 在`putIndex`处放置当前元素,只有获取lock锁后才会调用
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 在`putIndex`处放置元素
items[putIndex] = x;
// putIndex等于数组长度时,重置为0索引。
if ( putIndex == items.length)
putIndex = 0;
// 数量加1
count ;
// 4. 唤醒一个等待线程(等待取元素的线程)
notEmpty.signal();
}
put总体流程:
- 获取lock锁,拿到锁后继续执行,否则自旋竞争锁。
- 判断阻塞队列是否满。满了了则调用
await
阻塞当前线程。同时释放lock锁。 - 如果没满,则调用
enqueue
方法将元素put进阻塞队列。此时还有一种可能是:第2步中被阻塞的线程被唤醒且又拿到了lock锁。 - 唤醒一个标记为
notEmpty(消费者)
的线程。
3.3 take操作
代码语言:javascript复制// 从头部获取元素,若队列为空则等待队列非空。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 1. 获取锁
lock.lockInterruptibly();
try {
// 2. 当队列为空时
while (count == 0)
// 2.1 当队列为空时,阻塞当前线程。等待`notEmpty.signal()`唤醒。
notEmpty.await();
// 3. 非空则进行入队操作
return dequeue();
} finally {
lock.unlock();
}
}
// 从`takeIndex`位置获取当前元素,只有获取到lock锁后才会调用
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 从`takeIndex`位置获取元素,然后清除该位置元素
E x = (E) items[takeIndex];
items[takeIndex] = null;
//
if ( takeIndex == items.length)
takeIndex = 0;
// 队列元素减1
count--;
if (itrs != null)
itrs.elementDequeued();
// 4. 唤醒一个标记为notFull(生产者)的线程
notFull.signal();
return x;
}
take的整体流程:
- 获取lock锁,拿到锁则执行下一步流程;未拿到则自旋竞争锁。
- 当前队列是否为空,若为空则调用
notEmpty.await
阻塞当前线程,同时释放锁,等待被唤醒。 - 若非空,则调用
dequeue
进行出队操作。此时还有一种可能:第2步中的阻塞的线程被唤醒并且又拿到了lock锁。 - 唤醒一个被标记为notFull(生产者)的线程。
3.4 总结
put
和take
操作都需要先获得锁,没有获得锁的线程无法进行操作。- 拿到锁后,并不一定能顺利执行
put
/take
操作,还需要判断队列是否可用(是否满/空),不可用则会被阻塞,并释放锁。 - 在2中被阻塞的线程会被唤醒,但唤醒之后依然需要拿到锁之后才能继续向下执行。否则,自旋拿锁,拿到锁后再while判断队列是否可用。
参考资料:
- 《Java并发编程的艺术》
- 深入浅出Java多线程