阻塞队列BlockingQueue
BlockingQueue
是Java并发编程中的一个关键接口,位于java.util.concurrent
包下。它提供了一种在多线程环境中安全地共享数据的机制,特别适用于生产者-消费者模型和任务调度等场景。在BlockingQueue
中,生产者线程将数据放入队列,而消费者线程则从队列中取出数据,这样可以很好地实现线程之间的协调和通信。
1. 概述
BlockingQueue
接口扩展了Queue
接口,其中包含了阻塞操作,这意味着当队列为空或满时,某些操作将被阻塞。它主要用于解决多线程间数据共享和同步的问题,提供了一种高效的方式来进行线程之间的通信。
2. 主要方法
2.1 放入元素
put(E e)
: 将指定的元素插入此队列,如果队列已满,则等待空间变为可用。
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
queue.put("Element");
offer(E e, long timeout, TimeUnit unit)
: 将指定的元素插入此队列,等待指定的等待时间,如果队列仍然是满的,则返回false。
boolean offered = queue
}
@Override
public void run() {
// Task execution logic
System.out.println("Executing task: " taskName);
}
}
class CustomThreadPool {
private final BlockingQueue<Runnable> taskQueue;
private final List<WorkerThread> workerThreads;
public CustomThreadPool(int corePoolSize, int maxPoolSize, BlockingQueue<Runnable> taskQueue) {
this.taskQueue = taskQueue;
this.workerThreads = new ArrayList<>();
for (int i = 0; i < corePoolSize; i ) {
WorkerThread workerThread = new WorkerThread();
workerThreads.add(workerThread);
workerThread.start();
}
}
public void submitTask(Runnable task) {
try {
taskQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private class WorkerThread extends Thread {
@Override
public void run() {
try {
while (true) {
Runnable task = taskQueue.take();
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
3.实现类
BlockingQueue的实现类在java.util.concurrent包中,常用的有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。
- ArrayBlockingQueue: ArrayBlockingQueue是一个有界阻塞队列,内部使用数组来存储元素。它的大小是固定的,在创建时需要指定容量。当队列满时,生产者线程将被阻塞,直到有空间可用;当队列为空时,消费者线程将被阻塞,直到有元素可用。
- LinkedBlockingQueue: LinkedBlockingQueue是一个可选有界阻塞队列,内部使用链表来存储元素。如果创建时不指定容量,它将使用Integer.MAX_VALUE作为默认容量。当队列满时,生产者线程将被阻塞,直到有空间可用;当队列为空时,消费者线程将被阻塞,直到有元素可用。
- SynchronousQueue: SynchronousQueue是一个没有缓冲的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。它是一个直接传输的阻塞队列,意味着插入操作会阻塞直到另一个线程调用移除操作,反之亦然。
4.补充方法
BlockingQueue提供了一系列方法来实现线程间的数据传输和共享:
- put():将元素插入队列尾部,如果队列已满,则阻塞等待空间可用。
- take():移除并返回队列头部的元素,如果队列为空,则阻塞等待元素可用。
- offer():尝试将元素插入队列尾部,如果队列已满,则立即返回false。
- poll():移除并返回队列头部的元素,如果队列为空,则立即返回null。
- offer(E e, long timeout, TimeUnit unit):尝试将元素插入队列尾部,如果队列已满,则等待指定的时间,如果超时仍未插入成功,则返回false。
- poll(long timeout, TimeUnit unit):移除并返回队列头部的元素,如果队列为空,则等待指定的时间,如果超时仍未获取到元素,则返回null。
- isEmpty():判断队列是否为空。
- isFull():判断队列是否已满。
BlockingQueue的应用场景:
- 生产者-消费者模式:BlockingQueue常用于生产者-消费者模式中,生产者线程向队列中插入数据,消费者线程从队列中取出数据,通过BlockingQueue的阻塞特性,可以有效控制生产者和消费者的速度,避免资源竞争和线程间的协作问题。
- 线程池:Java线程池中常用BlockingQueue来存放待执行的任务,当线程池的工作队列满时,新的任务将会被阻塞,直到有空闲线程来执行任务。
- 任务调度:BlockingQueue可以用于任务调度中,可以将需要执行的任务放入队列中,然后由多个工作线程从队列中取出任务并执行。
- 数据交换:BlockingQueue可以用于不同线程之间的数据交换,一个线程可以将数据放入队列中,另一个线程可以从队列中取出数据进行处理。