阻塞队列BlockQueue

2024-01-06 09:33:29 浏览数 (2)

阻塞队列BlockingQueue

BlockingQueue是Java并发编程中的一个关键接口,位于java.util.concurrent包下。它提供了一种在多线程环境中安全地共享数据的机制,特别适用于生产者-消费者模型和任务调度等场景。在BlockingQueue中,生产者线程将数据放入队列,而消费者线程则从队列中取出数据,这样可以很好地实现线程之间的协调和通信。

1. 概述

BlockingQueue接口扩展了Queue接口,其中包含了阻塞操作,这意味着当队列为空或满时,某些操作将被阻塞。它主要用于解决多线程间数据共享和同步的问题,提供了一种高效的方式来进行线程之间的通信。

2. 主要方法

2.1 放入元素

put(E e) 将指定的元素插入此队列,如果队列已满,则等待空间变为可用。

代码语言:javascript复制
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
queue.put("Element");

offer(E e, long timeout, TimeUnit unit) 将指定的元素插入此队列,等待指定的等待时间,如果队列仍然是满的,则返回false。

代码语言:javascript复制
boolean offered = queue
        }

    @Override
    public void run() {
        // Task execution logic
        System.out.println("Executing task: "   taskName);
    }
}

class CustomThreadPool {
    private final BlockingQueue<Runnable> taskQueue;
    private final List<WorkerThread> workerThreads;

    public CustomThreadPool(int corePoolSize, int maxPoolSize, BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
        this.workerThreads = new ArrayList<>();

        for (int i = 0; i < corePoolSize; i  ) {
            WorkerThread workerThread = new WorkerThread();
            workerThreads.add(workerThread);
            workerThread.start();
        }
    }

    public void submitTask(Runnable task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private class WorkerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    Runnable task = taskQueue.take();
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

3.实现类

BlockingQueue的实现类在java.util.concurrent包中,常用的有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。

  1. ArrayBlockingQueue: ArrayBlockingQueue是一个有界阻塞队列,内部使用数组来存储元素。它的大小是固定的,在创建时需要指定容量。当队列满时,生产者线程将被阻塞,直到有空间可用;当队列为空时,消费者线程将被阻塞,直到有元素可用。
  2. LinkedBlockingQueue: LinkedBlockingQueue是一个可选有界阻塞队列,内部使用链表来存储元素。如果创建时不指定容量,它将使用Integer.MAX_VALUE作为默认容量。当队列满时,生产者线程将被阻塞,直到有空间可用;当队列为空时,消费者线程将被阻塞,直到有元素可用。
  3. SynchronousQueue: SynchronousQueue是一个没有缓冲的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。它是一个直接传输的阻塞队列,意味着插入操作会阻塞直到另一个线程调用移除操作,反之亦然。

4.补充方法

BlockingQueue提供了一系列方法来实现线程间的数据传输和共享:

  1. put():将元素插入队列尾部,如果队列已满,则阻塞等待空间可用。
  2. take():移除并返回队列头部的元素,如果队列为空,则阻塞等待元素可用。
  3. offer():尝试将元素插入队列尾部,如果队列已满,则立即返回false。
  4. poll():移除并返回队列头部的元素,如果队列为空,则立即返回null。
  5. offer(E e, long timeout, TimeUnit unit):尝试将元素插入队列尾部,如果队列已满,则等待指定的时间,如果超时仍未插入成功,则返回false。
  6. poll(long timeout, TimeUnit unit):移除并返回队列头部的元素,如果队列为空,则等待指定的时间,如果超时仍未获取到元素,则返回null。
  7. isEmpty():判断队列是否为空。
  8. isFull():判断队列是否已满。

BlockingQueue的应用场景:

  1. 生产者-消费者模式:BlockingQueue常用于生产者-消费者模式中,生产者线程向队列中插入数据,消费者线程从队列中取出数据,通过BlockingQueue的阻塞特性,可以有效控制生产者和消费者的速度,避免资源竞争和线程间的协作问题。
  2. 线程池:Java线程池中常用BlockingQueue来存放待执行的任务,当线程池的工作队列满时,新的任务将会被阻塞,直到有空闲线程来执行任务。
  3. 任务调度:BlockingQueue可以用于任务调度中,可以将需要执行的任务放入队列中,然后由多个工作线程从队列中取出任务并执行。
  4. 数据交换:BlockingQueue可以用于不同线程之间的数据交换,一个线程可以将数据放入队列中,另一个线程可以从队列中取出数据进行处理。

0 人点赞