官方解释:
- 一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个
acquire()
都会阻塞,直到许可证可用,然后才能使用它。每个release()
添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象;Semaphore
只保留可用数量的计数,并相应地执行。 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源
我记得考科目一的时候有一个大教室,这个教室只能同时允许两百人考试,当有一个考完之后,下一个才能进去进行考试。门口会有安检人员进行安检,这个Semaphore就相当于这个安检员。
也可以理解为停车场,停车场内的停车位是固定的,只有当一辆或多辆车开走之后外面等待的车才能进去停车。
用法:
代码语言:javascript复制1、定义三个资格
Semaphore semaphore = new Semaphore(3);
ThreadPoolExecutor poolExecutor =
new ThreadPoolExecutor(10, 20,
5000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(100));
for (int i = 0; i < 10; i ) {
int finalI = i;
poolExecutor.execute(new Thread() {
@Override
public void run() {
try {
//获取执行资格
semaphore.acquire(1);
System.out.println(finalI "=========");
//模拟每个线程运行的时间
Thread.sleep(1000);
//释放执行资格
semaphore.release(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
poolExecutor.shutdown();
运行结果如下:(同一时刻只能运行三个线程。有点模糊,凑合看)
解析:
一、定义:
代码语言:javascript复制public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
Semaphroe底层也是用Sync类,默认是非公平的,也有公平的构造方法。
代码语言:javascript复制public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
定义的资格数其实是设置锁的状态值的(AQS之前已说过,维护锁状态值和线程等待队列)
代码语言:javascript复制abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
二、为什么能限制同时执行的线程数量呢?
这就是acquire方法的用处了
代码语言:javascript复制public void acquire(int permits) {
sync.acquireSharedInterruptibly(permits);
}
点进acquireSharedInterruptibly这个方法看看:
代码语言:javascript复制public final void acquireSharedInterruptibly(int arg)
{
1、尝试获取锁,返回值小于0就是获取锁失败
if (tryAcquireShared(arg) < 0)
2、如果获取失败,则进入队列进行等待,之前已经解析过
doAcquireSharedInterruptibly(arg);
}
可以看到,跟之前CountDownLatch的await方法是一样的。
tryAcquireShared方法最终执行的如下方法:
代码语言:javascript复制final int nonfairTryAcquireShared(int acquires) {
for (;;) {
1、获取当前锁状态,锁状态值一开始是自定义的
int available = getState();
2、当前申请后剩余的锁状态值
int remaining = available - acquires;
if (3、如小于0,则申请失败,进入等待队列中
remaining < 0 ||
4、CAS替换锁状态值
compareAndSetState(available, remaining))
return remaining;
}
}
上述是非公平的,公平的只加了一个判断线程等待队列前是否有其它线程。排队一个一个来。
代码语言:javascript复制static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
这个就是为什么Semaphore能控制当前并发线程的数量的原因。
三、释放锁
线程获取执行资格之后需要释放锁。这就是release方法的用处。不释放的话锁会一直被占用,其他线程就无法运行。
代码语言:javascript复制public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
点进releaseShared看看
代码语言:javascript复制public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
跟之前的CountDownLatch是一样的,只是实现不一样。Semaphore实现如下:
代码语言:javascript复制protected final boolean tryReleaseShared(int releases) {
for (;;) {
1、获取锁当前状态
int current = getState();
2、释放锁,直接相加
int next = current releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
3、用CAS更新锁状态
if (compareAndSetState(current, next))
return true;
}
}