前言
为了巩固AQS. 看一下Semaphore的源码.
简介
大部分都是直接翻译的官方代码注释,嘻嘻
一个计数的信号量.
概念上讲,信号量维护了一个许可证的集合. 每一个获取操作可能会阻塞,直到有许可证可用.
每一个释放操作,会添加一个许可证. 相当于隐式的释放一个阻塞的获取者.
信号量经常用于, 严格数量的线程访问资源. 比如下面是一个例子:
使用信号量来控制对一个对象池的访问.
(个人感觉,更像是使用信号量来实现一个对象池)
代码语言:javascript复制 class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
在获取每一个Item之前,必须先从信号量获取一个许可证,保证有一个对象是可用的。
当线程使用完该对象,将其返回给对象池时, 同时返回给信号量一个许可证. 允许其他线程申请该对象.
注意: 如果没有acquire的线程,那么将阻止一个对象返还给对象池.
信号量封装了对对象吃的访问同步控制,但是池子本身的同步需要自己实现.
如果将一个信号量初始化为只有1个. 因为只有一个可用的许可证,所以信号量使用起来就像一个独占式的锁. 就是经常说的binary semaphore
.
因为他只有两种状态: 一个许可证可用, 没有许可证可用.
当使用binary semaphore
时, 他有以下的特性: “锁”可以被除了锁的持有者之外的线程释放.(因为信号量没有拥有者的概念)
这在某些特殊的上下文中是有用的, 比如死锁的恢复.
构造方法可以接受一个fairness
的参数,如果设置为false. 这个类不保证线程申请许可证的公平性. 一个线程申请许可证,可能比已经在等待的线程拿到的早.
当公平性设置为true. 线程获取许可证的顺序与他们调用acquire的顺序一致.
一般来讲, 信号量用来控制资源方法时, 应该被初始化为公平的。以保证没有线程饿死.
当使用信号量做其他类型的同步控制时,非公平顺序的吞吐量优势经常是比公平性更加重要的。
这个类还提供了一些方便的方法,比如一次性申请多个许可证的acquire
和release
方法.
这些方法比使用循环获取有更好的性能. 然而,他们不保证任何偏好顺序,比如,如果线程A调用了acquire(3)
, 线程B调用了acquire(2)
. 即将有两个许可证变得可用,没有保证说线程B会获取这两个许可证。除非线程B是首先进行申请的,且当前信号量是公平模式.
源码
Sync同步器
首先当前是最核心的同步器的实现了.
代码语言:javascript复制 /**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 非公平模式的获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 剩余
int available = getState();
// 减去此次获取的值
int remaining = available - acquires;
// 没有剩余了. 或者获取成功,返回剩余数量.
// 这里的两个条件,一个是成功,一个是失败.
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 当前
int current = getState();
// 释放后
int next = current releases;
// 溢出了
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 释放操作,成功返回true. 否则重试
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
构造方法
初始化时提供一个许可证的数量. 将其设置为AQS的State.
nonfaireTryAcquireShared(int acquire)
非公平模式的获取许可证.
首先获取当前剩余数量,减去此次申请的值后, 如果小于0. 获取失败,返回缺少的数量. 如果大于0. 尝试更改状态,成功即返回.
tryReleaseShared(int release)
首先获取当前剩余数量,加上此次释放的数量. 如果溢出,报错. 之后进行CAS的设置状态操作.
其他两个非公用API用到的时候再看.
NonfaireSync 同步器
非公平模式的同步器.
代码语言:javascript复制 /**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
只是将AQS的tryAcquireShared
申请共享锁指向了在Sync
中实现的非公平模式获取.
FairSync 公平模式同步器
代码语言:javascript复制 /**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
公平模式的同步器,实现了公平模式的获取许可证.
- 如果已经有队列中的节点,直接返回获取失败.
- 其他和非公平模式一样,这样可以确保获取许可证的顺序和申请顺序是一致的.
构造方法
有点像ReentrantLock
的构造方法,可以指定公平或者非公平模式. 此外传入一个许可证的数量.
acquire系列.
- acquire() 获取许可证,调用AQS的
acquireSharedInterruptibly
. - acquireUninterruptibly(). 忽略中断的获取许可证.
- tryAcquire(). 尝试获取一次许可证
- tryAcquire(long timeout, TimeUnit unit). 带有超时的尝试获取许可证
- acquire(int permits). 一次性获取多个许可证.
- …上面方法的多个许可证版本
release系列
- release() 释放一个许可证. 调用AQS的
releaseShared
. - release(int permits). 一次性释放多个许可证.
总结
这是对AQS的又一个直接应用.
那么他是怎么定义State的呢?
初始化State为许可证的数量.
- 加锁,递减State. 只要State仍然大于0. 加锁即视为成功.
- 解锁, 递增State. 除了溢出肯定会成功.
完。