概述
Java中线程通信协作的最常见的两种方式:
- syncrhoized加锁的线程的Object类的wait()/notify()/notifyAll()
- ReentrantLock类加锁的线程的Condition类的await()/signal()/signalAll()
线程间直接的数据交换:
- 通过管道进行线程间通信:1)字节流;2)字符流
可参考: Java多线程编程核心技术
场景
场景假设:
一个工作台,两个工人: Worker A 和 Workder B .
约定,Worker A 生产货物到工作台上, Workder B 从工作台 取走(消费)货物。
- 当 工作台上没有货物时,Worker A 才生产货物,否则等待Worker B 取走(消费)货物。
- 当 工作台上有货物时, Woker B 才从工作台取走(消费)货物,否则等待Worker A 生产货物
引子
我们先来看下线程之间不通信的情况 (错误示例)
代码语言:javascript复制package com.artisan.test;
public class ProduceConsumeWrongDemo {
// 锁
private final Object LOCK = new Object();
// 模拟多线程间需要通信的数据 i
private int i = 0 ;
public void produce() throws InterruptedException {
// 加锁
synchronized (LOCK){
System.out.println("produce:" i );
Thread.sleep(1_000);
}
}
public void consume() throws InterruptedException{
// 加锁
synchronized (LOCK){
System.out.println("consume:" i);
Thread.sleep(1_000);
}
}
public static void main(String[] args) throws InterruptedException{
ProduceConsumeWrongDemo pc = new ProduceConsumeWrongDemo();
// 生产线程
new Thread(()->{
while (true){
try {
pc.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 消费线程
new Thread(()->{
while (true){
try {
pc.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
运行结果:
代码语言:javascript复制"E:Program FilesJavajdk1.8.0_161binjava" "-javaagent:E:Program FilesJetBrainsIntelliJ IDEA 2017.2.4libidea_rt.jar=52137:E:Program FilesJetBrainsIntelliJ IDEA 2017.2.4bin" -Dfile.encoding=UTF-8 -classpath "E:Program FilesJavajdk1.8.0_161jrelibcharsets.jar;E:Program FilesJavajdk1.8.0_161jrelibdeploy.jar;E:Program FilesJavajdk1.8.0_161jrelibextaccess-bridge-64.jar;E:Program FilesJavajdk1.8.0_161jrelibextcldrdata.jar;E:Program FilesJavajdk1.8.0_161jrelibextdnsns.jar;E:Program FilesJavajdk1.8.0_161jrelibextjaccess.jar;E:Program FilesJavajdk1.8.0_161jrelibextjfxrt.jar;E:Program FilesJavajdk1.8.0_161jrelibextlocaledata.jar;E:Program FilesJavajdk1.8.0_161jrelibextnashorn.jar;E:Program FilesJavajdk1.8.0_161jrelibextsunec.jar;E:Program FilesJavajdk1.8.0_161jrelibextsunjce_provider.jar;E:Program FilesJavajdk1.8.0_161jrelibextsunmscapi.jar;E:Program FilesJavajdk1.8.0_161jrelibextsunpkcs11.jar;E:Program FilesJavajdk1.8.0_161jrelibextzipfs.jar;E:Program FilesJavajdk1.8.0_161jrelibjavaws.jar;E:Program FilesJavajdk1.8.0_161jrelibjce.jar;E:Program FilesJavajdk1.8.0_161jrelibjfr.jar;E:Program FilesJavajdk1.8.0_161jrelibjfxswt.jar;E:Program FilesJavajdk1.8.0_161jrelibjsse.jar;E:Program FilesJavajdk1.8.0_161jrelibmanagement-agent.jar;E:Program FilesJavajdk1.8.0_161jrelibplugin.jar;E:Program FilesJavajdk1.8.0_161jrelibresources.jar;E:Program FilesJavajdk1.8.0_161jrelibrt.jar;D:IdeaProjectsmvctargetclasses" com.artisan.test.ProduceConsumeWrongDemo
produce:0
produce:1
consume:2
consume:2
consume:2
produce:2
consume:3
consume:3
consume:3
produce:3
produce:4
produce:5
consume:6
....
....
....
....
....
....
....
很明显的可以看到,数据都是错乱的,因为没有线程间的通信,全凭CPU调度,生产线程和消费线程都很随意,数据一团糟糕,那该如何改进呢?
synchronized wait/notify机制
- wait()——让当前线程 (
Thread.concurrentThread()
方法所返回的线程) 释放对象锁并进入等待(阻塞)状态。 - notify()——唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
- notifyAll()——唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行。
为了解决上面的问题,我们先来了解下synchronized wait/notify .
- wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。
- 调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁). 因此调用wait()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。如果当前线程没有这个对象的锁就调用wait()方法,则会抛出IllegalMonitorStateException.
- 调用某个对象的wait()方法,相当于让当前线程交出(释放)此对象的monitor,然后进入等待状态,等待后续再次获得此对象的锁
- 调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程. 同样的,调用某个对象的notify()方法,当前线程也必须拥有这个对象的monitor,因此调用notify()方法必须在同步块或者同步方法中进行(synchronized块或者synchronized方法)。
- 调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程
- notify()和notifyAll()方法只是唤醒等待该对象的monitor的线程,并不决定哪个线程能够获取到monitor。
举个例子: 假如有三个线程Thread1、Thread2和Thread3都在等待对象objectA的monitor,此时Thread4拥有对象objectA的monitor,当在Thread4中调用objectA.notify()方法之后,Thread1、Thread2和Thread3只有一个能被唤醒。
注意,被唤醒不等于立刻就获取了objectA的monitor。
假若在Thread4中调用objectA.notifyAll()方法,则Thread1、Thread2和Thread3三个线程都会被唤醒,至于哪个线程接下来能够获取到objectA的monitor就具体依赖于操作系统的调度了。
一个线程被唤醒不代表立即获取了对象的monitor,只有等调用完notify()或者notifyAll()并退出synchronized块,释放对象锁后,其余线程才可获得锁执行。
synchronized wait/notify 改造
代码语言:javascript复制package com.artisan.test;
public class ProduceConsumerDemo {
// 对象监视器-锁
private final Object LOCK = new Object();
// 是否生产出数据的标识
private boolean isProduced = false;
// volatile 确保可见性, 假设 i 就是生产者生产的数据
private volatile int i = 0 ;
public void produce(){
// 加锁
synchronized (LOCK){
if (isProduced){
try {
// 让当前线程 (Thread.concurrentThread() 方法所返回的线程) 释放对象锁并进入等待(阻塞)状态
// 如果已经生产,则等待
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
// 生产数据
i ;
System.out.println("Produce:" i);
// 唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行
// 通知等待的Worker B 来消费数据
LOCK.notify();
// 将生产标识置为true
isProduced = true;
}
}
}
public void consume(){
// 加锁
synchronized (LOCK){
if (isProduced){
// 消费数据
System.out.println("Consume:" i);
// 唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,以便在当前线程释放锁后竞争锁,进而得到CPU的执行
// 通知 等待的Wokrer A 生产数据
LOCK.notify();
// 已经消费完了,将生产标识置为false
isProduced = false;
}else{
try {
// 让当前线程 (Thread.concurrentThread() 方法所返回的线程) 释放对象锁并进入等待(阻塞)状态
// 未生产,Worker B等待
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ProduceConsumerDemo produceConsumerDemo = new ProduceConsumerDemo();
new Thread(){
@Override
public void run() {
while(true) produceConsumerDemo.produce();
}
}.start();
new Thread(){
@Override
public void run() {
while(true) produceConsumerDemo.consume();
}
}.start();
}
}
当然了并不是绝对的上面的对应关系(这里只是为了演示),因为notify唤醒后,线程只是进入Runnable状态,至于哪个线程能进入到running状态,就看哪个线程能抢到CPU的资源了。 JVM规范并没有规定哪个线程优先得到执行权,每个JVM的实现都是不同的
单个生产者 单个消费者,运行OK
代码语言:javascript复制.....
.....
.....
Produce:1171
Consume:1171
Produce:1172
Consume:1172
Produce:1173
Consume:1173
Produce:1174
Consume:1174
Produce:1175
Consume:1175
Produce:1176
Consume:1176
.....
.....
.....
问题
单个生产者 单个消费者 上面的代码是没有问题的,加入有多个生产者 和多个消费者呢?
我们来复用上面的代码来演示下 ,其他代码保持不变,仅在main方法中改造下,两个生产者,两个消费者
代码语言:javascript复制 Stream.of("P1","P2").forEach(n-> new Thread(){
@Override
public void run() {
while(true) produceConsumerDemo.produce();
}
}.start());
Stream.of("C1","C2").forEach(n->new Thread(){
@Override
public void run() {
while(true) produceConsumerDemo.consume();
}
}.start());
下篇博客,我们来分析下原因,并给出解决办法