分布式Semaphore

2021-03-23 10:52:38 浏览数 (1)

  1. semaphore的定义,意义
  2. 在没有juc semaphore之前怎么实现
  3. semaphore使用
  4. 分布式semaphore实现

信号量

最早用来解决进程同步与互斥问题的机制: 包括一个称为信号量的变量及对它进行的两个原语操作(PV操作)

什么是信号量?

信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。

PV操作由P操作原语和V操作原语组成(原语是不可中断的过程)

(注,P是荷兰语的Passeren,相当于英文的pass,V是荷兰语的Verhoog,相当于英文中的incremnet)

对信号量进行操作,具体定义如下:

  • P(S):
    • ①将信号量S的值减1,即S=S-1;
    • ②如果S>=0,则该进程继续执行;否则该进程置为等待状态,排入等待队列
  • V(S):
    • ①将信号量S的值加1,即S=S 1;
    • ②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程

PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信

使用PV操作实现进程互斥时应该注意的是:

  1. 每个程序中用户实现互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性
  2. P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环
  3. 互斥信号量的初值一般为1
代码语言:javascript复制
//许可数量
private int permits = 1;

public synchronized void P() {
    permits--;
    if(permits < 0 ){
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public synchronized void V(){
    permits  ;
    if(permits <=0){
        notifyAll();
    }
}

J.U.C Semaphore

JUC提供了工具类之一就是Semaphore,提供了丰富的API,不再需要自己实现

代码语言:javascript复制
// 创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits)
// 创建具有给定的许可数和给定的公平设置的 Semaphore。
Semaphore(int permits, boolean fair)

// 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void acquire()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
void acquire(int permits)
// 从此信号量中获取许可,在有可用的许可前将其阻塞。
void acquireUninterruptibly()
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信号量中当前可用的许可数。
int availablePermits()
// 获取并返回立即可用的所有许可。
int drainPermits()
// 返回一个 collection,包含可能等待获取的线程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待获取的线程的估计数目。
int getQueueLength()
// 查询是否有线程正在等待获取。
boolean hasQueuedThreads()
// 如果此信号量的公平设置为 true,则返回 true。
boolean isFair()
// 根据指定的缩减量减小可用许可的数目。
protected void reducePermits(int reduction)
// 释放一个许可,将其返回给信号量。
void release()
// 释放给定数目的许可,将其返回到信号量。
void release(int permits)
// 返回标识此信号量的字符串,以及信号量的状态。
String toString()
// 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
boolean tryAcquire()
// 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
boolean tryAcquire(int permits)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
boolean tryAcquire(long timeout, TimeUnit unit)

对于JUC的Semaphore源码,此篇不阐述了,另开新篇;但对分布式的Semaphore倒是可以研究下

分布式Semaphore

Redission中有对应的RSemaphore

代码语言:javascript复制
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();

可过期信号量

代码语言:javascript复制
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

直接上最本质的源码片段,lua脚本很简单,对信号量进行计数,acquire时,信号量减1,release时,信号量加1;主要是保证操作的原子性

代码语言:javascript复制
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(true);
    }

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "local value = redis.call('get', KEYS[1]); "  
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then "  
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); "  
                  "return 1; "  
              "end; "  
              "return 0;",
              Collections.<Object>singletonList(getName()), permits);
}

@Override
public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return RedissonPromise.newSucceededFuture(null);
    }

    return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
        "local value = redis.call('incrby', KEYS[1], ARGV[1]); "  
        "redis.call('publish', KEYS[2], value); ",
        Arrays.<Object>asList(getName(), getChannelName()), permits);
}

在最本质的基础上,再深入看一下还做了哪些事,能真正达到一个工业生产标准

tryAcquire()

非阻塞式,有信息量就正常获取,没有刚快速返回,就是lua本质,没有做额外的事情

acquire()

代码语言:javascript复制
@Override
public void acquire(int permits) throws InterruptedException {
    if (tryAcquire(permits)) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe();
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            getEntry().getLatch().acquire(permits);
        }
    } finally {
        unsubscribe(future);
    }
}

阻塞式,相对非阻塞式就多了一些事

  • 1.先tryAcquire,看是否能获取到信号量
  • 2.订阅channel事件
  • 3.无限循环
    • 3.1.先tryAcquire(),尝试一下
    • 3.2.通过getEntry().getLatch(),也就是j.u.c.Semaphore,acquire()阻塞
  • 4.取消订阅

订阅事件内部细节,另开篇再说了,他的目的其实就是释放Semaphore

想像一下,同一个client的两个线程A,B 同时需要获取信号量,如果A成功获取,那么B将被Semaphore阻塞住了,何时退出阻塞呢?

就在线程A进行release()之后,会publish,细节可查看上面的release()中的lua脚本,当B监听到事件时,就会调用Semaphore.release(),再次进行tryAcquire()

tryAcquire(int permits, long waitTime, TimeUnit unit)

如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可

代码语言:javascript复制
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();

    if (tryAcquire(permits)) {
        return true;
    }

    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        return false;
    }

    current = System.currentTimeMillis();
    RFuture<RedissonLockEntry> future = subscribe();
    if (!await(future, time, TimeUnit.MILLISECONDS)) {
        return false;
    }

    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            return false;
        }

        while (true) {
            current = System.currentTimeMillis();
            if (tryAcquire(permits)) {
                return true;
            }

            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                return false;
            }

            // waiting for message
            current = System.currentTimeMillis();

            getEntry().getLatch().tryAcquire(permits, time, TimeUnit.MILLISECONDS);

            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                return false;
            }
        }
    } finally {
        unsubscribe(future);
    }
//        return get(tryAcquireAsync(permits, waitTime, unit));
}

其实await(future, time, TimeUnit.MILLISECONDS)是使用的CountDownLatch

如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false

当前是第一个请求,或者别的释放,那就再往下进入循环

CountDownLatch.await() Semaphore.tryAcquire()配合使用

每一次等待时间后,都需要检查是否超过等待时间

为什么需要引入CountDownLatch.await()呢? 都使用Semaphore.tryAcquire()不行吗?这个需要再次深入挖掘了

总结

分布式信号量,原理很明了,主要还是通过lua保障redis操作的原子性

阅读redisson源码,发现里面的操作基本都是异步化,底层又是基于netty,大量使用了future模式,如果不知道future模式,会很绕,debug都会晕掉,所以在深入redisson之前,需要再对future模式温习一下

0 人点赞