深入浅出Semaphore

2023-01-11 13:01:20 浏览数 (2)

深入浅出Semaphore

Semaphore的基本使用

基本概念

Semaphore 信号量,也就是我们常说的信号灯,semaphore 可以控制同时访问的线程个数,通过 acquire 获取一个许可,如果没有就等到,通过 release 释放一个许可。有点类似限流的作用。比如某商场只有 5 个停车位,每个停车位只能停一辆车,如果这时候来了 10 辆车,就必须等前面有空的车位才能进入(类似于可以阻塞的队列)。

停车场案例

代码语言:javascript复制
public class SemaphoreDemo {  
    public static void main(String[] args) {  
        // 当前可以获得的最大许可数是 5 个
        // 最大并发数为 5 超过的线程会被阻塞
        Semaphore semaphore = new Semaphore(5);  
        for (int i = 1; i <= 10; i  ) {  
            new Car(i, semaphore).start();  
        }  
    }  
	  
    static class Car extends Thread {  
        int num;  
        Semaphore semaphore;  
		  
        public Car(int num, Semaphore semaphore) {  
            this.num = num;  
            this.semaphore = semaphore;  
        }  
		  
        @Override  
        public void run() {  
            try {  
                semaphore.acquire(); // 获得一个许可  
                System.out.println(num   "号车占用一个停车位");  
                TimeUnit.SECONDS.sleep(2);  
                System.out.println(num   "号车离开了");  
                semaphore.release(); // 释放许可  
            } catch (InterruptedException e) {  
                throw new RuntimeException(e);  
            }  
        }  
    }  
}

得到如下的运行结果:

代码语言:javascript复制
1号车占用一个停车位
2号车占用一个停车位
5号车占用一个停车位
3号车占用一个停车位
4号车占用一个停车位
2号车离开了
7号车占用一个停车位
1号车离开了
6号车占用一个停车位
5号车离开了
8号车占用一个停车位
4号车离开了
9号车占用一个停车位
3号车离开了
10号车占用一个停车位
6号车离开了
7号车离开了
8号车离开了
9号车离开了
10号车离开了

Semaphore源码分析

类关系图

构造函数

代码语言:javascript复制
// 猜都不用猜就知道 permits 一定存储在 AQS 的 state 变量中
public Semaphore(int permits) {
	// 实例化为非公平锁 -> state
    sync = new NonfairSync(permits);
}

acquire()方法

代码语言:javascript复制
// jdk 17
// Semaphore.java
abstract static class Sync extends AbstractQueuedSynchronizer {
	final int nonfairTryAcquireShared(int acquires) {
		// 自旋
	    for (;;) {  
	        int available = getState(); // 获取令牌池的 state 值
	        int remaining = available - acquires;  
	        if (remaining < 0 || 
		        // CAS 修改值
	            compareAndSetState(available, remaining))
	            // 修改成功才会返回 remaining
	            return remaining;  
	    }  
	}
}

public void acquire() throws InterruptedException {
	// 调用顶层 AQS 类内的方法
    sync.acquireSharedInterruptibly(1);  
}

// 在非公平锁中的处理方式
static final class NonfairSync extends Sync {
	NonfairSync(int permits) {  
	    super(permits);  
	}
	
	protected int tryAcquireShared(int acquires) {  
	    return nonfairTryAcquireShared(acquires);  
	}
}

///////////////////////////////////////////////////////

// AQS.java
public final void acquireSharedInterruptibly(int arg)  
    throws InterruptedException {  
    if (Thread.interrupted() || 
	    // 调用 Semaphore 中实现的 tryAcquireShared 方法
        (tryAcquireShared(arg) < 0 &&
         // 从 jdk 17 开始 oracle 对 acquire 方法进行了统一封装
         acquire(null, arg, true, true, false, 0L) < 0))  
        throw new InterruptedException();  
}

Semaphore 与 CountDownLatch 对 AQS 处理的唯一一个不同之处就在于 tryAcquireShared() 方法的实现上,Semaphore 是一种令牌的处理方式。

release()方法

代码语言:javascript复制
// jdk 17
// Semaphore.java
abstract static class Sync extends AbstractQueuedSynchronizer {
	protected final boolean tryReleaseShared(int releases) {  
	    for (;;) {  
	        int current = getState();  
	        int next = current   releases;  
	        if (next < current) // overflow  
	            throw new Error("Maximum permit count exceeded");  
	        if (compareAndSetState(current, next))  
	            return true;  
	    }  
	}
}

public void release() {  
    sync.releaseShared(1);  
}

///////////////////////////////////////////////////////

// AQS.java
public final boolean releaseShared(int arg) {  
    if (tryReleaseShared(arg)) {  
        signalNext(head);  
        return true;  
    }  
    return false;  
}

0 人点赞