深入浅出Semaphore
Semaphore的基本使用
基本概念
Semaphore 信号量,也就是我们常说的信号灯,semaphore 可以控制同时访问的线程个数,通过 acquire 获取一个许可,如果没有就等到,通过 release 释放一个许可。有点类似限流的作用。比如某商场只有 5 个停车位,每个停车位只能停一辆车,如果这时候来了 10 辆车,就必须等前面有空的车位才能进入(类似于可以阻塞的队列)。
停车场案例
代码语言:javascript复制public class SemaphoreDemo {
public static void main(String[] args) {
// 当前可以获得的最大许可数是 5 个
// 最大并发数为 5 超过的线程会被阻塞
Semaphore semaphore = new Semaphore(5);
for (int i = 1; i <= 10; i ) {
new Car(i, semaphore).start();
}
}
static class Car extends Thread {
int num;
Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获得一个许可
System.out.println(num "号车占用一个停车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(num "号车离开了");
semaphore.release(); // 释放许可
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
得到如下的运行结果:
代码语言:javascript复制1号车占用一个停车位
2号车占用一个停车位
5号车占用一个停车位
3号车占用一个停车位
4号车占用一个停车位
2号车离开了
7号车占用一个停车位
1号车离开了
6号车占用一个停车位
5号车离开了
8号车占用一个停车位
4号车离开了
9号车占用一个停车位
3号车离开了
10号车占用一个停车位
6号车离开了
7号车离开了
8号车离开了
9号车离开了
10号车离开了
Semaphore源码分析
类关系图
构造函数
代码语言:javascript复制// 猜都不用猜就知道 permits 一定存储在 AQS 的 state 变量中
public Semaphore(int permits) {
// 实例化为非公平锁 -> state
sync = new NonfairSync(permits);
}
acquire()方法
代码语言:javascript复制// jdk 17
// Semaphore.java
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
int available = getState(); // 获取令牌池的 state 值
int remaining = available - acquires;
if (remaining < 0 ||
// CAS 修改值
compareAndSetState(available, remaining))
// 修改成功才会返回 remaining
return remaining;
}
}
}
public void acquire() throws InterruptedException {
// 调用顶层 AQS 类内的方法
sync.acquireSharedInterruptibly(1);
}
// 在非公平锁中的处理方式
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
///////////////////////////////////////////////////////
// AQS.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
// 调用 Semaphore 中实现的 tryAcquireShared 方法
(tryAcquireShared(arg) < 0 &&
// 从 jdk 17 开始 oracle 对 acquire 方法进行了统一封装
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
Semaphore 与 CountDownLatch 对 AQS 处理的唯一一个不同之处就在于 tryAcquireShared()
方法的实现上,Semaphore 是一种令牌的处理方式。
release()方法
代码语言:javascript复制// jdk 17
// Semaphore.java
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
public void release() {
sync.releaseShared(1);
}
///////////////////////////////////////////////////////
// AQS.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}