生产消费者模型
多线程并发应用程序有一个经典的模型,即生产者/消费者模型。系统中,产生消息的是生产者,处理消息的是消费者,消费者和生产者通过一个缓冲区进行消息传递。生产者产生消息后提交到缓冲区,然后通知消费者可以从中取出消息进行处理。消费者处理完信息后,通知生产者可以继续提供消息。
要实现这个模型,关键在于消费者和生产者这两个线程进行同步。也就是说:只有缓冲区中有消息时,消费者才能够提取消息;只有消息已被处理,生产者才能产生消息提交到缓冲区。
生产消费者模式如下图。
缓冲区实际上是一个先进先出的队列,锁(信号量)的条件notEmpty和notFull。
Java实现:
代码语言:javascript复制import java.util.concurrent.*;
import java.util.concurrent.locks.*;
public class ConsumerProducer {
private static Buffer buffer = new Buffer();
public static void main(String[] args) {
// Create a thread pool with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}
// A task for adding an int to the buffer
private static class ProducerTask implements Runnable {
public void run() {
try {
int i = 1;
while (true) {
System.out.println("生产者生产 " i);
buffer.write(i ); // Add a value to the buffer
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 1000));
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
// A task for reading and deleting an int from the buffer
private static class ConsumerTask implements Runnable {
public void run() {
try {
while (true) {
System.out.println("ttt消费者消费 " buffer.read());
// Put the thread into sleep
Thread.sleep((int)(Math.random() * 1000));
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
// An inner class for buffer
private static class Buffer {
private static final int CAPACITY = 1; // buffer size
private java.util.LinkedList<Integer> queue =
new java.util.LinkedList<>();
// Create a new lock
private static Lock lock = new ReentrantLock();
// Create two conditions
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();
public void write(int value) {
lock.lock(); // Acquire the lock
try {
while (queue.size() == CAPACITY) {
System.out.println("缓冲区已满!");
notFull.await();
}
queue.offer(value);
notEmpty.signal(); // Signal notEmpty condition
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
finally {
lock.unlock(); // Release the lock
}
}
public int read() {
int value = 0;
lock.lock(); // Acquire the lock
try {
while (queue.isEmpty()) {
System.out.println("ttt缓冲区为空");
notEmpty.await();
}
value = queue.remove();
notFull.signal(); // Signal notFull condition
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
finally {
lock.unlock(); // Release the lock
return value;
}
}
}
}
效果: