阻塞队列和生产者 - 消费者模式
LinkedBlockingQueue在BlockingQueue的实现类中使用最多(如果知道队列的大小,可以考虑使用ArrayBlockIngQueue,它使用循环数组实现。但是如果不知道队列未来的大小,那么使用ArrayBlockingQueue就必然会导致数组的来回复制,降低效率)。我们主要关心可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上 的put方法也永远不会阻塞(如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限)。
生产者-消费者模式好处
- 解耦:假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。如果将来消费者的代码发生变化, 可能会影响到生产者。而如果两者都依赖于某个缓冲区(比如阻塞队列),两者之间不直接依赖,耦合也就相应降低了,同时提高了代码可读性和可重用性
- 提高并发性能:生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。使用了生产者/消费者模式之后,由于生产者与消费者是两个独立的并发体,他们之间是用缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区里拿数据即可,减少了因为彼此的处理速度差异而引起的阻塞。
- 在高并发场景下平滑短时间内大量的服务请求:在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。而在生产者-消费者模式中,当数据生产快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中,等生产者的生产速度慢下来,消费者再慢慢处理掉。
2. 源码分析
LinkedBlockingQueue采用的是单链表结构,包含了头结点和尾节点,last入队,head出队。
入队:last.next=node;last = node;
,即last中保存了最后一个有效元素;
出队:Node<E> h = head; Node<E> first = h.next; head = first; E x = first.item; first.item = null; return x;
,即head并没有存放有效的值(为null),将head指向的下一个节点的值返回,并将下一个节点的设为新的head。
// 所有的元素都通过Node这个静态内部类来进行存储,这与LinkedList的处理方式完全一样
static class Node<E> {
//使用item来保存元素本身
E item;
//保存当前节点的后继节点
Node<E> next;
Node(E x) { item = x; }
}
/**
阻塞队列所能存储的最大容量
用户可以在创建时手动指定最大容量,如果用户没有指定最大容量
那么最默认的最大容量为Integer.MAX_VALUE.
*/
private final int capacity;
/**
当前阻塞队列中的元素数量,由于它的入队列和出队列使用的是两个
不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数
量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。
*/
private final AtomicInteger count = new AtomicInteger(0);
/**
* 链表的头部
* LinkedBlockingQueue的头部具有一个不变性:
* 头部的元素总是为null,head.item==null
*/
private transient Node<E> head;
/**
* 链表的尾部
* LinkedBlockingQueue的尾部也具有一个不变性:
* 即last.next==null
*/
private transient Node<E> last;
/**
元素出队列时线程所获取的锁
当执行take、poll等操作时线程需要获取的锁
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
当队列为空时,通过该Condition让从队列中获取元素的线程处于等待状态
*/
private final Condition notEmpty = takeLock.newCondition();
/**
元素入队列时线程所获取的锁
当执行add、put、offer等操作时线程需要获取锁
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
当队列的元素已经达到capactiy,通过该Condition让元素入队列的线程处于等待状态
*/
private final Condition notFull = putLock.newCondition();
通过上面的分析,我们可以发现LinkedBlockingQueue在入队列和出队列时使用的不是同一个Lock,这也意味着它们之间的操作不会存在互斥操作。在多个CPU的情况下,它们可以做到真正的在同一时刻既消费、又生产,能够做到并行处理。