Semaphore是什么?
Semaphore是一个计数信号量,底层依赖AQS
和CAS
来实现,可用来做限流。
初始化Semaphore时设置信号量数量,同一时间只有固定数量的线程可以持有信号量,没有获取信号量的线程进入等待队列排队等待,等持有信号量的线程释放了信号量,会唤醒等待队列中的线程持有信号量并执行。
1. 使用
代码语言:javascript复制public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(2); // 限制只有2个信号量
public static void main(String[] args) {
new Thread(new MThread()).start();
new Thread(new MThread()).start();
new Thread(new MThread()).start();
new Thread(new MThread()).start();
new Thread(new MThread()).start();
new Thread(new MThread()).start();
new Thread(new MThread()).start();
}
public static class MThread implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
semaphore.release();
System.out.println(Thread.currentThread().getName() " release");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 输出
out =>
Thread-0 release
Thread-1 release
Thread-2 release
Thread-4 release
Thread-6 release
Thread-3 release
Thread-5 release
2. 流程演示
- 初始化Semaphore时,赋值
state
- 线程
acquire
获取信号量时先判断state
-acquires
<0(acquiresd表示需要获取几个信号量,不传参默认获取一个),如果有剩余信号量,则直接执行。如果信号量都被占用,则进入到等待队列中进行等待并park挂起线程。 - 占用信号量的线程
release
释放了信号量时,会按照排队顺序unpark
唤醒等待队列中线程,等待挂起线程继续执行。
3. 重点源码分析
- 初始化 注意: 信号量与ReentrantLock相似,都有公平和非公平的实现方式,默认都是非公平的实现。
public Semaphore(int permits) {
sync = new NonfairSync(permits);//默认非公平
}
代码语言:javascript复制protected final void setState(int newState) {
state = newState; // 初始化state
}
- acquire 获取信号量
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
代码语言:javascript复制public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 尝试获取信号量,获取成功执行CAS state-arg,并执行线程
doAcquireSharedInterruptibly(arg); // 获取失败,加入等待队列
}
代码语言:javascript复制private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 共享锁
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 挂起线程
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代码语言:javascript复制private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 线程挂起
return Thread.interrupted();
}
- release
public void release() {
sync.releaseShared(1);
}
代码语言:javascript复制public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 释放信号量
doReleaseShared(); // 唤醒等待队列中的线程
return true;
}
return false;
}
代码语言:javascript复制private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒线程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}