信号量用于解决进程的互斥,同步问题,可以使用信号量来表示系统中某种资源的数量。
信号量机制的发明者迪杰特斯拉也是我们的老熟人了。
整型信号量
用一个整型变量作为信号量,表示系统中的某种资源的数量。
对于信号量的有两种操作P操作和V操作。
如下为伪代码说明P V操作的细节。
代码语言:javascript复制public class Semaphore{
int s;
// 资源获取
void P(){
// 当前没有可用资源时,当前进程就会在此死等,直到其他进程使用完了释放了资源。
while(s <= 0){};
s--;
}
// 资源释放
void V(){
s ;
}
}
注意上述的P操作和V操作都是原子性的。
上述代码中的若把s为当前可用资源的数量,P操作表示获取一个资源,V操作表示释放一个资源。使用资源前先进行P操作,使用完后进行V操作。
我们也发现了,在无可用资源时,该进程会一直占着处理机死等。aging情况下应该让进程进入阻塞态让出处理机资源,当有资源了就唤醒他,于是引入了记录型信号量的机制。
记录型信号量
伪代码描述如下:
代码语言:javascript复制public class Semaphore{
int s;
Queue waitQueue; // 等待队列
void P(){
s--;
if(s < 0){
waitQueue.block();
}
}
void V(){
s ;
if(s <= 0){
waitQueue.wakeup();
}
}
}
P操作先申请资源,再判断s,若发现无资源就使用block原语使当前进程由运行态转为阻塞态,并挂到等待队列的队尾;
V操作先释放资源,然后判断是否存在别进程等待该资源,若存在则使用wakeup原语唤醒等待队列的一个进程,将该进程由阻塞态转为就绪态。
当一个进程暂时得不到资源时,其就会自我阻塞,其就放弃处理机,将自己挂到等待队列上,与计数型信号量相比遵循了“让权等待”,即不会出现忙等。
信号量实现进程互斥
初始化时信号量的s为1,认为只有一块资源。
在访问临界区之前执行p操作申请资源;
访问完之后执行v操作释放资源。
利用Java信号量模拟模拟互斥访问临界区代码如下:
代码语言:javascript复制 public static class T implements Runnable{
private Semaphore semaphore;
public T(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() "进入临界区");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() "正在访问临界区");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName() "访问完了");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
semaphore.release();
}
}
public static void main(String[] args) {
Semaphore sema = new Semaphore(1);
Runnable r = new T(sema);
for(int i = 0; i < 5; i ) {
new Thread(r).start();
}
}
执行结果如下,可以发现每个线程是互斥访问的,不存在多个线程同时进入临界区的情况。
代码语言:javascript复制Thread-1进入临界区
Thread-1正在访问临界区
Thread-1访问完了
Thread-2进入临界区
Thread-2正在访问临界区
Thread-2访问完了
Thread-4进入临界区
Thread-4正在访问临界区
Thread-4访问完了
Thread-0进入临界区
Thread-0正在访问临界区
Thread-0访问完了
Thread-3进入临界区
Thread-3正在访问临界区
Thread-3访问完了
若不使用信号量,执行结果如下:
代码语言:javascript复制Thread-0进入临界区
Thread-1进入临界区
Thread-2进入临界区
Thread-3进入临界区
Thread-4进入临界区
Thread-2正在访问临界区
Thread-4正在访问临界区
Thread-3正在访问临界区
Thread-0正在访问临界区
Thread-1正在访问临界区
Thread-1访问完了
Thread-3访问完了
Thread-0访问完了
Thread-4访问完了
Thread-2访问完了
信号量实现进程同步
进程同步:让各并发的进程按照我们事先规定的顺序执行,即始终保证“后操作”在“前操作”之后发生。
初始化信号量初值为0,可以认为初始无资源,只有执行一次V操作才会获得一个资源,此时P操作才可以执行。
利用该特性在“前操作”之后使用V操作,“后操作”之前使用P操作。若后操作想先执行其必须执行一次P操作此时由于不存在资源,因此该进程在此阻塞住了,只有先执行完”前操作”,再执行完“前操作”之后的V操作,此时才会把之前阻塞态的进程转到就绪态,“后操作”才得以执行。
下面用一个具体案例说明。假设有两个人一个人专门倒水,一个人专门喝水,因此倒水的人的倒水操作应该为“前操作”,喝水的人喝水操作为“后操作”,利用Java的信号量实现该问题。
代码语言:javascript复制 // 倒水者
public static class Maker implements Runnable{
Semaphore semaphore;
public Maker(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void make() {
System.out.println("我倒水了");
}
@Override
public void run() {
make();
// 前操作之后使用V操作
semaphore.release();
}
}
// 喝水者
public static class Drinker implements Runnable{
Semaphore semaphore;
public Drinker(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void drink() {
System.out.println("我喝水了");
}
@Override
public void run() {
try {
semaphore.acquire();
drink();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(0);
Runnable maker = new Maker(semaphore);
Runnable drinker = new Drinker(semaphore);
new Thread(maker).start();
new Thread(drinker).start();
}
信号量实现生产者消费者模型
生产者消费者模型介绍:
系统中一组生产者进程一组消费者进程,生产者每次生产一个产品放入缓冲区,消费者一次从缓冲区中拿出一个消费掉。
生产者,消费者共享一个初始为空,最大可放N个商品的缓冲区。缓冲区不满时才可以生产,缓冲区不空时才可以消费,此外缓冲区属于临界资源,各个进程应该互斥访问。
大体思路:
我们发现生产者消费者模型中共两个同步关系,一个互斥关系。
生产者每次生产消耗一块缓冲区,获得一个商品;消费者每次消耗一个商品,获得一块缓冲区。
同步关系一:商品这种资源为空时,对于商品这种资源生产者生产为前操作,消费者消费为后操作;
同步关系二:缓冲区这种资源为空时,对于缓冲区这种资源消费者消费为前操作,生产者生产为后操作。
互斥关系:每次生产/消费都应该互斥的访问缓冲区。
因此我们需要使用三个信号量,分别为goodsSemaphore,bufferSemaphore,mutexSemaphore来维持上述三种关系。其初值分别为0, n, 1。
具体实现:
代码语言:javascript复制public class ProductorConsummer {
public static class Productor implements Runnable{
private Semaphore goodsSemaphore;
private Semaphore bufferSemaphore;
private Semaphore mutexSemaphore;
public Productor(Semaphore goodsSemaphore, Semaphore bufferSemaphore, Semaphore mutexSemaphore) {
this.goodsSemaphore = goodsSemaphore;
this.bufferSemaphore = bufferSemaphore;
this.mutexSemaphore = mutexSemaphore;
}
public void product() {
System.out.println("生产者" Thread.currentThread().getName() "生产了一个商品");
}
@Override
public void run() {
try {
while(true) {
// 对于缓冲区而言,消费为前操作,生产为后操作,因此后操作之前执行P操作
bufferSemaphore.acquire();
mutexSemaphore.acquire();
product();
mutexSemaphore.release();
// 对于商品而言,生产为前操作,消费为后操作,因此前操作之后执行V操作
goodsSemaphore.release();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Consummer implements Runnable{
private Semaphore goodsSemaphore;
private Semaphore bufferSemaphore;
private Semaphore mutexSemaphore;
public Consummer(Semaphore goodsSemaphore, Semaphore bufferSemaphore, Semaphore mutexSemaphore) {
this.goodsSemaphore = goodsSemaphore;
this.bufferSemaphore = bufferSemaphore;
this.mutexSemaphore = mutexSemaphore;
}
public void consum() {
System.out.println("消费者" Thread.currentThread().getName() "消费了一个商品");
}
@Override
public void run() {
try {
while(true) {
// 对于商品而言,生产为前操作,消费为后操作,因此后操作之后执行商品信号量P操作
goodsSemaphore.acquire();
mutexSemaphore.acquire();
consum();
mutexSemaphore.release();
// 对于缓冲区而言,消费为前操作,生产为后操作,因此前操作执行之后执行缓冲区信号量的V操作
bufferSemaphore.release();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int n = 3;
Semaphore goodsSemaphore = new Semaphore(0);
Semaphore bufferSemaphore = new Semaphore(n);
Semaphore mutexSemaphore = new Semaphore(1);
Runnable productor = new Productor(goodsSemaphore, bufferSemaphore, mutexSemaphore);
Runnable consummer = new Consummer(goodsSemaphore, bufferSemaphore, mutexSemaphore);
for(int i = 0; i < 3; i ) {
new Thread(consummer).start();
new Thread(productor).start();
}
}
}
上述代码中两个p操作不能交换顺序,否则会产生死锁。分析如下
代码语言:javascript复制prodctor{
mutex.p(); (1)
buffer.p(); (2)
生产一件商品
...
}
consumer{
mutex.p();(3)
goods.p();(4)
消费一件商品
...
}
按照(3)(4)(1)的顺序执行,执行完(3)(4)mutex = 0,goods = -1,此时其等待一个生产者生产一个商品其才可以执行后续部分,最终再释放mutex;然而生产者线程根本进不去缓冲区也阻塞住了因此产生了死锁。
信号量实现吸烟者问题
当前系统中总共四个进程,三个吸烟者进程,一个提供者进程。每次吸烟需要三种材料烟草、纸以及胶水。这三个人各拥有一样,供应者无限制的提供其余两样,抽烟者抽完后供应者会放另外两种材料,从而达到让这三位爷轮流抽烟的目的。
我们发现,只有供应者放了抽烟者才能抽,只有抽烟者抽了供应者才能接着放。
其中存在四个同步关系,
同步关系一:供应者放了组合一抽烟者1号才能抽烟,对于组合一这种资源供应者放为前操作,抽烟者1号抽为后操作;
同步关系二:供应者放了组合二抽烟者2号才能抽烟,同理;
同步关系三:供应者放了组合三抽烟者3号才能抽烟,同理;
同步关系四:抽烟者1/2/3号只有拿了桌上的材料,供应者才能接着放,对于桌面这种资源,抽烟者拿为先操作,供应者放为后操作。
此外对于无论是放还是拿都应该满足互斥访问桌面这块区域。
实现代码如下:
代码语言:javascript复制public class Smoking {
public static class Smoker implements Runnable{
Semaphore material;
Semaphore buffer;
Semaphore mutex;
public Smoker(Semaphore material, Semaphore buffer, Semaphore mutex) {
this.material = material;
this.buffer = buffer;
this.mutex = mutex;
}
public void smok() {
System.out.println(Thread.currentThread().getName() " 抽烟了");
}
public void run() {
try {
while(true) {
material.acquire();
mutex.acquire();
smok();
mutex.release();;
buffer.release();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Provider implements Runnable{
Semaphore[] materials; // 每个位置代表一种组合
Semaphore buffer;
Semaphore mutex;
public Provider(Semaphore[] materials, Semaphore buffer, Semaphore mutex) {
this.materials = materials;
this.buffer = buffer;
this.mutex = mutex;
}
public void provide1() {
System.out.println(Thread.currentThread().getName() "提供组合一");
}
public void provide2() {
System.out.println(Thread.currentThread().getName() "提供组合二");
}
public void provide3() {
System.out.println(Thread.currentThread().getName() "提供组合三");
}
@Override
public void run() {
try {
while(true) {
buffer.acquire();
mutex.acquire();
provide1();
mutex.release();
materials[0].release();
buffer.acquire();
mutex.acquire();
provide2();
mutex.release();
materials[1].release();
buffer.acquire();
mutex.acquire();
provide3();
mutex.release();
materials[2].release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Semaphore[] materials = new Semaphore[]{new Semaphore(0),new Semaphore(0),new Semaphore(0)};
Semaphore buffer = new Semaphore(1);
Semaphore mutex = new Semaphore(1);
new Thread(new Provider(materials, buffer, mutex)).start(); // 提供着
for(Semaphore material : materials) {
new Thread(new Smoker(material, buffer, mutex)).start();
}
}
}
信号量实现哲学家进餐问题
一张大圆桌子上坐着五位名声显赫的哲学家,他们中间五个空隙处放着五只筷子,哲学家要想吃放,其第一步拿到他左手边的筷子,第二步拿到他右手边的筷子,然后可以开始吃饭。
由于存在5只筷子,因此使用5个信号量表示,其初值均为1。5个哲学家可以看成5个进程。代码实现如下:
代码语言:javascript复制/**
* 哲学家进餐问题
* @author wg
* @date 2020/07/07
*/
public class PhilosophersDinning {
public static class Philosopher implements Runnable{
// 哲学家id
int id;
// chopsticks[id] 为其左手边的筷子
// chopsticks[(id 1 )% 5] 为其右手边的筷子
Semaphore[] chopsticks;
public Philosopher(Semaphore[] chopsticks, int id) {
this.chopsticks = chopsticks;
this.id = id;
}
public void eat() {
System.out.println(id "号哲学家正在进餐");
}
@Override
public void run() {
try {
// 拿起左手边筷子
chopsticks[id].acquire();
Thread.sleep(1);
// 拿起右手边筷子
chopsticks[(id 1) % chopsticks.length].acquire();
eat();
// 放下左手边筷子
chopsticks[(id 1) % chopsticks.length].release();
// 放下右手边筷子
chopsticks[id].release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Semaphore[] chopsticks = new Semaphore[5];
for(int i = 0; i < 5; i ) {
chopsticks[i] = new Semaphore(1);
}
for(int i = 0; i < 5; i ) {
new Thread(new Philosopher(chopsticks, i)).start();
}
}
}
很容易分析得到当每个哲学家都拿起自己左手边的筷子时,就会陷入死锁。
信号量实现阻塞容器
以阻塞队列为例,利用信号量 queue 实现阻塞队列。
对于阻塞队列当队列中没元素时,从队列中取元素的操作不应该被失败而是阻塞该进程,当别的进程放进去元素后该进程由阻塞态变为就绪态,然后从中取出元素。当阻塞队列满了时,往队列中加元素的操作应该被阻塞。
我们发现存在两种资源,元素 和 空间 ,为其定义两个信号量记做itemS,和bufferS,此外为了互斥访问队列还定义互斥量mutexS,初值如下:
itemS.s = 0, bufferS.s = n, mutexS = 1。
其中n为队列的最大容量。
思路如下:
把信号量itemS和队列中的元素逻辑上相对应,在执行获取队列中元素之前,先执行itemS.P(),若此时队列中无元素也就意味着itemS的值为0,该进程就此阻塞。执行添加操作后再执行itemS.V(),如此队列中元素多了一个,itemS中资源亦多了一个,此时之前阻塞住的进程就可以恢复了。如此解决了队列为空时取元素的场景,对于队列满了时加元素的场景同理,就不多加赘述了。
实现代码如下:
代码语言:javascript复制import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
public class BlockingQueue<E> {
private Semaphore itemS;
private Semaphore bufferS;
private Semaphore mutex;
private Queue<E> data;
public BlockingQueue(int boundary) {
this.itemS = new Semaphore(0);
this.bufferS = new Semaphore(boundary);
this.mutex = new Semaphore(1);
this.data = new LinkedList<E>();
}
public void add(E element) throws InterruptedException {
bufferS.acquire();
mutex.acquire();
data.add(element);
mutex.release();
itemS.release();
}
public E remove() throws InterruptedException {
E element = null;
itemS.acquire();
mutex.acquire();
element = data.remove();
mutex.release();
bufferS.release();
return element;
}
}
测试代码如下
代码语言:javascript复制class Adder implements Runnable{
private BlockingQueue<Integer> queue;
public Adder(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
for(int i = 0; i < 100000; i ) {
try {
queue.add(i);
System.out.println("adder 放入了" i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Remover implements Runnable{
private BlockingQueue<Integer> queue;
public Remover(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void run() {
for(int i = 0; i < 100000; i ) {
try {
System.out.println("remover 取出了" queue.remove());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Test {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new BlockingQueue<Integer>(5);
for(int i = 0; i < 5; i ) {
// new Thread(new Adder(queue)).start();
new Thread(new Remover(queue)).start();
}
}
}