- semaphore的定义,意义
- 在没有juc semaphore之前怎么实现
- semaphore使用
- 分布式semaphore实现
信号量
最早用来解决进程同步与互斥问题的机制: 包括一个称为信号量的变量及对它进行的两个原语操作(PV操作)
什么是信号量?
信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。
PV操作由P操作原语和V操作原语组成(原语是不可中断的过程)
(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于英文中的incremnet)
对信号量进行操作,具体定义如下:
- P(S):
- ①将信号量S的值减1,即S=S-1;
- ②如果S>=0,则该进程继续执行;否则该进程置为等待状态,排入等待队列
- V(S):
- ①将信号量S的值加1,即S=S 1;
- ②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程
PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信
使用PV操作实现进程互斥时应该注意的是:
- 每个程序中用户实现互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性
- P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环
- 互斥信号量的初值一般为1
//许可数量
private int permits = 1;
public synchronized void P() {
permits--;
if(permits < 0 ){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void V(){
permits ;
if(permits <=0){
notifyAll();
}
}
J.U.C Semaphore
JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要自己实现
代码语言:javascript复制// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)
// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)
对于JUC的Semaphore源码,此篇不阐述了,另开新篇;但对分布式的Semaphore倒是可以研究下
分布式Semaphore
Redission中有对应的RSemaphore
代码语言:javascript复制RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
可过期信号量
代码语言:javascript复制RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
直接上最本质的源码片段,lua脚本很简单,对信号量进行计数,acquire时,信号量减1,release时,信号量加1;主要是保证操作的原子性
代码语言:javascript复制@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(true);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); "
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then "
"local val = redis.call('decrby', KEYS[1], ARGV[1]); "
"return 1; "
"end; "
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
@Override
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return RedissonPromise.newSucceededFuture(null);
}
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); "
"redis.call('publish', KEYS[2], value); ",
Arrays.<Object>asList(getName(), getChannelName()), permits);
}
在最本质的基础上,再深入看一下还做了哪些事,能真正达到一个工业生产标准
tryAcquire()
非阻塞式,有信息量就正常获取,没有刚快速返回,就是lua本质,没有做额外的事情
acquire()
代码语言:javascript复制@Override
public void acquire(int permits) throws InterruptedException {
if (tryAcquire(permits)) {
return;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
}
阻塞式,相对非阻塞式就多了一些事
- 1.先tryAcquire,看是否能获取到信号量
- 2.订阅channel事件
- 3.无限循环
- 3.1.先tryAcquire(),尝试一下
- 3.2.通过getEntry().getLatch(),也就是j.u.c.Semaphore,acquire()阻塞
- 4.取消订阅
订阅事件内部细节,另开篇再说了,他的目的其实就是释放Semaphore
想像一下,同一个client的两个线程A,B 同时需要获取信号量,如果A成功获取,那么B将被Semaphore阻塞住了,何时退出阻塞呢?
就在线程A进行release()之后,会publish,细节可查看上面的release()中的lua脚本,当B监听到事件时,就会调用Semaphore.release(),再次进行tryAcquire()
tryAcquire(int permits, long waitTime, TimeUnit unit)
如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可
代码语言:javascript复制@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
if (tryAcquire(permits)) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> future = subscribe();
if (!await(future, time, TimeUnit.MILLISECONDS)) {
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
while (true) {
current = System.currentTimeMillis();
if (tryAcquire(permits)) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
// waiting for message
current = System.currentTimeMillis();
getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
return false;
}
}
} finally {
unsubscribe(future);
}
// return get(tryAcquireAsync(permits, waitTime, unit));
}
其实await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch
如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false
当前是第一个请求,或者别的释放,那就再往下进入循环
CountDownLatch.await() Semaphore.tryAcquire()配合使用
每一次等待时间后,都需要检查是否超过等待时间
为什么需要引入CountDownLatch.await()呢? 都使用Semaphore.tryAcquire()不行吗?这个需要再次深入挖掘了
总结
分布式信号量,原理很明了,主要还是通过lua保障redis操作的原子性
阅读redisson源码,发现里面的操作基本都是异步化,底层又是基于netty,大量使用了future模式,如果不知道future模式,会很绕,debug都会晕掉,所以在深入redisson之前,需要再对future模式温习一下