Java并发——BlockingQueue阻塞队列(九)

2024-04-27 07:41:43 浏览数 (1)

一、什么是阻塞队列(BlockingQueue)

阻塞队列,也就是 BlockingQueue,它是一个接口。BlockingQueue是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作。

生产者线程,它会把生产出来的结果放到中间的阻塞队列中,而右侧的三个消费者也会从阻塞队列中取出它所需要的内容并进行处理。因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题

参考

深入理解Java系列 | BlockingQueue用法详解

深入理解Java系列 | Queue用法详解

二、并发队列关系

Java 提供的线程安全的队列(也称为并发队列)分为阻塞队列非阻塞队列两大类。

阻塞队列就是BlockingQueue接口的实现,非阻塞队列典型例子是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用 CAS 保证了线程安全。并发队列关系如下图:

三、阻塞队列的特点

先进先出

FIFO的数据结构(因为extends Queue)先进先出是Queue的能力

代码语言:Java复制
public interface BlockingQueue<E> extends Queue<E> 

线程安全

队列本身加锁,生产者和消费者不需要考虑线程安全问题

支持阻塞插入和取出

阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

take方法

take方法是获取并移除头结点。队列为空,阻塞队列。直到队列里有数据,则解除阻塞,消费者可以取得数据。

put方法

put 方法是向对尾插入元素。插入时,如果队列没满,正常插入;如队列已满,则阻塞,直到队列里有空闲空间再插入。

可选容量限制

有界容量

如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了

无界容量

无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满

四、阻塞队列常用方法

首先看下BlockingQueue接口代码

代码语言:java复制
public interface BlockingQueue<E> extends Queue<E> {
   /**
     * 入队一个元素,如果有空间则直接插入,并返回true;
     * 如果没有空间则抛出IllegalStateException
     */
    boolean add(E e);

    /**
     * 入队一个元素,如果有空间则直接插入,并返回true;
     * 如果没有空间返回false
     */
    boolean offer(E e);

    /**
     * 入队一个元素,如果有空间则直接插入,如果没有空间则一直阻塞等待
     */
    void put(E e) throws InterruptedException;

    /**
     * 入队一个元素,如果有空间则直接插入,并返回true;
     * 如果没有空间则等待timeout时间,插入失败则返回false
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 出队一个元素,如果存在则直接出队,如果没有空间则一直阻塞等待
     */
    E take() throws InterruptedException;

    /**
     * 出队一个元素,如果存在则直接出队,如果没有空间则等待timeout时间,无元素则返回null
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    /**
     * 返回该队列剩余的容量(如果没有限制则返回Integer.MAX_VALUE)
     */
    int remainingCapacity();

    /**
     * 如果元素o在队列中存在,则从队列中删除
     */
    boolean remove(Object o);

    /**
     * 判断队列中是否存在元素o
     */
    public boolean contains(Object o);
    
    /**
     * 将队列中的所有元素出队,并添加到给定的集合c中,返回出队的元素数量
     */
    int drainTo(Collection<? super E> c);

    /**
     * 将队列中的元素出队,限制数量maxElements个,并添加到给定的集合c中,返回出队的元素数量
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

BlockingQueue主要提供方法,如下表所示:

方法

操作

抛出异常

返回特定值

阻塞

阻塞特定时间

特点

add(e)

入队

如果队列满了,操作失败,抛出异常

remove(o)

出队

如果队列空,删除失败,抛出异常

element()

获取队首元素

如果队列空,删除失败,抛出异常

offer(e)

入队

队列满返回false,添加成功返回true

poll()

出队

队列空返回null

peek()

获取队首元素

队列空返回null

offer(e, timeout, unit)

入队

poll(timeout, unit)

出队

put(e)

入队

队列满,阻塞

take()

出队

队列空,阻塞

五、常用阻塞队列

ArrayBlockingQueue

1、说明

基于数组的阻塞队列,使用数组存储数据,并需要指定其长度,所以是一个有界队列;利用 ReentrantLock 实现线程安全。

2、原理

代码语言:java复制
   final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

ArrayBlockingQueue 实现并发同步的原理就是利用 ReentrantLock 和它的两个 Condition notEmpty 和notFull(两个阻塞唤醒条件,分别表示等待出队的条件和等待入队的条件),读操作和写操作都需要先获取到 ReentrantLock 独占锁才能进行下一步操作。进行读操作时如果队列为空,线程就会进入到读线程专属的 notEmpty 的 Condition 的队列中去排队,等待写线程写入新的元素;同理,如果队列已满,这个时候写操作的线程会进入到写线程专属的 notFull 队列中去排队,等待读线程将队列元素移除并腾出空间。

3、源码分析

put

代码语言:java复制
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        //检查元素是否为空
        checkNotNull(e);
        //加锁(可响应中断)
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //count 等于数组的长度,代表已经满了,进行阻塞等待
            while (count == items.length)
                notFull.await();
            //元素入队
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (  putIndex == items.length)
            putIndex = 0;
        count  ;
        notEmpty.signal();
    }

take

代码语言:java复制
 public E take() throws InterruptedException {
     //加锁(可响应中断)
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //队列为空则阻塞
            while (count == 0)
                notEmpty.await();
            //出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue

基于链表的阻塞队列,使用链表存储数据,容量默认就为整型的最大值 Integer.MAX_VALUE是一个无界队列;也可以通过构造方法中的capacity设置最大元素数量,所以也可以作为有界队列

SynchronousQueue

带你了解下SynchronousQueue

一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费

容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

SynchronousQueue 实际上它不是一个真正的队列,因为SynchronousQueue没有容量,因为它不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的

Executors.newCachedThreadPool() 就是使用了SynchronousQueue

代码语言:java复制
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

PriorityBlockingQueue

是一个支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。底层基于数组实现,是一个无界队列。

它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功

DelayQueue

延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队

六、非阻塞队列ConcurrentLinkedQueue

原理

非阻塞队列 ConcurrentLinkedQueue 使用链表作为数据结构

使用 CAS 非阻塞算法 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景

源码分析

代码语言:java复制
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);
        //循环
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
代码语言:java复制
     boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而 compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

0 人点赞