1. 简介
生产者 & 消费者之间存在 强耦合问题
2. 解决方案
采用 生产者 & 消费者 模式,具体介绍如下:
3. 具体解决方式介绍
方式1:wait() / notify()
代码语言:javascript复制// Object类里的两个方法,所有Object子类都可使用这2个方法
// 对象的监视器对锁对象的锁定(也就是代码中的lock对象),注意是调用锁对象的wait() / nofity()
public class Test {
private static Integer count = 0;
private final Integer FULL = 5;
private static String lock = "lock";
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
synchronized (lock) {
while (count == FULL) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count ;
System.out.println("生产者" Thread.currentThread().getName()
"已生产完成,商品数量:" count);
lock.notifyAll();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
synchronized (lock) {
while (count == 0) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count--;
System.out.println("消费者" Thread.currentThread().getName()
"已消费,剩余商品数量:" count);
lock.notifyAll();
}
}
}
}
}
// 测试结果
生产者Thread-0已生产完成,商品数量:1
生产者Thread-2已生产完成,商品数量:2
消费者Thread-1已消费,剩余商品数量:1
消费者Thread-3已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
消费者Thread-3已消费,剩余商品数量:0
生产者Thread-2已生产完成,商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
生产者Thread-2已生产完成,商品数量:2
消费者Thread-1已消费,剩余商品数量:1
消费者Thread-3已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-2已生产完成,商品数量:1
消费者Thread-3已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-2已生产完成,商品数量:1
消费者Thread-3已消费,剩余商品数量:0
方式2:await() / signal()
代码语言:javascript复制// 对wait()/notify()的改进,功能更加强大、更适用于高级用户
// synchronized 托管给JVM执行
// 而lock是Java写的控制锁的代码
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
private static Integer count = 0;// 缓冲区
private final Integer FULL = 5;
final Lock lock = new ReentrantLock(); // 此处采用 ReentrantLock,获得可重入锁
final Condition put = lock.newCondition();
final Condition get = lock.newCondition();
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
}
// 生产者
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
//加锁
lock.lock();
try {
while (count == FULL) {
try {
put.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count ;
System.out.println("生产者" Thread.currentThread().getName()
"已生产完成,商品数量: " count);
//通知消费者,现在可以消费
get.signal();
} finally {
lock.unlock();
}
}
}
}
// 消费者
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lock.lock();
try {
while (count == 0) {
try {
get.await();
} catch (Exception e) {
e.printStackTrace();
}
}
count--;
System.out.println("消费者" Thread.currentThread().getName()
"已消费,剩余商品数量: " count);
put.signal();
} finally {
lock.unlock();
}
}
}
}
}
// 测试结果
生产者Thread-3已生产完成,商品数量: 1
生产者Thread-0已生产完成,商品数量: 2
消费者Thread-1已消费,剩余商品数量: 1
消费者Thread-2已消费,剩余商品数量: 0
生产者Thread-3已生产完成,商品数量: 1
生产者Thread-0已生产完成,商品数量: 2
消费者Thread-1已消费,剩余商品数量: 1
消费者Thread-2已消费,剩余商品数量: 0
生产者Thread-0已生产完成,商品数量: 1
生产者Thread-3已生产完成,商品数量: 2
消费者Thread-1已消费,剩余商品数量: 1
消费者Thread-2已消费,剩余商品数量: 0
生产者Thread-0已生产完成,商品数量: 1
生产者Thread-3已生产完成,商品数量: 2
消费者Thread-2已消费,剩余商品数量: 1
消费者Thread-1已消费,剩余商品数量: 0
生产者Thread-3已生产完成,商品数量: 1
生产者Thread-0已生产完成,商品数量: 2
消费者Thread-2已消费,剩余商品数量: 1
消费者Thread-1已消费,剩余商品数量: 0
方式3:(BlockingQueue)阻塞队列 系列方法
代码语言:javascript复制// 下面主要使用其中的 put()、take()
// put():将指定元素插入此队列中,将等待可用的空间(若有必要)
// take():获取并移除此队列的头部,在指定的等待时间前等待可用的元素(若有必要)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test {
private static Integer count = 0;
final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(5);// 容量为5的阻塞队列
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
}
// 生产者
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
try {
bq.put(1);
count ;
System.out.println("生产者" Thread.currentThread().getName()
"已生产完成,商品数量:" count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
bq.take();
count--;
System.out.println("消费者" Thread.currentThread().getName()
"已消费,剩余商品数量:" count);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
方式4:信号量 系列方法(Semaphore)
代码语言:javascript复制// 实例使用
import java.util.concurrent.Semaphore;
public class Test {
int count = 0;
final Semaphore put = new Semaphore(5);// 初始令牌个数
// 注:同步令牌(notFull.acquire())必须在互斥令牌(mutex.acquire())前面获得;若先得到互斥锁再发生等待,会造成死锁。
final Semaphore get = new Semaphore(0);
final Semaphore mutex = new Semaphore(1);
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Consumer()).start();
new Thread(t.new Producer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
try {
put.acquire();// 注意顺序
mutex.acquire();
count ;
System.out.println("生产者" Thread.currentThread().getName()
"已生产完成,商品数量:" count);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
get.release();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i ) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
get.acquire();// 注意顺序
mutex.acquire();
count--;
System.out.println("消费者" Thread.currentThread().getName()
"已消费,剩余商品数量:" count);
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
put.release();
}
}
}
}
}
// 测试结果
生产者Thread-0已生产完成,商品数量:1
消费者Thread-2已消费,剩余商品数量:0
生产者Thread-3已生产完成,商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
生产者Thread-3已生产完成,商品数量:2
消费者Thread-2已消费,剩余商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
生产者Thread-3已生产完成,商品数量:2
消费者Thread-2已消费,剩余商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
生产者Thread-3已生产完成,商品数量:2
消费者Thread-2已消费,剩余商品数量:1
消费者Thread-1已消费,剩余商品数量:0
生产者Thread-0已生产完成,商品数量:1
生产者Thread-3已生产完成,商品数量:2
消费者Thread-2已消费,剩余商品数量:1
消费者Thread-1已消费,剩余商品数量:0
方式5:PipedInputStream / PipedOutputStream
代码语言:javascript复制import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Test {
final PipedInputStream pis = new PipedInputStream();
final PipedOutputStream pos = new PipedOutputStream();
public static void main(String[] args) {
Test t = new Test();
new Thread(t.new Producer()).start();
new Thread(t.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
try {
pis.connect(pos);
} catch (IOException e) {
e.printStackTrace();
}
try {
while (true) { // 不断的产生数据
int n = (int) (Math.random() * 255);
System.out.println("生产者" Thread.currentThread().getName()
"已生产完成,商品数量:" n);
pos.write(n);
pos.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pis.close();
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
int n;
try {
while (true) {
n = pis.read();
System.out.println("消费者" Thread.currentThread().getName()
"已消费,剩余商品数量:" n);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pis.close();
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// 测试结果
生产者Thread-0已生产完成,商品数量:6
生产者Thread-0已生产完成,商品数量:158
生产者Thread-0已生产完成,商品数量:79
生产者Thread-0已生产完成,商品数量:119
生产者Thread-0已生产完成,商品数量:93
生产者Thread-0已生产完成,商品数量:213
生产者Thread-0已生产完成,商品数量:151
生产者Thread-0已生产完成,商品数量:101
生产者Thread-0已生产完成,商品数量:125
生产者Thread-0已生产完成,商品数量:109
生产者Thread-0已生产完成,商品数量:67
生产者Thread-0已生产完成,商品数量:109
生产者Thread-0已生产完成,商品数量:132
生产者Thread-0已生产完成,商品数量:139
...
至此,关于Java解决生产者、消费者问题的五种实现方式讲解完毕。